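I am using MassTransit 8.3.4, .NET 7, and the Azure Service Bus "Standard" pricing tier.
I noticed that if:
then I get the following result:
8 messages are processed in parallel.
< 1 min hold - no saga instance processes anything >
8 messages are processed in parallel.
< 1 min hold - no saga instance processes anything >
...
Question: why is there a 1-minute delay? What am I missing? Why isn't another message pulled from the queue as soon as a saga instance completes?
Source code: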
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddControllers();

var queueAName = "commandA";

builder.Services.AddMassTransit(x =>
{
    x.AddSagaStateMachine<SimpleSaga, SimpleSagaState>().MessageSessionRepository();

    x.UsingAzureServiceBus((context, cfg) =>
    {
        cfg.ReceiveEndpoint(queueAName, c =>
        {
            c.RequiresSession = true;
            c.ConfigureSaga<SimpleSagaState>(context);
        });

        cfg.Host("");
    });
});

var app = builder.Build();
app.UseHttpsRedirection();
app.UseAuthorization();
app.MapControllers();
app.Run();

public class SimpleSaga : MassTransitStateMachine<SimpleSagaState>
{
    public SimpleSaga()
    {
        InstanceState(x => x.CurrentState);

        Event(() => InitialCommandReceived, x => x.CorrelateById(context => context.CorrelationId.Value));

        Initially(
            When(InitialCommandReceived)
                .Then(c => Console.WriteLine($"[{DateTime.UtcNow.TimeOfDay}]: InitialCommandReceived: received with CorrelationId: {c.CorrelationId}."))
                .Finalize());

        SetCompletedWhenFinalized();
    }

    public Event<CommandA> InitialCommandReceived { get; set; }
}

public class SimpleSagaState : SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }
}
Output:
[19:19:02.4539790]: InitialCommandReceived: received with CorrelationId: 538f04fd-62b1-4885-8be2-4f1e1a143539.
[19:19:02.4540137]: InitialCommandReceived: received with CorrelationId: 87077e87-447d-422e-b078-b73236c9d23e.
[19:19:02.4540267]: InitialCommandReceived: received with CorrelationId: 3d9297d8-5117-4bf3-8bb6-769b045ba68e.
[19:19:02.4539898]: InitialCommandReceived: received with CorrelationId: 8a02518f-c6f9-4fcb-a605-dede410e8610.
[19:19:02.4545988]: InitialCommandReceived: received with CorrelationId: 7773cd68-a58d-420f-bb6a-23fe67c5fcf1.
[19:19:02.5554364]: InitialCommandReceived: received with CorrelationId: aa12eb8a-fa9f-4b52-ae69-917ba04bdec3.
[19:19:02.6620557]: InitialCommandReceived: received with CorrelationId: b8ca11e2-cdcb-4388-8263-6e8df22f7cfb.
[19:19:02.7596565]: InitialCommandReceived: received with CorrelationId: 722e6aa7-4fb6-43c1-87a2-b09294e7f3c1.
-------------- 1 min ------------------------
[19:20:03.5855011]: InitialCommandReceived: received with CorrelationId: f1cdd00a-dd83-49bc-bf0d-33ab3052673c.
[19:20:03.5854979]: InitialCommandReceived: received with CorrelationId: da7c2aa7-c676-447d-8f5f-42d1e0546b7b.
[19:20:03.5856126]: InitialCommandReceived: received with CorrelationId: 264c9e84-b302-4511-a00a-6e6241e091e8.
[19:20:03.5860874]: InitialCommandReceived: received with CorrelationId: fc8f1f03-4103-4501-b406-88cc1c59f528.
[19:20:03.5893627]: InitialCommandReceived: received with CorrelationId: 779f42ae-8986-4fdb-b3c3-28113652717c.
[19:20:03.6894042]: InitialCommandReceived: received with CorrelationId: 7672425b-e0fc-4584-8a96-ff2d17689df8.
[19:20:03.8924069]: InitialCommandReceived: received with CorrelationId: 1182d62a-5657-4fb4-9877-efbc31155f01.
[19:20:03.9992448]: InitialCommandReceived: received with CorrelationId: 8fc8ecfe-5c14-4279-af71-95e48a1afa64.
-------------- 1 min ------------------------
[19:21:04.8731504]: InitialCommandReceived: received with CorrelationId: 3cf94c6a-2e5b-4740-9407-e12ae1ed9555.
[19:21:04.8732114]: InitialCommandReceived: received with CorrelationId: 478921d9-9a1a-49f1-a99d-2e5613625b74.
[19:21:05.2777634]: InitialCommandReceived: received with CorrelationId: 497d815f-466f-4baf-89cb-618e0bf2e9d3.
[19:21:05.2781711]: InitialCommandReceived: received with CorrelationId: d142326f-6b2a-4792-8cf8-0f0914aeea36.
[19:21:05.2782202]: InitialCommandReceived: received with CorrelationId: 718bce5a-c8e1-4163-a0df-ba795d8fc3d3.
[19:21:05.4816332]: InitialCommandReceived: received with CorrelationId: c9bfc039-7fdc-4cdf-b496-0e01d781b392.
[19:21:05.4816541]: InitialCommandReceived: received with CorrelationId: 4aa1a967-c9a3-4233-9d63-796b07af3aab.
[19:21:05.5837755]: InitialCommandReceived: received with CorrelationId: 7dfc6640-856a-4ef8-b5cc-c3469a6bb5bc.
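You can configure the session settings to suit your requirements.
If you want to reduce the idle time before a session is released, consider lowering SessionIdleTimeout: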
cfg.ReceiveEndpoint(queueAName, c =>
{
    c.RequiresSession = true;
    c.SessionIdleTimeout = TimeSpan.FromSeconds(3);
    c.ConfigureSaga<SimpleSagaState>(context);
});
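To get a feel for how much the timeout affects throughput, here is a rough back-of-the-envelope sketch. It is only a model, not MassTransit code. It assumes that every message carries its own session ID (one saga instance per CorrelationId), that 8 sessions are held at a time (the batch size seen in the output above), that handling time is negligible, and that a session slot is only freed once the idle timeout elapses. EstimateDrainTime is a hypothetical helper made up for this illustration.

```csharp
using System;

// 24 single-message sessions, 8 at a time, with a 1-minute idle timeout
// (the pause observed in the question) versus a 3-second one.
Console.WriteLine(EstimateDrainTime(24, 8, TimeSpan.FromMinutes(1))); // 00:02:00
Console.WriteLine(EstimateDrainTime(24, 8, TimeSpan.FromSeconds(3))); // 00:00:06

// Hypothetical helper: estimates how long it takes until the last message
// is picked up, under the assumptions listed above.
static TimeSpan EstimateDrainTime(int messageCount, int concurrentSessions, TimeSpan sessionIdleTimeout)
{
    // Number of batches needed when each session holds exactly one message.
    int batches = (messageCount + concurrentSessions - 1) / concurrentSessions;

    // Every batch except the first has to wait one idle timeout
    // for the previous batch's sessions to be released.
    int waits = Math.Max(batches - 1, 0);

    return TimeSpan.FromTicks(sessionIdleTimeout.Ticks * waits);
}
```

Under this model the 24 messages in the question take about two minutes, which matches the timestamps in the output (19:19:02 to 19:21:05). With a 3-second timeout the same model gives roughly six seconds.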