在这方面遇到了一点麻烦,希望你们能帮忙...我正在使用 C# System.Reactive。我有一个可观察的,它只是一个从天蓝色存储队列获取天蓝色队列消息的时间间隔,一次一个。看起来像这样:
public IObservable<QueueMessage> On(
string queueName,
int poleIntervalSeconds = 5
)
{
return Observable
.Interval(TimeSpan.FromSeconds(poleIntervalSeconds))
.SelectMany(async _ =>
{
IAzureQueueAdapter queueAdapter = ConnectToQueue(queueName);
Response<QueueMessage[]> response = await queueAdapter.ReceiveMessagesAsync(1);
return response.Value;
})
.SelectMany(messages => messages)
.Do(async queueMessage =>
{
IAzureQueueAdapter queueAdapter = ConnectToQueue(queueName);
await queueAdapter.DeleteMessageAsync(queueMessage.MessageId, queueMessage.PopReceipt);
});
}
// Usage
IObservable<ProcessingCompletedEvent> processingCompletedEvent = eventAggregator
.On<ProcessingCompletedEvent>(); // Not the same On
IObservable<QueueMessage> queueMessageObservable = queueAdapter.On(queueName, 5);
它相当简单,对于队列消息(Windows 服务应用程序)的直接异步处理,这非常有效。不过,我现在有一个使用第三方 C++ SDK 的外部进程,我必须使用 pinvoke 来执行该进程。 SDK 基本上公开了我使用 FromEventPattern 用可观察值包装的事件委托。正如我所说,这对于正常的异步内容来说一切都很好,但问题是我现在有一个订阅,需要侦听队列并等待来自 SDK 包装器可观察对象的完成事件。
简单来说,如果前一条消息也有匹配的已完成事件,我只需要从队列中获取新消息。这是由于 SDK 的问题造成的,如果我尝试一次做两件事,它就会崩溃。
此处发出的已完成事件可供参考:
public class ProcessingCompletedEvent : EventMessage<QueueMessage>
{
public ProcessingCompletedEvent(QueueMessage data) : base(data) {
MessageId = data.MessageId;
}
public string MessageId { get; set; }
public string? CorrelationId { get; set; }
}
需要明确的是,我实际上没有选择将消息源更改为服务总线或其他任何东西。但是,如果在处理前一条消息时检索到另一条消息,我可以在本地备份消息,我只是希望它等待处理它,直到上一条消息完成处理。如果可以更好地达到目的,我确实可以选择更改“开启”或为其创建一个带或不带间隔的新功能,但我确实觉得间隔是必要的。
我重构了
async
lambda(因为它们与 Rx 配合得不好)来得到这个:
public IObservable<QueueMessage> On(string queueName, int poleIntervalSeconds = 5) =>
Observable
.Defer(() =>
{
IAzureQueueAdapter queueAdapter = ConnectToQueue(queueName);
return
from n in Observable.Interval(TimeSpan.FromSeconds(poleIntervalSeconds))
from rqms in Observable.FromAsync(() => queueAdapter.ReceiveMessagesAsync(1))
from qm in rqms.Value
from u in Observable.FromAsync(() => queueAdapter.DeleteMessageAsync(qm.MessageId, qm.PopReceipt))
select qm;
});
我不能确定它是否有效,但希望它能更接近一点。
如果您需要更紧密地控制正在使用的线程,请尝试使用
EventLoopScheduler
。它是一个启动自己线程的调度程序,因此不能同时运行两件事。