Topos .NET 事件处理库,类似于 Rebus。与 Rebus 不同,与其说它是消息处理,不如说是事件处理。
Rebus 开箱即用地支持 Sagas,包括持久性、关联性和并发性。 如何在 Topos 上实现 Saga?
如果 Topos 支持 Sagas,是否有 Saga 实现的示例?
不幸的是,Topos 没有任何类型的内置传奇。
在 Fleet Manager(Rebus Pro附带的 Rebus 管理应用程序,也是我制作 Topos 的原因)中,我制作了一个类似 saga 的事件处理器,它使用 MongoDB 或 LiteDB 进行持久化。
这个实现是完全专有的,因为它是商业软件产品的一部分,并且它不够通用,不适合重用。无论如何,我可以在这里告诉你一些关于它的信息,希望能给你一些关于如何自己构建类似的东西的灵感。 🙂
事件处理器托管在 Topos 使用者中,它将所有接收到的事件分派到一堆“投影”,从而实现源自“左折叠”的经典事件(current_state + event => new_state)。
Fleet Manager 有两种类型的投影:流程管理器(即通过发出命令导致其他事件发出的投影)和视图。这两种类型的结合就是你所谓的“传奇”🙂
一种可能的视图可以这样实现(为了简洁起见,删除了很多内容):
public class QueueInstanceView : ViewInstance<InstancePerQueue>, IExpire, IHaveAccountId, IHaveQueueName, ICanBeHidden
{
public string AccountId { get; set; }
public string QueueName { get; set; }
public DateTime LastActivity { get; set; }
public bool Hidden { get; set; }
protected override void DispatchEvent(AuditEvent auditEvent)
{
if (auditEvent.Body is EntityHidden entityHidden)
{
QueueName ??= entityHidden.Id;
Hidden = !entityHidden.Reverse;
}
else
{
QueueName ??= auditEvent.GetQueueName();
}
LastActivity = auditEvent.GetTime();
}
}
注意视图类如何从通用
ViewInstance<>
类继承,并以 InstancePerQueue
类型关闭它。基类跟踪视图实例的 ID 以及用于实现幂等性的其他一些内容,然后 InstancePerQueue
定义事件如何映射到视图实例。
看起来像这样:
public class InstancePerQueue : ViewLocator
{
public override string[] GetViewIds(AuditEvent auditEvent)
{
if (auditEvent.Body is EntityHidden entityHidden)
{
if (entityHidden.HasType(EntityTypeNames.Queue))
{
var accountId = auditEvent.GetAccountId();
return new[] { $"{accountId}/{entityHidden.Id}" };
}
return Array.Empty<string>();
}
var queueName = auditEvent.GetQueueNameOrNull();
if (queueName == null) return Array.Empty<string>();
var accountId = auditEvent.GetAccountId();
return new[] { $"{accountId}/{queueName}" };
}
}
因此将事件与“/”形式的 ID 相关联(其中 Fleet Manager 术语中的“帐户”基本上只是一个环境,即队列名称用于识别帐户内的队列)。
当然,在投影实现中会实现很多逻辑,但虽然很长,但也相当简单。
我希望这能给您一些灵感,帮助您了解如何为 Topos 构建“传奇”。 🙂
顺便说一句。我不能把这个特殊的设计归功于我。我最初是在 2013 年至 2014 年由 Emil Krog Ingerslev 接触到与此非常相似的设计,他为我们在 d60 构建的事件源应用程序提出了该设计。
后来我模仿了所有移动部件来实现 Cirqus 的持久投影,我们将其用于几个事件源应用程序。
最后,我在 2016 年为 Fleet Manager 实现了当前的实现,当时我需要类似的东西,只是没有 Cirqus 中存在的聚合根内容,并且在 Kafka 而不是普通数据库上工作。