我计划创建一个监听服务总线订阅的 Window 服务。我没有使用顶层架子。我使用本机方式创建窗口服务。我的目标框架是.Net Framework 4.6.2。 在 OnStart 中,我设置侦听器来侦听服务总线,但每次订阅上有新消息时,MessageHandler 方法都不会被调用。
protected override async void OnStart(string[] args)
{
_logger.Information("App is running");
client = new ServiceBusClient(connectionString, new ServiceBusClientOptions()
{
TransportType = ServiceBusTransportType.AmqpWebSockets
});
// create a processor that we can use to process the messages
processor = client.CreateProcessor(topicName, subscriptionName, new ServiceBusProcessorOptions());
try
{
// add handler to process messages
processor.ProcessMessageAsync += MessageHandler;
// add handler to process any errors
processor.ProcessErrorAsync += ErrorHandler;
// start processing
await processor.StartProcessingAsync();
_logger.Information("Wait for a minute and then press any key to end the processing");
}
catch (Exception ex)
{
_logger.Error(ex, "Error running");
}
finally
{
// Calling DisposeAsync on client types is required to ensure that network
// resources and other unmanaged objects are properly cleaned up.
await processor.DisposeAsync();
await client.DisposeAsync();
}
}
static async Task MessageHandler(ProcessMessageEventArgs args)
{
try
{
var sqliteUploadFileDto = JsonConvert.DeserializeObject<TestDto>(args.Message.Body.ToString());
_logger.Information($"Received new message - {args.Message.Body.ToString()}");
await args.CompleteMessageAsync(args.Message);
}
catch (Exception ex)
{
_logger.Error(ex, "Message handler error");
throw ex;
}
}
static Task ErrorHandler(ProcessErrorEventArgs args)
{
_logger.Error(args.Exception, "Message handler error");
return Task.CompletedTask;
}
protected override async void OnStop()
{
// stop processing
_logger.Information("\nStopping the receiver...");
await processor.StopProcessingAsync();
_logger.Information("Stopped receiving messages");
}
行
await processor.StartProcessingAsync();
开始处理,然后立即返回。所以代码然后执行await processor.DisposeAsync();
。您需要推迟对 Dispose
的呼叫,直到呼叫 OnStop
。在当前情况下,进程没有足够的时间来处理任何传入消息。