遇到MassTransit异常:System.Threading.Channels.ChannelClosedException:通道已关闭

问题描述 投票:0回答:1

我们有一个面向 .NET 3.1 的 ASP.NET Core 应用程序。 我们使用 MassTransit 发送和使用消息,并使用 ActiveMQ 作为消息代理。

我们正在使用这些 MassTransit 套餐

<PackageReference Include="MassTransit" Version="7.2.2" />
<PackageReference Include="MassTransit.ActiveMQ" Version="7.2.2" />
<PackageReference Include="MassTransit.AspNetCore" Version="7.2.2" />
<PackageReference Include="MassTransit.Extensions.DependencyInjection" Version="7.2.2" />

我们有很多消费者,这样声明:

services.AddMassTransit(busConfigurator =>
{
    busConfigurator.AddConsumer<CreateLeadConsumer>();
    busConfigurator.AddConsumer<CreateOrderConsumer>();
    busConfigurator.AddConsumer<InvoiceCreatedConsumer>();
    busConfigurator.AddConsumer<PhoenixContractCreatedConsumer>();
    busConfigurator.AddConsumer<PhoenixDocumentCreatedConsumer>();

    ConfigureBus(busConfigurator, activeMqOptions);
});

我们定义了这些ActiveMqOptions

var activeMqOptions = Configuration.GetSection(SodexoActiveMQOptions.SectionName).Get<SodexoActiveMQOptions>();
EndpointConvention.Map<InvoiceCreatedMessage>(new Uri(QueueFormatter.FormatToAllowConcurrency(activeMqOptions.LocalQueues.InvoiceCreatedQueue)));
EndpointConvention.Map<CompanyOrderDto>(new Uri(QueueFormatter.FormatToAllowConcurrency(activeMqOptions.GlobalQueues.CreateOrUpdateClientQueue)));
EndpointConvention.Map<BeneficiaryWithProductDto>(new Uri(QueueFormatter.FormatToTopic(activeMqOptions.GlobalQueues.CreateOrUpdateBeneficiaryQueue)));
EndpointConvention.Map<CreatePhoenixOrderMessage>(new Uri(QueueFormatter.FormatToAllowConcurrency(activeMqOptions.GlobalQueues.CreateOrderQueue)));
EndpointConvention.Map<PhoenixOrderCreationMessage>(new Uri(QueueFormatter.FormatToAllowConcurrency(activeMqOptions.GlobalQueues.PhoenixCreateOrderQueue)));
EndpointConvention.Map<CancelOrderMessage>(new Uri(QueueFormatter.FormatToAllowConcurrency(activeMqOptions.GlobalQueues.CancelOrderQueue)));
EndpointConvention.Map<CancelOrderMessage>(new Uri(QueueFormatter.FormatToAllowConcurrency(activeMqOptions.GlobalQueues.CancelOrderQueue)));
EndpointConvention.Map<DocumentCreatedMessage>(new Uri(QueueFormatter.FormatToAllowConcurrency(activeMqOptions.LocalQueues.DocumentCreatedQueue)));

我们的消费者大部分时间都工作正常,有效负载消耗良好。

但有时会抛出异常:

[MassTransit.ReceiveTransport] R-FAULT "activemq://localhost:61616/{name of the queue}" System.Threading.Channels.ChannelClosedException: The channel has been closed.
  at MassTransit.Util.ChannelExecutor.Run[T](Func`1 method, CancellationToken cancellationToken) 
  at MassTransit.ActiveMqTransport.Transport.ActiveMqSendTransport.SendPipe`1.Send(SessionContext sessionContext) 
  at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken) 
  at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken) 
  at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken) 
  at GreenPipes.Internals.Extensions.TaskExtensions.<>c__DisplayClass4_0.<<OrCanceled>g__WaitAsync|0>d.MoveNext() --- End of stack trace from previous location where exception was thrown --- 
  at MassTransit.Util.PendingTaskCollection.Completed(CancellationToken cancellationToken) 
  at MassTransit.Pipeline.Filters.DeserializeFilter.Send(ReceiveContext context, IPipe`1 next) 
  at GreenPipes.Filters.RescueFilter`2.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)

由于我们收到此异常,消息将进入错误队列。

为了避免此异常,我在应用程序上添加了重试行为。当遇到异常时,消息将在队列中重新发送。在其他例外情况下它工作正常,但在这个例外情况下则不然。

一旦抛出

System.Threading.Channels.ChannelClosedException
,消息就会重试并被消耗。但在消息被消费后,消息仍然会进入错误队列。

这是更清晰的日志视图,您将看到有一个模式:

As you will see, the first time, the application try to publish the message to a queue, but a ChannelClosedException is thrown. 

The second time, the message is correctly published, because we can see final logs, and also because I checked with the targeted consumers.

 

[22:39:13.647 INF] [] [ECommerce.Messaging.Consumers.CreateOrderConsumer] Receiving order message

 --> First time we get the message

[22:39:13.924 INF] [] [ECommerce.Messaging.Consumers.CreateOrderConsumer] Sending phoenix reorder creation message for enterprise number 754887949

 

[22:39:13.927 WRN] [] [MassTransit.ReceiveTransport] R-RETRY "activemq://localhost:61616/create-order" "8ee80000-56b0-0050-5f50-08da74c70f82" MassTransit.Context.RetryConsumeContext<MassTransitMessaging.CreateOrderMessage>

System.Threading.Channels.ChannelClosedException: The channel has been closed.

   at MassTransit.Util.ChannelExecutor.Run[T](Func`1 method, CancellationToken cancellationToken)

   at MassTransit.ActiveMqTransport.Transport.ActiveMqSendTransport.SendPipe`1.Send(SessionContext sessionContext)

   at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken)

   at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken)

   at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken)

   at MassTransit.EndpointConventionExtensions.Send[T](ISendEndpointProvider provider, T message, CancellationToken cancellationToken)

   at ECommerce.Messaging.Consumers.MessageNotifier.NotifyAsync[T](T message, CancellationToken cancellationToken) in /home/vsts/work/1/s/ECommerce.Messaging/Consumers/MessageNotifier.cs:line 18

   at ECommerce.Messaging.Consumers.CreateOrderConsumer.Consume(ConsumeContext`1 context) in /home/vsts/work/1/s/ECommerce.Messaging/Consumers/CreateOrderConsumer.cs:line 96

   at MassTransit.Scoping.ScopeConsumerFactory`1.Send[TMessage](ConsumeContext`1 context, IPipe`1 next)

   at MassTransit.Scoping.ScopeConsumerFactory`1.Send[TMessage](ConsumeContext`1 context, IPipe`1 next)

   at MassTransit.Pipeline.Filters.ConsumerMessageFilter`2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context, IPipe`1 next)

   at MassTransit.Pipeline.Filters.ConsumerMessageFilter`2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context, IPipe`1 next)

   at GreenPipes.Filters.RetryFilter`1.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)

 

[22:39:14.031 INF] [] [ECommerce.Messaging.Consumers.CreateOrderConsumer] Receiving order message

 --> Second time we get the message

[22:39:14.255 INF] [] [ECommerce.Messaging.Consumers.CreateOrderConsumer] Sending phoenix reorder creation message Phoenix reorder creation message is sent for enterprise number 754887949

 

 

[22:39:14.281 INF] [] [ECommerce.Messaging.Consumers.CreateOrderConsumer] Phoenix reorder creation message is sent for enterprise number 754887949 

[22:39:14.281 INF] [] [ECommerce.Messaging.Consumers.CreateOrderConsumer] Reorder was created for company 754887949 

--> Success, it means that the message has been correctly sent
 

[22:39:14.291 ERR] [] [MassTransit.ReceiveTransport] R-FAULT "activemq://localhost:61616/create-order" ID:CEBESVC-BA5AP08-54759-637950758237778584-1:0:1:1:1 "00:00:00.6544301"

System.Threading.Channels.ChannelClosedException: The channel has been closed.

   at MassTransit.Util.ChannelExecutor.Run[T](Func`1 method, CancellationToken cancellationToken)

   at MassTransit.ActiveMqTransport.Transport.ActiveMqSendTransport.SendPipe`1.Send(SessionContext sessionContext)

   at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken)

   at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken)

   at GreenPipes.Agents.PipeContextSupervisor`1.GreenPipes.IPipeContextSource<TContext>.Send(IPipe`1 pipe, CancellationToken cancellationToken)

   at GreenPipes.Internals.Extensions.TaskExtensions.<>c_DisplayClass4_0.<<OrCanceled>g_WaitAsync|0>d.MoveNext()

— End of stack trace from previous location where exception was thrown —

   at MassTransit.Util.PendingTaskCollection.Completed(CancellationToken cancellationToken)

   at MassTransit.Pipeline.Filters.DeserializeFilter.Send(ReceiveContext context, IPipe`1 next)

   at GreenPipes.Filters.RescueFilter`2.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)

--> The message has been sent BUT we get this error !

这很烦人,因为我们必须检查错误队列中的每条消息是否是真正的错误。

不幸的是,我无法在本地环境中重现此行为。

c# asp.net .net activemq masstransit
1个回答
0
投票

您遇到的异常已在 MassTransit 版本 8.1.0 中修复,其中 ActiveMQ 库已更新到官方版本。更新到此版本或更新版本应该有助于解决此问题。

© www.soinside.com 2019 - 2024. All rights reserved.