在 Akka 流中连接两个流

问题描述 投票:0回答:2

我正在尝试连接两个流程,但我无法解释我的实现的输出。

val source = Source(1 to 10)
val sink = Sink.foreach(println)

val flow1 = Flow[Int].map(s => s + 1)
val flow2 = Flow[Int].map(s => s * 10)

val flowGraph = Flow.fromGraph(
    GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val concat = builder.add(Concat[Int](2))
      val broadcast = builder.add(Broadcast[Int](2))

      broadcast ~> flow1 ~> concat.in(0)
      broadcast ~> flow2 ~> concat.in(1)

      FlowShape(broadcast.in, concat.out)
    }
  )

source.via(flowGraph).runWith(sink)

我期望这段代码的输出如下。

2
3
4
.
.
.
11
10
20
.
.
.
100

相反,我只看到打印了“2”。您能否解释一下我的实现中有什么问题以及我应该如何更改程序以获得所需的输出。

scala akka akka-stream
2个回答
5
投票

来自 Akka Stream 的 API 文档:

Concat

当前流有可用元素时发出;如果当前输入完成,它会尝试下一个

Broadcast

当所有输出停止反压且有可用输入元件时发出

这两个运算符不会协同工作,因为它们的工作方式存在冲突 -

Concat
尝试从
Broadcast
的一个输出中提取所有元素,然后再切换到另一个输出,而
Broadcast
获胜除非对其所有输出都有需求,否则不会排放。

对于您需要的内容,您可以按照评论者的建议使用

concat
连接:

source.via(flow1).concat(source.via(flow2)).runWith(sink)

或者等效地,使用

Source.combine
,如下所示:

Source.combine(source.via(flow1), source.via(flow2))(Concat[Int](_)).runWith(sink)

-1
投票

使用

GraphDSL

val sg = Source.fromGraph(
  GraphDSL.create(){ implicit builder =>
    import GraphDSL.Implicits._

    val concat = builder.add(Concat[Int](2))

    source ~> flow1 ~> concat
    source ~> flow2 ~> concat

    SourceShape(concat.out)
  }
)

sg.runWith(sink)

Source.combine在后台使用GraphDSL。

© www.soinside.com 2019 - 2024. All rights reserved.