Cosmos DB change feed listener in Java

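I am implementing a change feed listener in Java, and I don't want to use a Function App. The implementation is nearly complete, but it does not run and throws this error:

leaseClient: content response on write setting must be enabled

I created the lease container manually, and it has full access permissions.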

@Component
public class ChangeFeedListener {

    private CosmosAsyncContainer sourceContainer;
    private CosmosAsyncContainer targetContainer;
    private CosmosAsyncContainer leaseContainer;

    @Autowired
    public ChangeFeedListener(CosmosAsyncDatabase cosmosAsyncDatabase) {
        this.sourceContainer = cosmosAsyncDatabase.getContainer("sourceContainer");
        this.targetContainer = cosmosAsyncDatabase.getContainer("targetContainer");
        this.leaseContainer = cosmosAsyncDatabase.getContainer("leaseContainer");
    }

    @EventListener({ApplicationStartedEvent.class})
    public void initialize() {

        // Create change feed processor options
        ChangeFeedProcessorOptions changeFeedProcessorOptions = new ChangeFeedProcessorOptions();
        // Create change feed processor
        ChangeFeedProcessor changeFeedProcessor = new ChangeFeedProcessorBuilder()
                .hostName(UUID.randomUUID().toString())
                .feedContainer(sourceContainer)
                .handleChanges((List<JsonNode> docs) -> {
                    // Process each changed document
                    for (JsonNode document : docs) {
                        System.out.println("Processing document: " + document);
                        // Write the changed document to the target container
                        targetContainer.createItem(document).block();
                        System.out.println("Document written to target container.");
                    }
                })
                .leaseContainer(leaseContainer)
                .options(changeFeedProcessorOptions)
                .buildChangeFeedProcessor();

        // Start change feed processor
        changeFeedProcessor.start();
    }
}

Can someone help me get this code up and running? Thanks.

java azure-cosmosdb azure-java-sdk azure-cosmosdb-changefeed
1 Answer

Reference: https://github.com/Azure/azure-sdk-for-java/blob/0ad1e12ce687547139be4a7da5b06fbc232aa5a4/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/ChangeFeedProcessorImplBase.java#L149

This means the client behind your lease container needs content response on write enabled. In Java, that is set on the CosmosClientBuilder:

 CosmosAsyncClient client = new CosmosClientBuilder()
         .endpoint(serviceEndpoint)
         .key(key)
         // other settings...
         // This one below
         .contentResponseOnWriteEnabled(true)
         .buildAsyncClient();
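
Since the question injects a CosmosAsyncDatabase through Spring, the flag has to be set wherever that client is actually built. Below is a minimal sketch of such a configuration. It assumes the client is constructed by hand rather than auto-configured by a starter. The class name CosmosConfig and the property names cosmos.endpoint, cosmos.key and cosmos.database are hypothetical and not taken from the question.

```java
import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncDatabase;
import com.azure.cosmos.CosmosClientBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; the property names below are assumptions.
@Configuration
public class CosmosConfig {

    @Bean
    public CosmosAsyncClient cosmosAsyncClient(
            @Value("${cosmos.endpoint}") String endpoint,
            @Value("${cosmos.key}") String key) {
        return new CosmosClientBuilder()
                .endpoint(endpoint)
                .key(key)
                // The change feed processor checks this setting on the client
                // that owns the lease container.
                .contentResponseOnWriteEnabled(true)
                .buildAsyncClient();
    }

    @Bean
    public CosmosAsyncDatabase cosmosAsyncDatabase(
            CosmosAsyncClient client,
            @Value("${cosmos.database}") String databaseName) {
        // Containers obtained from this database, including the lease
        // container, use the client configured above.
        return client.getDatabase(databaseName);
    }
}
```

With the flag enabled on that client, buildChangeFeedProcessor() should get past the lease client check. Separately, changeFeedProcessor.start() returns a Mono<Void>, so the processor in the question will not actually start until that Mono is subscribed, for example with changeFeedProcessor.start().subscribe().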