我们有这些 POJO:
@Data
@NoArgsConstructure
@AllArgsConstructure
class MyPost {
private String content;
private SeenInfo seenInfo;
}
@Data
@NoArgsConstructure
@AllArgsConstructure
class SeenInfo {
private Integer seenCount;
//other fields...
}
以及我们应用程序中的左连接过程:
@Bean
public Function<KStream<String, MyPost>, Function<KStream<String, SeenInfo>, KStream<String, MyPost>>> joinProcess(Map<String, String> schemaConfig) {
return postStream ->
seenInfoStream -> {
SpecificAvroSerde<MyPost> postSerde = new SpecificAvroSerde<>();
SpecificAvroSerde<SeenInfo> seenInfoSerde = new SpecificAvroSerde<>();
postSerde.configure(schemaConfig, true);
seenInfoSerde.configure(schemaConfig, true);
return postStream.leftJoin(seenInfoStream,
(p, s) -> {
p.setSeenInfo(s);
return p;
},
JoinWindows.of(Duration.ofMinutes(5)),
StreamJoined.with(Serdes.String(),
postSerde,
seenInfoSerde));
};
}
当 MyPost 和 SeenInfo 匹配值在 5 分钟内出现时,加入过程会生成两条消息:
Message1: MyPost={ "content": "some text", "seenInfo": null}
Message2: MyPost={ "content": "some text", "seenInfo": { "seenCount": 1, ...}}
如果 MyPost 存在而 SeenInfo 不存在,则加入过程将不会返回任何数据。
我们期望:
Message: MyPost={ "content": "some text", "seenInfo": null}
我们应该怎么做才能解决这个问题?
似乎您正在使用旧版本的 Kafka Streams,它面临着关于“虚假左连接结果”的第一个问题。它在 Kafka 3.1 版本中已修复(参见 https://issues.apache.org/jira/browse/KAFKA-10847)。
对于第二个问题,是的,您应该得到一个输出 - 如果您使用版本 3.1,丢失输出的唯一解释是您的输入流“停止/暂停”并且没有新数据到达 - 如果没有连接窗口不会关闭的新数据左连接结果会“卡住”,直到时间推进(基于记录时间戳,因此需要新的输入数据)。