System.Reactive,一次处理一个 Azure 队列消息

问题描述 投票:0回答:1

在这方面遇到了一点麻烦,希望你们能帮忙...我正在使用 C# System.Reactive。我有一个可观察的,它只是一个从天蓝色存储队列获取天蓝色队列消息的时间间隔,一次一个。看起来像这样:

public IObservable<QueueMessage> On(
    string queueName, 
    int poleIntervalSeconds = 5
)
{
    return Observable
        .Interval(TimeSpan.FromSeconds(poleIntervalSeconds))
        .SelectMany(async _ =>
        {
            IAzureQueueAdapter queueAdapter = ConnectToQueue(queueName);
            Response<QueueMessage[]> response = await queueAdapter.ReceiveMessagesAsync(1);
            return response.Value;
        })
        .SelectMany(messages => messages)
        .Do(async queueMessage =>
        {
            IAzureQueueAdapter queueAdapter = ConnectToQueue(queueName);
            await queueAdapter.DeleteMessageAsync(queueMessage.MessageId, queueMessage.PopReceipt);
        });
}

// Usage

IObservable<ProcessingCompletedEvent> processingCompletedEvent = eventAggregator
    .On<ProcessingCompletedEvent>(); // Not the same On

IObservable<QueueMessage> queueMessageObservable = queueAdapter.On(queueName, 5);

它相当简单,对于队列消息(Windows 服务应用程序)的直接异步处理,这非常有效。不过,我现在有一个使用第三方 C++ SDK 的外部进程,我必须使用 pinvoke 来执行该进程。 SDK 基本上公开了我使用 FromEventPattern 用可观察值包装的事件委托。正如我所说,这对于正常的异步内容来说一切都很好,但问题是我现在有一个订阅,需要侦听队列并等待来自 SDK 包装器可观察对象的完成事件。

简单来说,如果前一条消息也有匹配的已完成事件,我只需要从队列中获取新消息。这是由于 SDK 的问题造成的,如果我尝试一次做两件事,它就会崩溃。

此处发出的已完成事件可供参考:

public class ProcessingCompletedEvent : EventMessage<QueueMessage> 
{
     public ProcessingCompletedEvent(QueueMessage data) : base(data) { 
        MessageId = data.MessageId; 
     }
     public string MessageId { get; set; }
     public string? CorrelationId { get; set; }
}

需要明确的是,我实际上没有选择将消息源更改为服务总线或其他任何东西。但是,如果在处理前一条消息时检索到另一条消息,我可以在本地备份消息,我只是希望它等待处理它,直到上一条消息完成处理。如果可以更好地达到目的,我确实可以选择更改“开启”或为其创建一个带或不带间隔的新功能,但我确实觉得间隔是必要的。

c# system.reactive
1个回答
0
投票

我重构了

async
lambda(因为它们与 Rx 配合得不好)来得到这个:

public IObservable<QueueMessage> On(string queueName, int poleIntervalSeconds = 5) =>
    Observable
        .Defer(() =>
        {
            IAzureQueueAdapter queueAdapter = ConnectToQueue(queueName);
            return
                from n in Observable.Interval(TimeSpan.FromSeconds(poleIntervalSeconds))
                from rqms in Observable.FromAsync(() => queueAdapter.ReceiveMessagesAsync(1))
                from qm in rqms.Value
                from u in Observable.FromAsync(() => queueAdapter.DeleteMessageAsync(qm.MessageId, qm.PopReceipt))
                select qm;
        });

我不能确定它是否有效,但希望它能更接近一点。

如果您需要更紧密地控制正在使用的线程,请尝试使用

EventLoopScheduler
。它是一个启动自己线程的调度程序,因此不能同时运行两件事。

© www.soinside.com 2019 - 2024. All rights reserved.