假设应用程序中
office
有两个EventSourcedBehavior
演员
trait OfficeFridgeCommand
case object OpenFridge extends OfficeFridgeCommand
case object CloseFridge extends OfficeFridgeCommand
trait OfficeFridgeEvent
case object FridgeOpened extends OfficeFridgeEvent
case object FridgeClosed extends OfficeFridgeEvent
trait OfficeCoffeeMachineCommand
case object MakeCoffee extends OfficeCoffeeMachineCommand
trait OfficeCoffeeMachineEvent
case object CoffeeMade extends OfficeCoffeeMachineEvent
val fridgeEntity =
EventSourcedBehavior[OfficeFridgeCommand, OfficeFridgeEvent, OfficeFridge]()
val frontDoorEntity =
EventSourcedBehavior[OfficeFrontDoorCommand,, OfficeFrontDoorEvent, OfficeFridge]()
val coffeeMachineEntity =
EventSourcedBehavior[OfficeCoffeeMachineCommand, OfficeCoffeeMachineEvent, OfficeFridge]()
假设冰箱发生了一些操作,并且冰箱上注册了 1000 个具有不同持久性 ID [0-1000] 的事件。
使得期刊喜欢:
订购 | persistence_id | event_ser_manifest |
---|---|---|
1 | 200 | 冰箱打开 |
2 | 200 | 冰箱关闭 |
... | ... | ... |
500 | 500 | 冰箱打开 |
... | ... | ... |
1000 | 501 | 冰箱关闭 |
如果有一条持久性 ID 为 500 的
GetCoffeeMachineState
消息传入 frontDoorEntity
actor。 frontDoorEntity
将尝试重播 persistence_id = 500
日志事件。
它会失败,因为它无法将 OfficeFridgeEvent
投射到 OfficeCoffeeMachineEvent
actor(akka 输入还记得吗?)。
这是此类系统的常见设置吗?或者每个实体是否都需要自己的数据库,该数据库具有仅包含参与者接受的“有效”类型事件的事件日志?
我现在在我的系统中看到了这个确切的问题。如果有人(意外地)运行 1000 个这样的查询,我将有 1000 个实体参与者尝试永远重播这些事件,或者直到我重新启动 Pod。
我最终得到的是无限次尝试使用以下堆栈跟踪重新启动实体参与者
at akka.persistence.typed.internal.ReplayingEvents.onJournalResponse(ReplayingEvents.scala:200)
at akka.persistence.typed.internal.ReplayingEvents.onMessage(ReplayingEvents.scala:98)
at akka.persistence.typed.internal.ReplayingEvents.onMessage(ReplayingEvents.scala:73)
Caused by: java.lang.ClassCastException
这是有道理的,因为类型化的参与者正在尝试处理不同类型的事件。
您说了多个实体(在您的示例中为
fridge
、front door
和coffe machine
)。每个实体回复不同的命令并坚持不同的事件。
当您使用
EventSourcedBehavior.apply() 创建
EventSourcedBehavior
时
/**
* Create a `Behavior` for a persistent actor.
*
* @param persistenceId stable unique identifier for the event sourced behavior
* @param emptyState the intial state for the entity before any events have been processed
* @param commandHandler map commands to effects e.g. persisting events, replying to commands
* @param eventHandler compute the new state given the current state when an event has been persisted
*/
def apply[Command, Event, State](
persistenceId: PersistenceId,
emptyState: State,
commandHandler: (State, Command) => Effect[Event, State],
eventHandler: (State, Event) => State): EventSourcedBehavior[Command, Event, State] = {
val loggerClass = LoggerClass.detectLoggerClassFromStack(classOf[EventSourcedBehavior[_, _, _]], logPrefixSkipList)
EventSourcedBehaviorImpl(persistenceId, emptyState, commandHandler, eventHandler, loggerClass)
}
第一个参数是PersistenceId。您负责使该 ID 独一无二。该对象提供了一个 factory 方法,要求您提供提示和实体 ID
/**
* Constructs a [[PersistenceId]] from the given `entityTypeHint` and `entityId` by
* concatenating them with `|` separator.
*
* Cluster Sharding is often used together with `EventSourcedBehavior` for the entities.
* The `PersistenceId` of the `EventSourcedBehavior` can typically be constructed with:
* {{{
* PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId)
* }}}
*
* That format of the `PersistenceId` is not mandatory and only provided as a convenience of
* a "standardized" format.
*
* Another separator can be defined by using the `apply` that takes a `separator` parameter.
*
* The `|` separator is also used in Lagom's `scaladsl.PersistentEntity` but no separator is used
* in Lagom's `javadsl.PersistentEntity`. For compatibility with Lagom's `javadsl.PersistentEntity`
* you should use `""` as the separator.
*
* @throws IllegalArgumentException if the `entityTypeHint` or `entityId` contains `|`
*/
def apply(entityTypeHint: String, entityId: String): PersistenceId
中详细介绍的那样
持久化 ID
PersistenceId是后端事件日志和快照存储中持久参与者的稳定唯一标识符。
集群分片通常与
一起使用,以确保每个EventSourcedBehavior
(PersistenceId
)只有一个活动实体。有一些技术可以确保这种唯一性,可以在集群分片文档中的持久性示例中找到示例。这说明了如何从entityId
EntityContext提供的和PersistenceId
构造entityTypeKey
entityId
。Cluster Sharding中的
是实体的业务域标识。entityId
可能不够独特,无法单独用作entityId
。例如,两种不同类型的实体可能具有相同的PersistenceId
。要创建唯一的entityId
,PersistenceId
应以实体类型的稳定名称作为前缀,该名称通常与集群分片中使用的entityId
相同。有EntityTypeKey.name
工厂方法可以帮助从PersistenceId.apply
和PersistenceId
构造这样的entityTypeHint
。entityId
您可以以 akka 持久化购物车示例 - ShoppingCart Behaviour 作为一个很好的例子
def apply(cartId: String): Behavior[Command] = {
EventSourcedBehavior[Command, Event, State](
PersistenceId("ShoppingCart", cartId),
State.empty,
(state, command) => ...,
(state, event) => ...
)
}
你的代码应该是这样的
val fridgeEntity =
EventSourcedBehavior[OfficeFridgeCommand, OfficeFridgeEvent, OfficeFridge](
PersistenceId("Fridge"), UUID.randomUUID(),
Fridge.emptyState,
Fridge.commandHandler,
Fridge.eventHandler
)
val frontDoorEntity =
EventSourcedBehavior[OfficeFrontDoorCommand,, OfficeFrontDoorEvent, OfficeFridge](
PersistenceId("FrontDoor"), UUID.randomUUID(),
FrontDoor.emptyState,
FrontDoor.commandHandler,
FrontDoor.eventHandler
)
val coffeeMachineEntity =
EventSourcedBehavior[OfficeCoffeeMachineCommand, OfficeCoffeeMachineEvent, OfficeFridge](
PersistenceId("CoffeeMachine"), UUID.randomUUID(),
CoffeeMachine.emptyState,
CoffeeMachine.commandHandler,
CoffeeMachine.eventHandler
)
将实体的事件持久保存在日志中后,您将能够在数据库中看到类似以下内容
订购 | persistence_id | event_ser_manifest |
---|---|---|
1 | 冰箱| |
冰箱打开 |
2 | 冰箱| |
冰箱关闭 |
... | ... | ... |
1 | 咖啡机| |
咖啡制作 |
2 | 咖啡机| |
咖啡制作 |
... | ... | ... |
1 | 前门| |
前门打开 |
2 | 前门| |
前门关闭 |
一如既往,这取决于情况。如果您需要每秒保留数百万个事件,并且这些事件来自数百万个不同的设备,那么答案是否定的。 Akka 持久化提供不同的持久化插件
每个都有自己的优点和缺点。
您可以在 scala index - akka persistence 找到更多插件,但它们可能已经过时,没有商业支持等。