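I am using .outerJoin to join two streams together. The expected behavior is that the output contains a record for every record in either input; in practice I only get output when both inputs contain a record with the same key, which directly contradicts the documentation.
The code is as follows: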
KStream<String, String> originalStream = builder.stream("original-topic");
KStream<String, String> augmentedStream = builder.stream("augmented-topic");
KStream<String, String> mergedStream = originalStream.outerJoin(augmentedStream, (value1, value2) ->
    {
        if (value2 == null) // no augmented message
        {
            System.out.println("No Augmented message for original message " + value1);
            return value1;
        }
        else if (value1 == null) // no raw message
        {
            System.out.println("No original message for Augmented message " + value2);
        }
        else
        {
            System.out.println("pair found: " + value1 + "," + value2);
        }
        return value2;
    }
    // Add a time-based join window
    , JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(1)));
Test data (note that 13 is missing from the augmented data and 19 is missing from the original data):
// populate some test data
try(Producer<String, String> producer = new KafkaProducer<>(config, new StringSerializer(), new StringSerializer()))
{
Arrays.asList("11,a","12,c","13,e","14,g","15,i","16,k","17,m").forEach(s->producer.send(new ProducerRecord<>("original-topic",s.split(",")[0],s.split(",")[1])));
Arrays.asList("11,ta","12,tc","12,td","14,tg","15,ti","16,tk","17,tm","19,to").forEach(s->producer.send(new ProducerRecord<>("augmented-topic",s.split(",")[0],s.split(",")[1])));
producer.flush();
}
Output:
pair found: a,ta
pair found: c,tc
pair found: c,td
pair found: g,tg
pair found: i,ti
pair found: k,tk
pair found: m,tm
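As you can see, I expected printouts for 13 (e) and 19 (to), indicating that they are missing from the augmented and original streams respectively; but there is nothing. A breakpoint confirms that the joiner is never called for 13 and 19, even though an outer join should call it with null as the other value. I understand that it waits until the join window has passed before doing so; but since I set the window to 1 second, that should not take long.
What am I missing here?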
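Update - if I add a delay between writes (and move the unpaired items to the front of each list), this works (although of course it finds no pairs, only unpaired items, because the 1100 ms delay is longer than the window period):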
Arrays.asList("13,e","11,a","12,c","14,g","15,i","16,k","17,m").forEach(s->
{
producer.send(new ProducerRecord<>("original-topic",s.split(",")[0],s.split(",")[1]));
try {
Thread.sleep(900);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
});
Arrays.asList("19,to","11,ta","12,tc","12,td","14,tg","15,ti","16,tk","17,tm","17,tm").forEach(s->
{
producer.send(new ProducerRecord<>("augmented-topic",s.split(",")[0],s.split(",")[1]));
try {
Thread.sleep(1100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
});
Output:
No Augmented message for original message e
No Augmented message for original message a
No Augmented message for original message c
No Augmented message for original message g
No Augmented message for original message i
No Augmented message for original message k
No Augmented message for original message m
No original message for Augmented message to
No original message for Augmented message ta
No original message for Augmented message tc
No original message for Augmented message td
No original message for Augmented message tg
No original message for Augmented message ti
No original message for Augmented message tk
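A rough check of the timing in this run may help explain why no pairs show up. The sketch below is only an approximation: it assumes each send is instantaneous, that record timestamps equal send times, and that the second loop starts right after the first one finishes. The class and method names are made up for illustration. Under those assumptions, the gap between same-key records is driven by the two loops running one after the other, not by the 1100 ms delay alone.

```java
// Back-of-the-envelope timing for the delayed test run above.
// Assumption: sends are instantaneous and timestamps equal send times.
public class SendGapSketch {
    static final String[] ORIGINAL_KEYS = {"13", "11", "12", "14", "15", "16", "17"};
    static final String[] AUGMENTED_KEYS = {"19", "11", "12", "12", "14", "15", "16", "17", "17"};
    static final long ORIGINAL_SLEEP_MS = 900;
    static final long AUGMENTED_SLEEP_MS = 1100;

    // The i-th original record is sent after i sleeps of 900 ms.
    static long originalSendTime(int i) {
        return i * ORIGINAL_SLEEP_MS;
    }

    // The augmented loop starts only after the whole original loop has finished.
    static long augmentedSendTime(int j) {
        return ORIGINAL_KEYS.length * ORIGINAL_SLEEP_MS + j * AUGMENTED_SLEEP_MS;
    }

    // Returns {smallest gap, largest gap} in ms over all same-key record pairs.
    static long[] gapRange() {
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        for (int i = 0; i < ORIGINAL_KEYS.length; i++) {
            for (int j = 0; j < AUGMENTED_KEYS.length; j++) {
                if (ORIGINAL_KEYS[i].equals(AUGMENTED_KEYS[j])) {
                    long gap = augmentedSendTime(j) - originalSendTime(i);
                    min = Math.min(min, gap);
                    max = Math.max(max, gap);
                }
            }
        }
        return new long[] {min, max};
    }

    public static void main(String[] args) {
        long[] range = gapRange();
        System.out.println("smallest gap: " + range[0] + " ms, largest gap: " + range[1] + " ms");
    }
}
```

With these assumptions every same-key gap falls between 6.5 and 9.7 seconds, so none of them fits inside a 1-second window.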
If I extend the window to 10 seconds, it finds the pairs again; this time it also reports e as an unpaired item, but not to.
pair found: a,ta
pair found: c,tc
pair found: c,td
pair found: g,tg
No Augmented TD message for original message e
pair found: i,ti
pair found: k,tk
pair found: m,tm
pair found: m,tm
So it seems to me that, when there is only a small stream of messages, something prevents the join from emitting the unpaired items?
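I ran into the same thing today. I also found that if you keep sending messages to the input topics periodically, the unmatched records are eventually emitted.
By the way, did you find an elegant way around this behavior? Without grouping and suppress...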
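For what it's worth, this looks consistent with how recent Kafka Streams versions (3.1 and later, as far as I know) handle left and outer stream-stream joins: an unmatched record is held back until its join window (plus grace) has closed, and the window closes on stream time, which only advances when new records arrive, not on wall-clock time. The sketch below is a toy model of that idea in plain Java, not Kafka Streams code; the class, its fields, and the "tick" record are all hypothetical and exist only to show why a later record is needed before an unmatched one appears.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Toy model only: NOT Kafka Streams code. All names here are hypothetical.
public class OuterJoinStreamTimeSketch {

    /** A record buffered while it waits for a join partner. */
    static final class Buffered {
        final String side;   // "original" or "augmented"
        final String key;
        final String value;
        final long ts;       // record timestamp in ms
        boolean matched;

        Buffered(String side, String key, String value, long ts, boolean matched) {
            this.side = side;
            this.key = key;
            this.value = value;
            this.ts = ts;
            this.matched = matched;
        }
    }

    final List<String> output = new ArrayList<>();
    private final List<Buffered> buffer = new ArrayList<>();
    private final long windowMs;
    private long streamTime = Long.MIN_VALUE; // highest timestamp seen so far

    OuterJoinStreamTimeSketch(long windowMs) {
        this.windowMs = windowMs;
    }

    void process(String side, String key, String value, long ts) {
        // Stream time only moves when a record arrives; it never follows the wall clock.
        streamTime = Math.max(streamTime, ts);

        // Close windows that stream time has passed; emit records that never matched.
        for (Iterator<Buffered> it = buffer.iterator(); it.hasNext();) {
            Buffered b = it.next();
            if (b.ts + windowMs < streamTime) {
                if (!b.matched) {
                    output.add("unmatched: " + b.value);
                }
                it.remove();
            }
        }

        // Join against buffered records from the other side with the same key.
        boolean matched = false;
        for (Buffered b : buffer) {
            if (!b.side.equals(side) && b.key.equals(key) && Math.abs(b.ts - ts) <= windowMs) {
                b.matched = true;
                matched = true;
                boolean incomingIsOriginal = side.equals("original");
                output.add("pair: " + (incomingIsOriginal ? value + "," + b.value : b.value + "," + value));
            }
        }
        buffer.add(new Buffered(side, key, value, ts, matched));
    }

    public static void main(String[] args) {
        OuterJoinStreamTimeSketch join = new OuterJoinStreamTimeSketch(1000);
        join.process("original", "13", "e", 0);
        join.process("original", "11", "a", 100);
        join.process("augmented", "11", "ta", 200);
        System.out.println(join.output); // "e" is still waiting: stream time is only 200
        join.process("original", "99", "tick", 5000); // a later record moves stream time forward
        System.out.println(join.output); // now the window for "e" has closed
    }
}
```

In this model the last record on a quiet topic never gets reported as unmatched, because nothing arrives afterwards to move stream time past its window. That would match what the question saw with "to", and it is why periodically sending further records makes the missing results appear.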