在Flink中的两个不同流中使用相同的运算符

问题描述 投票:0回答:1

我想在两个不同的流中使用相同的运算符。但是,我收到一个错误,指出UID或该运算符不是唯一的。

lazy val opt: DataStream[Foo] => DataStream[Buzz] = src => src.flatMap(new MyFlatMapFunc).uid("opt")

lazy val pipe1 = : DataStream[Foo] => DataStream[Buzz] = src => opt(src). // Do keyBy and other logic

lazy val pipe2 = : DataStream[Foo] => DataStream[Buzz] = src => opt(src). // Do some other logic

我得到一个例外:

线程“ main”中的异常java.lang.IllegalArgumentException:用户指定的ID“ opt”上的哈希冲突。最可能的原因是非唯一ID。请检查通过uid(String)指定的所有ID是否唯一。

scala apache-flink flink-streaming
1个回答
0
投票

这是因为uid在运算符上,将在管道中使用两次。您有两个选择,您可以将两个流都union合并为一个,以便仅使用一次运算符,或者可以稍微更改逻辑,以便分配不同的ID:

lazy val opt: (DataStream[Foo], String) => DataStream[Buzz] = (src, id) => src.flatMap(new MyFlatMapFunc).uid(id)

lazy val pipe1 = : (DataStream[Foo], String) => DataStream[Buzz] = src => opt(src, "firstOpt"). // Do keyBy and other logic

lazy val pipe2 = : (DataStream[Foo], String) => DataStream[Buzz] = src => opt(src, "secondOpt"). // Do keyBy and other logic
© www.soinside.com 2019 - 2024. All rights reserved.