在消息处理程序中如何立即停止处理新消息?

问题描述 投票:0回答:1

我有一个 Rebus 总线设置,其中有一个工作进程,最大并行度为 1,可以“顺序”处理消息。如果处理程序失败或出于特定业务原因,我希望总线实例立即停止处理消息。

我尝试使用 Rebus.Event 包来检测 AfterMessageHandled 处理程序中的异常并将工作线程数设置为 0,但似乎在它真正成功停止单个工作线程实例之前已经处理了其他消息。

在事件处理管道中我可以做什么

bus.Advanced.Workers.SetNumberOfWorkers(0);
为了防止进一步的消息处理?

我还尝试在处理程序本身的 catch 块内将工作人员数量设置为 0,但这似乎不是正确的位置,因为

SetNumberOfWorkers(0)
在返回之前等待处理程序完成,并且调用者是处理程序...对我来说似乎陷入了某种僵局。

谢谢你

rebus rebus-rabbitmq
1个回答
2
投票

这种特殊情况有点进退两难,因为 - 正如您所正确观察到的 -

SetNumberOfWorkers
是一个阻塞函数,它将等待直到达到所需的线程数。

在您的情况下,由于您将其设置为零,这意味着您的消息处理程序需要在线程数达到零之前完成......然后:💣☠🔒

我很抱歉这么说,因为我敢打赌你想要这样做是因为你不知何故陷入了困境 - 但总的来说,我必须说想要按顺序并按消息队列的顺序处理消息是自找麻烦,因为有很多事情可能导致消息被重新排序。

但是,我认为您可以通过安装传输装饰器来解决您的问题,该装饰器在切换时将绕过真正的传输。如果装饰器随后从

null
方法返回
Receive
,它将触发 Rebus 的内置后退策略并开始冷却(即,它将增加轮询传输之间的等待时间)。

检查一下 - 首先,让我们创建一个简单的线程安全切换:

public class MessageHandlingToggle
{
    public volatile bool ProcessMessages = true;
}

(你可能想把它包起来并以某种方式做得漂亮,但现在应该可以了)

然后我们将其注册为容器中的单例(假设此处为 Microsoft DI):

services.AddSingleton(new MessageHandlingToggle());

我们将使用

ProcessMessages
标志来指示是否应启用消息处理。

现在,当您配置 Rebus 时,您可以装饰传输并授予装饰器访问容器中的切换实例的权限:

services.AddRebus((configure, provider) =>
    configure
        .Transport(t => {
            t.Use(...);

            // install transport decorator here
            t.Decorate(c => {
                var transport = c.Get<ITransport>();
                var toggle = provider.GetRequiredService<MessageHandlingToggle>();
                return new MessageHandlingToggleTransportDecorator(transport, toggle);
            })
        })
        .(...)
);

所以,现在你只需要构建装饰器:

public class MessageHandlingToggleTransportDecorator : ITransport
{
    static readonly Task<TransportMessage> NoMessage = Task.FromResult<TransportMessage>(null);

    readonly ITransport _transport;
    readonly MessageHandlingToggle _toggle;

    public MessageHandlingToggleTransportDecorator(ITransport transport, MessageHandlingToggle toggle)
    {
        _transport = transport;
        _toggle = toggle;
    }   

    public string Address => _transport.Address;

    public void CreateQueue(string address) => _transport.CreateQueue(address);

    public Task Send(string destinationAddress, TransportMessage message, ITransactionContext context) 
        => _transport.Send(destinationAddress, message, context);

    public Task<TransportMessage> Receive(ITransactionContext context, CancellationToken cancellationToken)
        => _toggle.ProcessMessages 
            ? _transport.Receive(context, cancellationToken)
            : NoMessage;
}

如您所见,当

null
时,它只会返回
ProcessMessages == false
。剩下的唯一一件事就是决定何时再次恢复处理消息,以某种方式从容器中拉出
MessageHandlingToggle
(可能通过注入),然后将
bool
弹回
true

我希望能为你工作,或者至少给你一些如何解决问题的灵感。 🙂

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.