我们有一个面向 .NET 3.1 的 ASP.NET Core 应用程序。 我们使用 MassTransit 发送和使用消息,并使用 ActiveMQ 作为消息代理。
我们正在使用这些 MassTransit 套餐
<PackageReference Include="MassTransit" Version="7.2.2" />
<PackageReference Include="MassTransit.ActiveMQ" Version="7.2.2" />
<PackageReference Include="MassTransit.AspNetCore" Version="7.2.2" />
<PackageReference Include="MassTransit.Extensions.DependencyInjection" Version="7.2.2" />
我们有很多消费者,这样声明:
services.AddMassTransit(busConfigurator =>
{
busConfigurator.AddConsumer<CreateLeadConsumer>();
busConfigurator.AddConsumer<CreateOrderConsumer>();
busConfigurator.AddConsumer<InvoiceCreatedConsumer>();
busConfigurator.AddConsumer<PhoenixContractCreatedConsumer>();
busConfigurator.AddConsumer<PhoenixDocumentCreatedConsumer>();
ConfigureBus(busConfigurator, activeMqOptions);
});
我们定义了这些ActiveMqOptions
var activeMqOptions = Configuration.GetSection(SodexoActiveMQOptions.SectionName).Get<SodexoActiveMQOptions>();
EndpointConvention.Map<InvoiceCreatedMessage>(new Uri(QueueFormatter.FormatToAllowConcurrency(activeMqOptions.LocalQueues.InvoiceCreatedQueue)));
EndpointConvention.Map<CompanyOrderDto>(new Uri(QueueFormatter.FormatToAllowConcurrency(activeMqOptions.GlobalQueues.CreateOrUpdateClientQueue)));
EndpointConvention.Map<BeneficiaryWithProductDto>(new Uri(QueueFormatter.FormatToTopic(activeMqOptions.GlobalQueues.CreateOrUpdateBeneficiaryQueue)));
EndpointConvention.Map<CreatePhoenixOrderMessage>(new Uri(QueueFormatter.FormatToAllowConcurrency(activeMqOptions.GlobalQueues.CreateOrderQueue)));
EndpointConvention.Map<PhoenixOrderCreationMessage>(new Uri(QueueFormatter.FormatToAllowConcurrency(activeMqOptions.GlobalQueues.PhoenixCreateOrderQueue)));
EndpointConvention.Map<CancelOrderMessage>(new Uri(QueueFormatter.FormatToAllowConcurrency(activeMqOptions.GlobalQueues.CancelOrderQueue)));
EndpointConvention.Map<CancelOrderMessage>(new Uri(QueueFormatter.FormatToAllowConcurrency(activeMqOptions.GlobalQueues.CancelOrderQueue)));
EndpointConvention.Map<DocumentCreatedMessage>(new Uri(QueueFormatter.FormatToAllowConcurrency(activeMqOptions.LocalQueues.DocumentCreatedQueue)));
我们的消费者大部分时间都工作正常,有效负载消耗良好。
但有时会抛出异常:
[MassTransit.ReceiveTransport] R-FAULT "activemq://localhost:61616/{name of the queue}" System.Threading.Channels.ChannelClosedException: The channel has been closed.
at MassTransit.Util.ChannelExecutor.Run[T](Func`1 method, CancellationToken cancellationToken)
at MassTransit.ActiveMqTransport.Transport.ActiveMqSendTransport.SendPipe`1.Send(SessionContext sessionContext)
at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken)
at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken)
at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken)
at GreenPipes.Internals.Extensions.TaskExtensions.<>c__DisplayClass4_0.<<OrCanceled>g__WaitAsync|0>d.MoveNext() --- End of stack trace from previous location where exception was thrown ---
at MassTransit.Util.PendingTaskCollection.Completed(CancellationToken cancellationToken)
at MassTransit.Pipeline.Filters.DeserializeFilter.Send(ReceiveContext context, IPipe`1 next)
at GreenPipes.Filters.RescueFilter`2.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
由于我们收到此异常,消息将进入错误队列。
为了避免此异常,我在应用程序上添加了重试行为。当遇到异常时,消息将在队列中重新发送。在其他例外情况下它工作正常,但在这个例外情况下则不然。
一旦抛出
System.Threading.Channels.ChannelClosedException
,消息就会重试并被消耗。但在消息被消费后,消息仍然会进入错误队列。
这是更清晰的日志视图,您将看到有一个模式:
As you will see, the first time, the application try to publish the message to a queue, but a ChannelClosedException is thrown.
The second time, the message is correctly published, because we can see final logs, and also because I checked with the targeted consumers.
[22:39:13.647 INF] [] [ECommerce.Messaging.Consumers.CreateOrderConsumer] Receiving order message
--> First time we get the message
[22:39:13.924 INF] [] [ECommerce.Messaging.Consumers.CreateOrderConsumer] Sending phoenix reorder creation message for enterprise number 754887949
[22:39:13.927 WRN] [] [MassTransit.ReceiveTransport] R-RETRY "activemq://localhost:61616/create-order" "8ee80000-56b0-0050-5f50-08da74c70f82" MassTransit.Context.RetryConsumeContext<MassTransitMessaging.CreateOrderMessage>
System.Threading.Channels.ChannelClosedException: The channel has been closed.
at MassTransit.Util.ChannelExecutor.Run[T](Func`1 method, CancellationToken cancellationToken)
at MassTransit.ActiveMqTransport.Transport.ActiveMqSendTransport.SendPipe`1.Send(SessionContext sessionContext)
at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken)
at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken)
at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken)
at MassTransit.EndpointConventionExtensions.Send[T](ISendEndpointProvider provider, T message, CancellationToken cancellationToken)
at ECommerce.Messaging.Consumers.MessageNotifier.NotifyAsync[T](T message, CancellationToken cancellationToken) in /home/vsts/work/1/s/ECommerce.Messaging/Consumers/MessageNotifier.cs:line 18
at ECommerce.Messaging.Consumers.CreateOrderConsumer.Consume(ConsumeContext`1 context) in /home/vsts/work/1/s/ECommerce.Messaging/Consumers/CreateOrderConsumer.cs:line 96
at MassTransit.Scoping.ScopeConsumerFactory`1.Send[TMessage](ConsumeContext`1 context, IPipe`1 next)
at MassTransit.Scoping.ScopeConsumerFactory`1.Send[TMessage](ConsumeContext`1 context, IPipe`1 next)
at MassTransit.Pipeline.Filters.ConsumerMessageFilter`2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context, IPipe`1 next)
at MassTransit.Pipeline.Filters.ConsumerMessageFilter`2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context, IPipe`1 next)
at GreenPipes.Filters.RetryFilter`1.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
[22:39:14.031 INF] [] [ECommerce.Messaging.Consumers.CreateOrderConsumer] Receiving order message
--> Second time we get the message
[22:39:14.255 INF] [] [ECommerce.Messaging.Consumers.CreateOrderConsumer] Sending phoenix reorder creation message Phoenix reorder creation message is sent for enterprise number 754887949
[22:39:14.281 INF] [] [ECommerce.Messaging.Consumers.CreateOrderConsumer] Phoenix reorder creation message is sent for enterprise number 754887949
[22:39:14.281 INF] [] [ECommerce.Messaging.Consumers.CreateOrderConsumer] Reorder was created for company 754887949
--> Success, it means that the message has been correctly sent
[22:39:14.291 ERR] [] [MassTransit.ReceiveTransport] R-FAULT "activemq://localhost:61616/create-order" ID:CEBESVC-BA5AP08-54759-637950758237778584-1:0:1:1:1 "00:00:00.6544301"
System.Threading.Channels.ChannelClosedException: The channel has been closed.
at MassTransit.Util.ChannelExecutor.Run[T](Func`1 method, CancellationToken cancellationToken)
at MassTransit.ActiveMqTransport.Transport.ActiveMqSendTransport.SendPipe`1.Send(SessionContext sessionContext)
at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken)
at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken)
at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken)
at GreenPipes.Internals.Extensions.TaskExtensions.<>c_DisplayClass4_0.<<OrCanceled>g_WaitAsync|0>d.MoveNext()
— End of stack trace from previous location where exception was thrown —
at MassTransit.Util.PendingTaskCollection.Completed(CancellationToken cancellationToken)
at MassTransit.Pipeline.Filters.DeserializeFilter.Send(ReceiveContext context, IPipe`1 next)
at GreenPipes.Filters.RescueFilter`2.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
--> The message has been sent BUT we get this error !
这很烦人,因为我们必须检查错误队列中的每条消息是否是真正的错误。
不幸的是,我无法在本地环境中重现此行为。
您遇到的异常已在 MassTransit 版本 8.1.0 中修复,其中 ActiveMQ 库已更新到官方版本。更新到此版本或更新版本应该有助于解决此问题。