我正在尝试在.net 中测试更改提要处理器。我从一开始就尝试使用更改源处理器(如文档中所述)。当我启动更改源处理器时,它会按预期运行,如果我在 COSMOS 数据库中进行更改,它会触发 HandleChanges 方法。我想测试一种场景:我已在本地停止了更改源处理器,对 cosmos db 进行了 2 项更改并启动了处理器,这次处理器仅选择最新的更改。为什么是这样?我在代码中遗漏了什么吗?
这是我的代码:
public class ChangeFeedListener
{
private static CosmosClient _cosmosClient;
private static Database _productDatabase;
private static Container _productContainer;
private static Container _productLeaseContainer;
private IAuditMessenger _auditMessenger = null;
private TelemetryClient _telemetryClient = null;
public ChangeFeedListener(IAuditMessenger auditMessenger, TelemetryClient telemetryClient)
{
_auditMessenger = auditMessenger;
_telemetryClient = telemetryClient;
}
public async Task StartListener(CancellationToken cancellationToken)
{
_cosmosClient = new CosmosClient(Config.CosmosConfig.ConnectionString);
_productDatabase = _cosmosClient.GetDatabase(Config.CosmosConfig.CosmosDB);
_productContainer = _productDatabase.GetContainer(Config.CosmosConfig.TriggerContainer);
_productLeaseContainer = _productDatabase.GetContainer(Config.CosmosConfig.LeaseContainer);
await StartChangeFeedProcessorAsync(_cosmosClient);
}
private async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(CosmosClient cosmosClient)
{
Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string LeaseToken, Exception exception) =>
{
if (exception is ChangeFeedProcessorUserException userException)
{
//handle user exception
}
else
{
//handle other exceptions
}
return Task.CompletedTask;
};
Container leaseContainer = cosmosClient.GetContainer(_productDatabase.Id, _productLeaseContainer.Id);
string processorName = "abc";
string instanceName = "test";
ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(_productDatabase.Id, _productContainer.Id)
.GetChangeFeedProcessorBuilder<ABCItem>(processorName, HandleChangesAsync)
.WithErrorNotification(onErrorAsync)
.WithInstanceName(instanceName)
.WithLeaseContainer(leaseContainer)
.WithStartTime(DateTime.MinValue.ToUniversalTime())
.Build();
await changeFeedProcessor.StartAsync();
return changeFeedProcessor;
}
private async Task HandleChangesAsync(IReadOnlyCollection<ABCItem> changes, CancellationToken cancellationToken)
{
//handler code
}
}
我假设您在同一个文档中进行了更改,如果您在同一时间段内对同一个文档进行了更改,更改源只会向您发送最新的更改,而不是所有更改。
如果您更改多个文档,您将按照更改顺序依次看到每个文档的更改。
这是一个工作示例
[FunctionName("IntermediateDataChangeFeedListener")]
public async Task Run([CosmosDBTrigger(
databaseName: "productdata",
collectionName: "intermediatedata",
ConnectionStringSetting = "cosmosDBConnectionString",
LeaseCollectionName = "lease")]IReadOnlyList<Document> input, ILogger log)
{
try
{
if (input == null || input.Count == 0)
{
return;
}
foreach (var doc in input)
{
var documentType = doc.GetPropertyValue<string>("documentType");
if (documentType == null)
{
log.LogWarning($"{nameof(IntermediateDataChangeFeedListener)}. " +
$"DocumentType missing. Document {doc.Id} will not be processed.");
continue;
}
if (!_processors.TryGetValue(documentType, out var processor))
{
log.LogWarning($"{nameof(IntermediateDataChangeFeedListener)}. " +
$"Handler not found. Document of type {documentType} will not be processed.");
continue;
}
await processor.Process(doc.ToString(), log);
}
}
catch (Exception ex)
{
log.LogError($"Fail to process change feed in intermediatedata container. Message: {ex.Message}");
throw;
}
}
当我使用 Service Fabric Explorer 时,即使在停止本地调试后,侦听器仍继续在后台运行,因此它在后台处理它们。没有数据丢失,并且运行良好。 即使在我停止监听器之后,当 TS 的继续标记似乎发生变化时,我意识到了这一点