我正在使用 Azure Cosmos DB NoSQL API,需要使用 Java SDK 在指定日期范围内或两个连续令牌之间重新处理来自更改源的更改。
根据文档,我没有找到按日期范围过滤更改的直接选项。我的用例需要检索特定时间戳之间发生的更改或通过提供两个延续令牌来检索。
有没有办法使用Java SDK来实现这个功能,或者有其他方法来实现这个功能?任何指导或建议将不胜感激。
我已经实现如下,但根据Azure文档lsn将被标记到多个更新的项目,我知道我们可以使用setStartTime,那么endTime(如果可能的话,我需要更新到未来日期的文档)或者我可以在比较两个 continuationToken 的黑白方式。
@Override
public Boolean call() throws Exception {
ChangeFeedProcessorOptions options = new ChangeFeedProcessorOptions();
AtomicReference<ChangeFeedProcessor> changeFeedProcessorRef = new AtomicReference<>();
ChangeFeedProcessor changeFeedProcessor = new ChangeFeedProcessorBuilder()
.hostName("Reprocess_LocalHost")
.feedContainer(feedContainerId)
.leaseContainer(leaseContainerId)
.options(options)
.handleChanges(changes -> {
System.out.println("Start");
for (JsonNode document : changes) {
long currentLsn = document.get("_lsn").asLong();
if (currentLsn <= toTokenLsn) {
System.out.println(" handleChanges() RECEIVED (within range): " + document.toPrettyString());
}
if (currentLsn >= toTokenLsn) {
System.out.println("Reached the specified toToken LSN: " + toTokenLsn + ". Stopping processing.");
ChangeFeedProcessor processorInstance = changeFeedProcessorRef.get();
if (processorInstance != null) {
processorInstance.stop()
.subscribeOn(Schedulers.boundedElastic())
.doOnSuccess(avoid -> System.out.println("Change Feed Processor has been stopped successfully."))
.doOnError(error -> System.err.println("Failed to stop the Change Feed Processor: " + error))
.subscribe();
}
return;
}
}
System.out.println("End");
})
.buildChangeFeedProcessor();
changeFeedProcessorRef.set(changeFeedProcessor);
changeFeedProcessor.start().block();
return true;
}