与Apache Kafka的内置流处理引擎相关,称为Kafka Streams,它是一个用于使用Apache Kafka构建分布式流处理应用程序的Java库。
Kafka 使用 commit.interval.ms 自定义抑制行为
在Kafka streams中,如果我们有多个partition,想要根据一个key来聚合消息,只为key产生聚合的最终结果。我们不得不使用自定义抑制器使用
kafka streaming application 不会消耗所有主题,也不会抛出错误
我已经编写了 java springboot kafka 流处理器应用程序(Kubernetes 1 pod,扩展到 3 以查看是否有帮助,传入消息在上午 12 点后增加),它有几个输入目的地...
我正在尝试设置我的 Kafka S3 源连接器以从我的 S3 存储桶中提取文件。但是,当我在连接时检查连接器的状态时,我将其作为错误响应 error_code: 500
如果使用消息的 kafka 流应用程序有一些异常,如何从 apache Kafka 重新读取消息
我正在使用 kafka 流读取来自 kafka 的消息进行实时处理。 然后,如果在...中计算消息时出现错误,我会将此消息发送给其他 rest api(最终服务)
假设我在流 A 中有一条具有以下架构的记录: {时间戳,json} 示例记录: {“行”:{“列”:[1678710830000,{“1”:“abc”,“2”:...
为什么 KStreams 的 outerJoin 滑动窗口的不完整结果在新的不相关记录稍后到达之前不会被刷新?
我有三个主题和一个从中读取的 KStream 应用程序。 x、y 和 z 将收到消息,我想使用 KStream 应用程序中的窗口处理它们。 关键是,在同一个窗口中...
为什么 KStreams 的 outerJoin 滑动窗口的不完整结果直到新的不相关记录稍后到达才发布?
我有三个主题和一个从中读取的 KStream 应用程序。 x、y 和 z 将收到消息,我想使用 KStream 应用程序中的窗口处理它们。 关键是,在同一个窗口中...
我想了解一下 Kafka 备用副本的工作原理! 鉴于这种情况; 一个 Kafka 流应用程序; 从具有 10 个分区的源主题读取 写给...
我有三个主题和一个从中读取的 KStream 应用程序。 x、y 和 z 将收到消息,我想使用 KStream 应用程序中的窗口处理它们。 关键是,在同一个窗口中...
我想用 Java Quarkus 创建一个微服务来在运行时订阅多个主题。该服务将定期(每天一次)从 API 读取主题并更新主题以...
为 Springboot 应用程序的多个实例设置 Actuator Kafkastreams 健康检查
我们有 x 个 KafkaStreams Springboot 应用程序实例在我们的云环境中并行运行。我想用 Springboot Actuator 实现健康检查,它会返回
我在用 卡夫卡 0.8.2.1 火花 2.1.2 我试图运行一个代码,它将数据从 kafka 流式传输到 spark bu 我收到这个错误 文件“c:/Users/anish/OneDrive/Desktop/major project/
我有一个关于测试kafak流的问题,特别是测试kafka流内部的连接。 我有一个员工的小用例,我想加入保险。输出将是...
在使用 PySpark 消费来自 Kafka 的消息时处理架构演变
我是卡夫卡的新手。目前我正在处理一个要求 - 用例: 我正在使用来自 Kafka 的消息(消息由上游团队在 Kafka 中生成)。上游团队不维护...
来自 Kafka 主题的 Spark 消费者流不断重置偏移量
所以我正在运行一个 spark 流程序,并使用 readStream 方法的以下选项从 Kafka 主题读取流。 选项= { ... ... “kafka.security.
无法使用 MySQL 连接器 Kafka 中的 where 子句运行查询
JSON 文件: { “名称”:“mysql-jdbc”, “配置”:{ “connector.class”:“io.confluent.connect.jdbc.JdbcSourceConnector”, “连接。你...
使用调度程序和 kafka 时勇敢的行李引发 outOfMemoryError
Vsem 女贞。 V svoyem prilozheniye ya ispol'zuyu kafka stream i standartnyy scheduler postavlyayemyy spring。 Pri rabote Scheduler otpravlyayutsya soobshcheniya v kafku a potom vychityvayutsya ot t...
用于查看来自微服务的所有 Kafka 输入和输出主题的软件 [关闭]
作为全面了解我组织中每个微服务使用的所有 Kafka 主题的一部分,我正在寻找一种自动化工具,它可以扫描并提供我...
如何将 Kafka 流的简单整数密钥转换为复杂的 Avro 类型密钥
我有一个流式应用程序拓扑结构,它使用一个带有整数键和 AVRO 主体的简单主题。我想操纵流并使用复杂的 AVRO 密钥写入主题。但是我...
我写了一个带有 kafka 流的 Quarkus 应用程序(2.16 版)。由于 kafka 主题中的大量消息,我的应用程序的消费者组计数滞后。所以入住后...