与Apache Kafka的内置流处理引擎相关,称为Kafka Streams,它是一个用于使用Apache Kafka构建分布式流处理应用程序的Java库。
在Helidon SE 4.1.6中,如何使用Kafka生产商将数据发送到特定分区
我想使用Helidon SE 4.1.6并使用生产者将数据与Apache Kafka的特定分区一起生产。 细节 : 我已经通过https://helidon.io/docs/latest/se/reactive-messaging#
//定义来自Kafka主题的传入原始数据流采购 datastream主流= env.AddSource(...); //定义来自Kafka主题的参考数据流采购 datastr ...
Kafka流ktable-ktable ktable外国钥匙连接发出null,即使右侧为空
kafka流的语义是什么(3.7.1)ktable-ktable外键联接,其中提取的外键从未与右侧ktable中的主键匹配? 在此示例中...
Kafka Streams 具有多个主题和扩展的分区分配问题
我正在开发一个卡夫卡流应用程序,该应用程序从具有三个主题的消费者组进行消费。一个主题有 20 个分区,另外一个主题有 10 个分区,最后一个主题有 5 个分区。因此,这个消费者组总共有 35 个
org.apache.kafka.common.errors.UnsupportedVersionException:该节点不支持 GET_TELEMETRY_SUBSCRIPTIONS
我正在用java开发Kafka Stream示例,当我启动应用程序时出现以下错误。 错误: 17:34:38.930 [kafka 管理客户端线程 | my-ktable-demo-881a7b7e-9830-4e3c-b7b8-eeaf70a26bbf-admin]
将 repartition.purge.interval.ms 配置设置得太高以防止清除是否有任何副作用?
我们在应用程序中使用 Kafka Streams 来处理事件。 为了加入操作,我们使用 Kafka Streams 的 repartition 和 selectKey 方法,这会导致创建内部修复...
Kafka Stream KTable 外键左连接输出比预期更多的墓碑
迁移到最新的流版本 3.9.0 后,我注意到左外键连接中有一个我无法理解的行为。 对于左外键连接: KTable ...
我有两个Kafka队列(queue1和queue2),并且多个生产者(客户端)正在将消息发布到两个队列。每个客户端向队列发送不同类型的消息(例如,client1 send &q...
我们有一个Java应用程序,它使用Streams库来处理数据。流应用程序不会连接来自多个主题的数据,而是独立处理收到的每条消息。应用...
在 Spring Boot Kafka 流应用程序中创建多个拓扑的最简单或最佳方法是什么? 是否可以使用相同的默认 StreamBuilder bean?或者如果我需要创建一个新的
卷曲此 https://right-boa-11231-eu1-rest-kafka.upstash.io/schema-registry/schemas/ids/8?fetchMaxId=false&subject=test1-value 返回 {“架构”:“{\”类型\“:\”r...
我正在开发一个 Quarkus 微服务,它利用 Kafka Streams 处理来自多个主题的消息。 具体来说,我正在尝试加入从其中两个顶部派生的 KStream 和 KTable...
Kafka Streams:负载下 KStream-KTable 连接结果不一致
我正在开发一个 Quarkus 微服务,它利用 Kafka Streams 处理来自多个主题的消息。 具体来说,我正在尝试加入从其中两个顶部派生的 KStream 和 KTable...
假设我们有一个 Kafka 生产者,它将消息发送到一个主题,然后由 Kafka Streams 应用程序读取该主题,然后将这些消息放入状态存储中以供稍后进一步处理。 我们...
在 Kafka Streams KGroupedTable.aggregate 中,加法器和减法器是否应该在每次调用时返回一个新的不可变聚合器?
我想将 KTable 条目聚合到集合映射(例如 Map>),因此在聚合函数中我需要提供加法器和减法器,如下所示: (键、值、映射)...
具有全局 K 表的 Kafka 流应用程序是否仅根据输入主题保留期中定义的内容保留记录? 我知道在启用更改日志的普通 K 表中,它会创建一个更改日志
我正在使用KStreams进行表、表左连接操作,剧集记录是针对特定节目的,这些剧集记录用于丰富。 即使记录已经存在...
我试图了解我是否可以使用 Kafka Streams、通过宽限配置或以任何其他方式实现以下行为: 1 分钟的翻滚窗口 时间取自
我有一个 kafkaStream 拓扑,如下所示: 溪流 .filter(((key, Trade) -> Trade.tradeTime != null && Trade.tradeTime > TodayMillis )) .