我对 Azure 服务总线和消息队列总体来说还比较陌生。在熟悉自己的过程中,我遇到了一些事情,我最初认为这是我做的一些笨拙的事情,但当我试图找出根本原因时,我变得越来越困惑。
在一个非常简单的应用程序中,我有一个端点,它将一些硬编码的详细信息传递到通用的“将此消息排队”服务类:
public class TestFunction(IEventBusService eventBusService)
{
[Function("TestFunction")]
public async Task<HttpResponseData> Run([HttpTrigger(AuthorizationLevel.Function, "get")] HttpRequestData req)
{
try
{
await eventBusService.SendAsync(new TestObject { Message = "Message for service bus", SentAtDateTimeOffset = DateTimeOffset.UtcNow }, Queues.Test);
var response = req.CreateResponse(HttpStatusCode.OK);
await response.WriteAsJsonAsync("Message successfully published to service bus", "application/json");
return response;
}
catch (ServiceBusOperationFailedException)
{
var response = req.CreateResponse(HttpStatusCode.InternalServerError);
await response.WriteAsJsonAsync("Unable to publish message to service bus", "application/json");
return response;
}
}
}
public class TestObject
{
public string Message { get; set; }
public DateTimeOffset SentAtDateTimeOffset { get; set; }
}
所讨论的方法如下所示:
public class EventBusService(ServiceBusClient serviceBusClient) : IEventBusService
{
public async Task SendAsync<T>(T dataToSend, string sendTo, CancellationToken cancellationToken = default)
{
ValidateParams(dataToSend, sendTo);
var serviceBusSender = serviceBusClient.CreateSender(sendTo);
var messageJson = new ServiceBusMessage(JsonSerializer.Serialize(dataToSend));
try
{
await serviceBusSender.SendMessageAsync(messageJson, cancellationToken);
}
catch (Exception ex)
{
throw new ServiceBusOperationFailedException($"Unable to publish message to '{sendTo}'.", ex);
}
finally
{
await serviceBusSender.DisposeAsync();
}
}
private void ValidateParams<T>(T dataToSend, string sendTo)
{
if (dataToSend is null)
throw new ArgumentNullException(nameof(dataToSend));
if (string.IsNullOrEmpty(sendTo))
throw new ArgumentNullException(nameof(sendTo));
}
}
一切看起来都很简单。
为了验证其行为是否符合预期,我进行了一个快速测试,该测试将调用端点...
[TestMethod]
public async Task Should_OnSuccessfulPublishToEventBus_ReceiveConfirmationMessage()
{
// Arrange
var testFunction = new TestFunction(EventBusHelper.CreateService());
var mockFunctionContext = EventBusHelper.GetMockFunctionContext();
//Act
var response = await testFunction.Run(new TestHttpRequestData(mockFunctionContext));
//Assert
Assert.IsNotNull(response);
Assert.AreEqual(HttpStatusCode.OK, response.StatusCode);
response.Body.Position = 0;
using var streamReader = new StreamReader(response.Body);
var responseBody = await streamReader.ReadToEndAsync();
Assert.AreEqual("\"Message successfully published to service bus\"", responseBody);
}
...并使用 Service Bus Explorer 检查结果。
但是,当我查看排队的消息时,我可以看到同一条消息已被添加到队列中两次。从那以后,每次我运行测试时都会发生这种情况。
根据我上面的代码,发生这种情况有什么原因吗?我已经调试并确保端点只被命中一次,所以我只能认为这是我不知道的消息队列的一些复杂性。任何帮助或建议表示赞赏!
谢谢, 马克
Azure 服务总线 - 来自发件人的消息被排队两次
可能是由于Host并发发送消息,或者您设置了重试策略等原因,导致发送了两次数据。 防止的一种方法是在创建队列时设置以下选项:
或下面是代码(使用DuplicateDetection),仅发送一次消息:
函数1.cs:
using System;
using System.Text.Json;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
namespace FunctionApp170
{
public class Function1
{
private readonly ServiceBusClient rith_sbc;
private readonly ILogger<Function1> r_log;
public Function1(ServiceBusClient sbc, ILogger<Function1> logger)
{
rith_sbc = sbc;
r_log = logger;
}
[FunctionName("Function1")]
public async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Function, "get", "post", Route = null)] HttpRequest req)
{
r_log.LogInformation("Hello, code is started executing");
var rith_obj = new { Message = "Secret", Time = DateTimeOffset.UtcNow };
var mid = Guid.NewGuid().ToString();
var r_sen = rith_sbc.CreateSender("rithqueue");
var rith_msg = new ServiceBusMessage(JsonSerializer.Serialize(rith_obj))
{
MessageId = mid
};
await r_sen.SendMessageAsync(rith_msg);
return new OkObjectResult("Hello Rithwik Bojja, Message is sent to Service Bus Successfully");
}
}
}
Startup.cs:
using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;
using Microsoft.Azure.Functions.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Threading.Tasks;
[assembly: FunctionsStartup(typeof(FunctionApp170.Startup))]
namespace FunctionApp170
{
public class Startup : FunctionsStartup
{
public override void Configure(IFunctionsHostBuilder builder)
{
string rith_cs = Environment.GetEnvironmentVariable("Rith_Service_Bus_Con");
var rith_ac = new ServiceBusAdministrationClient(rith_cs);
ConfigureQueueAsync(rith_ac).GetAwaiter().GetResult();
builder.Services.AddSingleton(new ServiceBusClient(rith_cs));
builder.Services.AddSingleton<Function1>();
}
private async Task ConfigureQueueAsync(ServiceBusAdministrationClient r_ac)
{
var r_q_name = "rithqueue";
if (!await r_ac.QueueExistsAsync(r_q_name))
{
var rith_op = new CreateQueueOptions(r_q_name)
{
RequiresDuplicateDetection = true,
DuplicateDetectionHistoryTimeWindow = TimeSpan.FromMinutes(10)
};
await r_ac.CreateQueueAsync(rith_op);
}
}
}
}
local.settings.json:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"Rith_Service_Bus_Con": "Endpoint=sb://rithwik.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=eu6XXSQ="
}
}
输出:
使用上面的代码不会发生两次排队。