我想查看来自多个 Azure 服务总线队列的所有消息。之后,我想在queueName、insertDate之后过滤它们,并提供对正文进行全文搜索的机会。
目前,我正在使用
Microsoft.Azure.ServiceBus
包创建一个 ManagementClient
来收集队列信息,然后使用 MessageReceiver
来查看消息。
var managementClient = new ManagementClient(connectionString);
var queue = await managementClient.GetQueueRuntimeInfoAsync(queueName);
var count = queue.MessageCount;
var receiver = new MessageReceiver(connectionString, queueName);
var messagesOfQueue = new List<Message>();
for (var i = 1; i <= count; i++)
{
messagesOfQueue.Add(await receiver.PeekAsync());
}
有没有更好的方法来获取所有消息?或者有没有办法只查看适用于过滤器的消息?
我还尝试使用
QueueClient.PeekBatch
包中的 WindowsAzure.ServiceBus
方法。但该方法没有返回所有消息,尽管我设置了正确的 messageCount
参数。
还有包裹
Azure.Messaging.ServiceBus
…这么多包裹是怎么回事?
那么我应该使用哪个包以及基于某些过滤器查看队列消息的最佳方法是什么?
我当前正在使用且按预期工作的解决方案如下所示:
var receiver = serviceBusClient.CreateReceiver(queueName);
var messagesOfQueue = new List<ServiceBusReceivedMessage>();
var previousSequenceNumber = -1L;
var sequenceNumber = 0L;
do
{
var messageBatch = await receiver.PeekMessagesAsync(int.MaxValue, sequenceNumber);
if (messageBatch.Count > 0)
{
sequenceNumber = messageBatch[^1].SequenceNumber;
if (sequenceNumber == previousSequenceNumber)
break;
messagesOfQueue.AddRange(messageBatch);
previousSequenceNumber = sequenceNumber;
}
else
{
break;
}
} while (true);
它使用 nuget 包
Azure.Messaging.ServiceBus
。
PeekBatchAsync(Int64, Int32)
的 MessageReceiver
方法批量接收消息。
这是执行此操作的示例代码(未经测试):
var messagesOfQueue = new List<Message>();
var sequenceNumber = 0;
var batchSize = 100;//number of messages to receive in a single call
do
{
var messages = await receiver.PeekBatchAsync(sequenceNumber, batchSize);
messagesOfQueue.AddRange(messages);
if (messages.Count > 0)
{
sequenceNumber = messages[messages.Count-1].SequenceNumber;
}
else
{
break;
}
} while (true);
解决方案避免两次获取具有相同SequenceNumber的消息。
序列号单调递增。我已经测试了大多数情况,除了当达到最大值(Long.MaxValue)时将sequenceNumber翻转到0。
using Azure.Messaging.ServiceBus;
private static async Task<List<ServiceBusReceivedMessage>> PeekAllMessages(string serviceBusConnectionString, string queueName)
{
var client = new ServiceBusClient(serviceBusConnectionString);
var receiver = client.CreateReceiver(queueName);
var messages = new List<ServiceBusReceivedMessage>();
var batchSize = 20;
var sequenceNumber = 0L;
do
{
var messageBatch = await receiver.PeekMessagesAsync(batchSize, sequenceNumber);
if (messageBatch.Count <= 0)
{
break;
}
// Increasing the SequenceNumber by 1 to avoid getting the message with the same SequenceNumber twice
sequenceNumber = messageBatch[^1].SequenceNumber + 1;
messages.AddRange(messageBatch);
} while (true);
return messages;
}
public long currentSequenceNumber; // default value for long is 0L
do{
currentSequence++;
message = reader.peekMessage(currentSequence);
currentSequence = message.getSequenceNumber();
}while(message!=null)
只要不断增加序列号即可。就是这样。