如何在Topos上实现Saga?

问题描述 投票:0回答:1

Topos .NET 事件处理库,类似于 Rebus。与 Rebus 不同,与其说它是消息处理,不如说是事件处理。

Rebus 开箱即用地支持 Sagas,包括持久性、关联性和并发性。 如何在 Topos 上实现 Saga?

如果 Topos 支持 Sagas,是否有 Saga 实现的示例?

saga rebus rebus-kafka
1个回答
1
投票

不幸的是,Topos 没有任何类型的内置传奇。

Fleet ManagerRebus Pro附带的 Rebus 管理应用程序,也是我制作 Topos 的原因)中,我制作了一个类似 saga 的事件处理器,它使用 MongoDBLiteDB 进行持久化。

这个实现是完全专有的,因为它是商业软件产品的一部分,并且它不够通用,不适合重用。无论如何,我可以在这里告诉你一些关于它的信息,希望能给你一些关于如何自己构建类似的东西的灵感。 🙂

事件处理器托管在 Topos 使用者中,它将所有接收到的事件分派到一堆“投影”,从而实现源自“左折叠”的经典事件(current_state + event => new_state)。

Fleet Manager 有两种类型的投影:流程管理器(即通过发出命令导致其他事件发出的投影)和视图。这两种类型的结合就是你所谓的“传奇”🙂

一种可能的视图可以这样实现(为了简洁起见,删除了很多内容):

public class QueueInstanceView : ViewInstance<InstancePerQueue>, IExpire, IHaveAccountId, IHaveQueueName, ICanBeHidden
{
    public string AccountId { get; set; }
    public string QueueName { get; set; }

    public DateTime LastActivity { get; set; }
        
    public bool Hidden { get; set; }

    protected override void DispatchEvent(AuditEvent auditEvent)
    {
        if (auditEvent.Body is EntityHidden entityHidden)
        {
            QueueName ??= entityHidden.Id;
            Hidden = !entityHidden.Reverse;
        }
        else
        {
            QueueName ??= auditEvent.GetQueueName();
        }

        LastActivity = auditEvent.GetTime();
    }
}

注意视图类如何从通用

ViewInstance<>
类继承,并以
InstancePerQueue
类型关闭它。基类跟踪视图实例的 ID 以及用于实现幂等性的其他一些内容,然后
InstancePerQueue
定义事件如何映射到视图实例。

看起来像这样:

public class InstancePerQueue : ViewLocator
{
    public override string[] GetViewIds(AuditEvent auditEvent)
    {
        if (auditEvent.Body is EntityHidden entityHidden)
        {
            if (entityHidden.HasType(EntityTypeNames.Queue))
            {
                var accountId = auditEvent.GetAccountId();
                return new[] { $"{accountId}/{entityHidden.Id}" };
            }

            return Array.Empty<string>();
        }

        var queueName = auditEvent.GetQueueNameOrNull();
        if (queueName == null) return Array.Empty<string>();

        var accountId = auditEvent.GetAccountId();
        return new[] { $"{accountId}/{queueName}" };
    }
}

因此将事件与“/”形式的 ID 相关联(其中 Fleet Manager 术语中的“帐户”基本上只是一个环境,即队列名称用于识别帐户内的队列)。

当然,在投影实现中会实现很多逻辑,但虽然很长,但也相当简单。

我希望这能给您一些灵感,帮助您了解如何为 Topos 构建“传奇”。 🙂


顺便说一句。我不能把这个特殊的设计归功于我。我最初是在 2013 年至 2014 年由 Emil Krog Ingerslev 接触到与此非常相似的设计,他为我们在 d60 构建的事件源应用程序提出了该设计。

后来我模仿了所有移动部件来实现 Cirqus 的持久投影,我们将其用于几个事件源应用程序。

最后,我在 2016 年为 Fleet Manager 实现了当前的实现,当时我需要类似的东西,只是没有 Cirqus 中存在的聚合根内容,并且在 Kafka 而不是普通数据库上工作。

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.