我正在尝试在 .NET 中实现一个后台服务,用于将消息发送到 Azure 服务总线中的队列。我的代码如下所示:
/// <summary>
/// Hosted background service that buffers audit messages in an in-memory queue and,
/// once per configured interval, forwards them to Azure Service Bus through the
/// injected <see cref="ServiceBusSender"/>.
/// </summary>
/// <remarks>
/// NOTE(review): <see cref="AddMessage"/> and the send loop share state only through this
/// instance's queue, so producers must call <see cref="AddMessage"/> on the same instance
/// the host is running — confirm the DI registration does not create two instances.
/// </remarks>
public class AuditBackgroundService : BackgroundService, IAuditBackgroundService
{
    private readonly TimeSpan _sendInterval;
    private readonly ServiceBusSender _sender;
    private readonly ConcurrentQueue<string> _messageQueue = new ConcurrentQueue<string>();

    /// <summary>
    /// Creates the service.
    /// </summary>
    /// <param name="sender">Sender used to publish each buffered message.</param>
    /// <param name="config">Options supplying <c>SendIntervalMilliseconds</c>, the pause between flushes.</param>
    /// <exception cref="ArgumentNullException"><paramref name="sender"/> is null.</exception>
    public AuditBackgroundService(ServiceBusSender sender, IOptions<AuditConfiguration> config)
    {
        _sender = sender ?? throw new ArgumentNullException(nameof(sender));
        _sendInterval = TimeSpan.FromMilliseconds(config.Value.SendIntervalMilliseconds);
    }

    /// <summary>
    /// Buffers a message for the next flush. Safe to call while the send loop is running
    /// because the buffer is a <see cref="ConcurrentQueue{T}"/>.
    /// </summary>
    /// <param name="message">Body of the message to send.</param>
    public void AddMessage(string message)
    {
        _messageQueue.Enqueue(message);
    }

    /// <summary>
    /// Send loop: waits one interval, flushes the buffer, and repeats until the host
    /// signals <paramref name="stoppingToken"/>.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service should stop.</param>
    protected async override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            await Task.Delay(_sendInterval, stoppingToken);
            await ProcessMessagesAsync(stoppingToken);
        }
    }

    /// <summary>
    /// Sends buffered messages one by one, in FIFO order, until the buffer is empty or a
    /// send fails.
    /// </summary>
    /// <param name="cancellationToken">Forwarded to each send; cancellation propagates to the caller.</param>
    private async Task ProcessMessagesAsync(CancellationToken cancellationToken)
    {
        // Peek first and dequeue only after the send succeeded, so a failed send never
        // drops a message. This method is the queue's only consumer (it is called solely
        // from ExecuteAsync), so the item removed below is the one that was just sent.
        while (_messageQueue.TryPeek(out string message))
        {
            try
            {
                await _sender.SendMessageAsync(new ServiceBusMessage(message), cancellationToken);
            }
            catch (Exception ex) when (!cancellationToken.IsCancellationRequested)
            {
                // A send failure must not end ExecuteAsync: that would stop the loop for
                // good while AddMessage keeps filling the queue. Report it and leave the
                // message at the head of the queue to be retried on the next interval.
                System.Diagnostics.Trace.TraceError($"Failed to send audit message; will retry on the next interval. {ex}");
                return;
            }

            _messageQueue.TryDequeue(out _);
        }
    }
}
据我所知,这段代码运行正常:在 Azure 门户的概述页面上,我能看到若干成功的请求,但实际上没有任何消息到达队列。
我参照了 Microsoft 的快速入门指南来为 Azure 服务总线设置命名空间和队列,但尽管按照说明进行操作,仍然无法使其正常工作。知道我可能哪里做错了吗?提前感谢您的帮助。
下面的示例代码演示了如何在发送过程中添加日志记录。感谢@Sean Feldman 对日志的评论。
{
    // Build configuration from appsettings.json (the file is optional and reloaded on change).
    // NOTE(review): nothing below reads from this object yet — the connection string and
    // queue name are hard-coded placeholders. Replace them with real values (for example
    // values looked up from this configuration) before running.
    var configuration = new ConfigurationBuilder()
        .AddJsonFile("appsettings.json", optional: true, reloadOnChange: true)
        .Build();

    // Placeholder Service Bus connection string and queue name.
    string connectionString = " AzureServiceBusConnectionString";
    string queueName = "AzureServiceBusqueueName";

    // Create the client (disposed asynchronously when this scope ends) and a sender for the queue.
    await using var client = new ServiceBusClient(connectionString);
    ServiceBusSender sender = client.CreateSender(queueName);

    // Token source that stops the sending automatically after 30 seconds.
    using var cts = new CancellationTokenSource();
    cts.CancelAfter(TimeSpan.FromSeconds(30));

    // Start the send loop WITHOUT awaiting it here. Awaiting at this point would block
    // until the token is cancelled, so the key-press prompt below would only appear after
    // sending had already stopped.
    var auditBackgroundService = new AuditBackgroundService(sender, cts.Token);
    var sendingTask = auditBackgroundService.StartSendingAsync();

    Console.WriteLine("Press any key to stop the application...");
    Console.ReadKey();

    // Stop the sending process when a key is pressed, then wait for the loop to wind down
    // before the client is disposed.
    cts.Cancel();
    await sendingTask;
}
}
/// <summary>
/// Sample sender that publishes a fixed test message to Azure Service Bus about once per
/// second, logging each attempt to the console, until its cancellation token is signalled.
/// </summary>
public class AuditBackgroundService
{
    private readonly ServiceBusSender _sender;
    private readonly CancellationToken _cancellationToken;

    /// <summary>
    /// Creates the sender loop.
    /// </summary>
    /// <param name="sender">Sender used to publish each message.</param>
    /// <param name="cancellationToken">Token that ends <see cref="StartSendingAsync"/> when signalled.</param>
    public AuditBackgroundService(ServiceBusSender sender, CancellationToken cancellationToken)
    {
        _sender = sender;
        _cancellationToken = cancellationToken;
    }

    /// <summary>
    /// Sends the test message repeatedly, pausing one second between attempts, until the
    /// token passed to the constructor is cancelled. Send failures are written to the
    /// console and the loop continues; cancellation ends the loop without throwing.
    /// </summary>
    /// <returns>A task that completes once cancellation has been observed.</returns>
    public async Task StartSendingAsync()
    {
        while (!_cancellationToken.IsCancellationRequested)
        {
            string message = "Hello, Azure Service Bus!";

            // NOTE(review): every message carries the same fixed SessionId; whether the
            // target queue needs or accepts it depends on its session setting — confirm.
            var busMessage = new ServiceBusMessage(message)
            {
                SessionId = "1"
            };

            try
            {
                Console.WriteLine($"Sending message: {message}");
                await _sender.SendMessageAsync(busMessage, _cancellationToken);
                Console.WriteLine($"Message sent successfully: {message}");
            }
            catch (OperationCanceledException) when (_cancellationToken.IsCancellationRequested)
            {
                // Cancellation is the normal way to stop, not a send error.
                break;
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Error sending message: {ex.Message}");
            }

            try
            {
                // Pause before the next message; observing the token lets a stop request
                // take effect immediately instead of after the full delay.
                await Task.Delay(TimeSpan.FromSeconds(1), _cancellationToken);
            }
            catch (OperationCanceledException)
            {
                break;
            }
        }
    }
}