更改 Feed 处理器问题

问题描述 投票:0回答:2

我正在尝试在.net 中测试更改提要处理器。我从一开始就尝试使用更改源处理器(如文档中所述)。当我启动更改源处理器时,它会按预期运行,如果我在 COSMOS 数据库中进行更改,它会触发 HandleChanges 方法。我想测试一种场景:我已在本地停止了更改源处理器,对 cosmos db 进行了 2 项更改并启动了处理器,这次处理器仅选择最新的更改。为什么是这样?我在代码中遗漏了什么吗?

这是我的代码:

public class ChangeFeedListener 
{
    private static CosmosClient _cosmosClient;
    private static Database _productDatabase;
    private static Container _productContainer;
    private static Container _productLeaseContainer;
    private IAuditMessenger _auditMessenger = null;
    private TelemetryClient _telemetryClient = null;

    public ChangeFeedListener(IAuditMessenger auditMessenger, TelemetryClient telemetryClient)
    {
        _auditMessenger = auditMessenger;
        _telemetryClient = telemetryClient;
    }

    public async Task StartListener(CancellationToken cancellationToken)
    {
        _cosmosClient = new CosmosClient(Config.CosmosConfig.ConnectionString);
        _productDatabase = _cosmosClient.GetDatabase(Config.CosmosConfig.CosmosDB);
        _productContainer = _productDatabase.GetContainer(Config.CosmosConfig.TriggerContainer);
        _productLeaseContainer = _productDatabase.GetContainer(Config.CosmosConfig.LeaseContainer);

        await StartChangeFeedProcessorAsync(_cosmosClient);
    }

    private async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(CosmosClient cosmosClient)
    {
        Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string LeaseToken, Exception exception) =>
        {
            if (exception is ChangeFeedProcessorUserException userException)
            {
                //handle user exception
            }
            else
            {
                //handle other exceptions
            }

            return Task.CompletedTask;
        };

        Container leaseContainer = cosmosClient.GetContainer(_productDatabase.Id, _productLeaseContainer.Id);
        string processorName = "abc";
        string instanceName = "test";

        ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(_productDatabase.Id, _productContainer.Id)
            .GetChangeFeedProcessorBuilder<ABCItem>(processorName, HandleChangesAsync)
            .WithErrorNotification(onErrorAsync)
            .WithInstanceName(instanceName)
            .WithLeaseContainer(leaseContainer)
            .WithStartTime(DateTime.MinValue.ToUniversalTime())
            .Build();

        await changeFeedProcessor.StartAsync();
        return changeFeedProcessor;
    }

    private async Task HandleChangesAsync(IReadOnlyCollection<ABCItem> changes, CancellationToken cancellationToken)
    {
        //handler code
    }
}
.net azure azure-cosmosdb azure-cosmosdb-changefeed
2个回答
0
投票

我假设您在同一个文档中进行了更改,如果您在同一时间段内对同一个文档进行了更改,更改源只会向您发送最新的更改,而不是所有更改。

如果您更改多个文档,您将按照更改顺序依次看到每个文档的更改。

这是一个工作示例

[FunctionName("IntermediateDataChangeFeedListener")]
        public async Task Run([CosmosDBTrigger(
            databaseName: "productdata",
            collectionName: "intermediatedata",
            ConnectionStringSetting = "cosmosDBConnectionString",
            LeaseCollectionName = "lease")]IReadOnlyList<Document> input, ILogger log)
        {
            try
            {
                if (input == null || input.Count == 0)
                {
                    return;
                }

                foreach (var doc in input)
                {
                    var documentType = doc.GetPropertyValue<string>("documentType");
                    if (documentType == null)
                    {
                        log.LogWarning($"{nameof(IntermediateDataChangeFeedListener)}. " +
                            $"DocumentType missing. Document {doc.Id} will not be processed.");
                        continue;
                    }

                    if (!_processors.TryGetValue(documentType, out var processor))
                    {
                        log.LogWarning($"{nameof(IntermediateDataChangeFeedListener)}. " +
                            $"Handler not found. Document of type {documentType} will not be processed.");
                        continue;
                    }

                    await processor.Process(doc.ToString(), log);
                }
            }
            catch (Exception ex)
            {
                log.LogError($"Fail to process change feed in intermediatedata container. Message: {ex.Message}");
                throw;
            }
        }

0
投票

当我使用 Service Fabric Explorer 时,即使在停止本地调试后,侦听器仍继续在后台运行,因此它在后台处理它们。没有数据丢失,并且运行良好。 即使在我停止监听器之后,当 TS 的继续标记似乎发生变化时,我意识到了这一点

© www.soinside.com 2019 - 2024. All rights reserved.