I have two Kafka topics. Messages on the news topic can contain a list of image IDs, like this:
{
"id": "news-1",
"title": "Title news-1",
"description": " description news-1",
"author": " Author news-1",
"imageIds": [
"images-1",
"images-2"
]
}
Messages on the images topic look like this:
{
"id": "image-1",
"url": "https://www.mypublication.co.uk/image-title-1.jpeg",
"description": "title-1 description",
"height": 400,
"width": 450
}
{
"id": "image-2",
"url": "https://www.mypublication.co.uk/image-title-2.jpeg",
"description": "title-2 description",
"height": 400,
"width": 450
}
I am trying to join the two streams to produce a final news message enriched with all the image details. I tried using groupBy and aggregate as follows:
KTable<String, Image> images = builder.table(topics.getImagesTopic(), Consumed.with(Serdes.String(), imageSerde));
KStream<String, News> news = builder.stream(topics.getNewsTopic(), Consumed.with(Serdes.String(), newsSerde));

KTable<String, NewsImages> newsImagesKTable = news
    .flatMapValues(newsArticle -> newsArticle.getImageIds())
    .map((newsId, imageId) -> new KeyValue<>(imageId, newsId)) // rekey not good !!?
    .join(images,
          (newsId, image) -> new ImageWrapper(newsId, image),
          Joined.with(Serdes.String(), Serdes.String(), imageSerde))
    .groupBy((imageId, imageWrapper) -> imageWrapper.getNewsId(),
             Grouped.with(Serdes.String(), imageWrapperSerde))
    .aggregate(NewsImages::new, (newsId, imageWrapper, newsImages) -> {
        newsImages.setNewsId(newsId);
        newsImages.addImage(imageWrapper);
        return newsImages;
    }, Materialized.with(Serdes.String(), newsImagesSerde));

newsImagesKTable.toStream().to(topics.getNewsImagesTopic());
But, as expected, the code above aggregates every image ever attached to a news item. When the author first publishes the news with two images everything works fine, and we can see the output below:
"news-1" :
{
"newsId":"news-1",
"images":
{"image-1":{"id":"image-1","url":"https://www.mypublication.co.uk/image-1.jpeg","description":"title-1 description","height":400,"width":450},
"image-2":{"id":"image-2","url":"https://www.mypublication.co.uk/image-2.jpeg","description":"title-2 description","height":400,"width":450}}
}
When the author republishes the article with only image-3, it outputs all three images (that is what the aggregator does):
news-1: [image-1, image-2, image-3]
I am looking for any alternative way to join news and images such that republishing the news overwrites the previous value, i.e. news-1: [image-3]
For more details, check out https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#ktable-ktable-foreign-key-join
To achieve this, you would need to process both inputs as KTables for the join. For the left-hand "news" input, you would need to split each original input record into N records, one per image you want to join against, and aggregate the final result afterwards. The simplest way to "prepare" the left-hand data is probably a custom stateful Processor.
builder.stream("news")
.process(....)
.toTable()
.join(imageTable)
.groupBy(... -> news.id)
.aggregate(...);
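One thing worth noting about the final step: aggregate() on a grouped KTable takes both an adder and a subtractor, and it is the subtractor that removes an image from the per-news aggregate when its joined row is later deleted by a tombstone. A minimal plain-Java sketch of the two callbacks (the NewsImages shape used here, a map of imageId to image payload, is an assumption for illustration):

```java
import java.util.*;

public class NewsImagesAggregate {
    // Hypothetical aggregate value: the news ID plus a map imageId -> image payload.
    static class NewsImages {
        String newsId;
        Map<String, String> images = new LinkedHashMap<>();
    }

    // Adder: called when a joined <(news.id, image.id), image> row is added or updated.
    static NewsImages add(String newsId, String imageId, String image, NewsImages agg) {
        agg.newsId = newsId;
        agg.images.put(imageId, image);
        return agg;
    }

    // Subtractor: called when a row is deleted (e.g. via an upstream tombstone),
    // so the image disappears from the aggregate instead of lingering forever.
    static NewsImages subtract(String newsId, String imageId, NewsImages agg) {
        agg.images.remove(imageId);
        return agg;
    }
}
```

This is exactly what the groupBy/aggregate in the plain KStream approach above cannot do: a KGroupedStream aggregate only ever adds, it never retracts.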
The process() step would be a flat-map that transforms each input row into N outputs, one per image, using <(news.id, image.id), ...> as the primary key, and also keeping image.id as a field in the value so that it can be extracted as the FK for the join (cf. https://cwiki.apache.org/confluence/display/KAFKA/KIP-1104%3A+Allow+Foreign+Key+Extraction+from+Both+Key+and+Value+in+KTable+Joins, which will remove the need to put the image ID into the value).
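As a rough sketch of that flat-mapping, assuming a simple "newsId|imageId" string encoding for the composite key and a hypothetical NewsImageRef value type that keeps the image ID as a field:

```java
import java.util.*;

public class NewsFlatMap {
    // Hypothetical value type for the "prepared" left-side table; imageId is
    // kept as a field so it can be extracted as the foreign key for the join.
    record NewsImageRef(String newsId, String imageId) {}

    // Flat-map one news record into N outputs, one per image, keyed by the
    // composite key <(news.id, image.id)> (encoded here as "newsId|imageId").
    static Map<String, NewsImageRef> explode(String newsId, List<String> imageIds) {
        Map<String, NewsImageRef> out = new LinkedHashMap<>();
        for (String imageId : imageIds) {
            out.put(newsId + "|" + imageId, new NewsImageRef(newsId, imageId));
        }
        return out;
    }
}
```

Any encoding works as long as the composite key is unique per (news, image) pair and serializes deterministically.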
process() needs to be stateful to be able to generate deletes/tombstones. The state simply stores the full input record, keyed by news.id. For each input record, its list of images must be compared to the old list: for every newly added image a regular output record is emitted, and for every removed image a tombstone is emitted. For example, the input
{
"id": "news-1",
"title": "Title news-1",
"description": " description news-1",
"author": " Author news-1",
"imageIds": [
"images-1",
"images-2"
]
}
produces two output records, <(news-1, image-1), ...> and <(news-1, image-2), ...>. If we then receive an update for news-1:
{
"id": "news-1",
"title": "Title news-1",
"description": " description news-1",
"author": " Author news-1",
"imageIds": [
"images-1",
"images-3"
]
}
the output would be <(news-1, image-2), null> (a delete/tombstone) and <(news-1, image-3), ...>.
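The diffing step described above can be sketched in plain Java as follows (the Output record and the "newsId|imageId" key encoding are illustrative assumptions; a null value stands for a tombstone):

```java
import java.util.*;

public class ImageListDiff {
    // value == null represents a tombstone (delete) for that composite key.
    record Output(String key, String value) {}

    // Compare the new image list against the previously stored one:
    // emit a regular record for each added image and a tombstone for
    // each image that disappeared.
    static List<Output> diff(String newsId, List<String> oldIds, List<String> newIds) {
        List<Output> outputs = new ArrayList<>();
        for (String id : newIds) {
            if (!oldIds.contains(id)) {
                outputs.add(new Output(newsId + "|" + id, id)); // added image
            }
        }
        for (String id : oldIds) {
            if (!newIds.contains(id)) {
                outputs.add(new Output(newsId + "|" + id, null)); // tombstone
            }
        }
        return outputs;
    }
}
```

Running it on the example above (old list [image-1, image-2], new list [image-1, image-3]) yields exactly the new record for (news-1, image-3) and the tombstone for (news-1, image-2).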