[MyActor
收到Start
消息时,它将运行Akka Stream
,并将收到的每个项目发布到Akka Event Stream
。
class MyActor (implicit system: ActorSystem, materialize: Materializer, ec: ExecutionContext) extends Actor {
override def receive: Receive = {
case Start =>
someSource
.toMat(Sink.foreach(item => system.eventStream.publish(item)))(Keep.left)
.run()
}
}
现在在另一个代码块中,我想从该事件流中的那些项目中构建一个Source
,因此可以在另一个Akka Stream
中处理每个发布的项目。
我该怎么做?
如果可能会添加更多选项,请注意,所讨论的另一代码块是Play framework
的Websocket处理程序。
这似乎是XY problem。如果发布者和订阅者最终脱钩,那么如果发布者生成数据的速度比订阅者快,该怎么办?
话虽如此,这是一种您所要求的方式:
/** Produce a source by subscribing to the Akka actorsystem event bus for a
* specific event type.
*
* @param bufferSize max number of events to buffer up in the source
* @param overflowStrategy what to do if the event buffer fills up
*/
def itemSource[Item : ClassTag](
bufferSize: Int = 1000,
overflowStrategy: OverflowStrategy = OverflowStrategy.dropNew
)(
implicit system: ActorSystem
): Source[Item, NotUsed] = Source
.lazily { () =>
val (actorRef, itemSource) = Source
.actorRef[Item](
completionMatcher = PartialFunction.empty,
failureMatcher = PartialFunction.empty,
bufferSize,
overflowStrategy
)
.preMaterialize()
system.eventStream.subscribe(actorRef, classTag[Item].runtimeClass)
itemSource
}
.mapMaterializedValue(_ => NotUsed)
我终于让它与BroadcastHub一起使用,不再有Akka事件流。
我的发布者(本身正在使用来源)看起来像这样:
val publisherSource = someSource
.toMat(BroadcastHub.sink(bufferSize = 256))(Keep.right)
.run()
然后在另一个代码块中,我只需要引用PublisherSource:
val subscriberSource = publisherSource
.map(...) // What ever
您可以根据需要拥有任意数量的SubscriberSource,他们都将收到相同的项目。