与Apache Kafka的内置流处理引擎相关,称为Kafka Streams,它是一个用于使用Apache Kafka构建分布式流处理应用程序的Java库。
我有 2 个主题 topic1 和 topic2 我正在将消息生成到测试主题中 topic1的配置如下: 主题:topic1 TopicId: PartitionCount:1 ReplicationFactor...
Kafka-在复合键上连接 KStream 和 KTable
我有两个主题 - 计划和供应商 plan 主题有一个基于两列的复合键 (avro):planCode + memberAge。 供应商主题以供应商 ID 列为键,它包含 planCo 列...
我有一个带有时间表的变压器 context.schedule(scanFrequency, PunctuationType.WALL_CLOCK_TIME, new MyPunctuator(stateStore)); 然后是我的标点符号类 公共类 MyPunctuator 实现
我正在使用 Kafka Streams 开发一个项目,我正在使用键值状态存储来为我的应用程序存储一些数据。我需要能够为状态存储配置保留期,以便
Kafka Streams CDC 处理以生成具有外键表的合并记录
假设两个数据库表正在被 Kafka Connect for CDC 监控: 表A ------ ID 姓名 B_ID (FK) 表B ------ ID 姓名 我有一个 KStream 应用程序,它将从 ...
如何在 azure event hub 上使用 prem 上的 kafka 消息?
我想在 azure 事件中心中使用来自已经存在的 prem kafka 服务的 kafka 消息。 我有 topic 和 brokerlist 。 我找不到我应该在事件中配置它们的位置......
Complex 在历史数据上加入 Kafka Streams
我想弄清楚我是否应该将数据发送到数据库并使用数据库执行复杂的连接,或者是否可以避免使用数据库。 考虑具有以下结构的 Kafka 主题:
如何处理 kafka KStream 并直接写入 API 而不是发送另一个主题
How to Process a kafka KStream and write to API directly instead of send it another topic 未尝试,因为不确定由谁实施
如何发送时间窗 KTable 的最终 kafka-streams 聚合结果?
我想做的是: 从数字主题(Long's)中消费记录 聚合(计数)每个 5 秒窗口的值 将 FINAL 聚合结果发送到另一个主题 我的代码看起来像...
如何在使用 Kafka Stream 从 RTopic 读取数据时对外部系统进行 rest 调用
主要需求是从主题中读取数据,并通过rest API将数据发送到外部系统。也有要求以相同的顺序将消息发送到目标系统。 电子...
卡夫卡流 |有人可以指导如何在使用 Kafka Stream 从 RTopic 读取数据时对外部系统进行休息调用
卡夫卡流 |有人可以指导如何在使用 Kafka Stream 从 RTopic 读取数据时对外部系统进行休息调用 用例:主要需求是从主题中读取数据,然后...
我想创建一个 Kafka Streams 应用程序,它从内部 Kafka 主题读取数据并写入位于另一个集群上的外部主题。 有什么办法可以写到一个exte...
我为 Kafka 流写了一个 Serde。我的 Serde 类具有通用类型。当我在 API 方法中显式传递它(覆盖默认值)时它工作正常但当我通过
我正在使用以下 log4j.properties log4j.rootLogger=调试,标准输出 log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target=System.out log4j.appender.stdout.layout=org.
Kafka- 使用非键值将 Kafka 流与全局 K 表连接
我看过多个帖子,说明可以使用记录值而不是全局 k 表上的键将 kafka 流与全局 k 表连接起来 https://kafka.apache.org/20/documentation/st...
如何确定 InvalidProducerEpochException/ProducerFencedException 的根本原因以及如何修复它
我们有一个在 AWS 中运行的 Kafka 流 spring boot 应用程序。 springKafka版本:2.8.7 apacheKafkaClient版本:3.0.2 融合版本:5.5.5 中间的一些性能测试的一部分......
我在 Kafka 主题“原始数据”中获取 CSV,目标是通过在另一个主题“数据”中发送具有正确时间戳(每行不同)的每一行来转换它们。 目前,我有 2 str ...
Spring Cloud Stream kafka:如何为每个消费者主题创建 DLQ 配置?
以下属性没有按预期工作,我想在重试后为每个消费者主题创建一个 DLQ 溪流: 卡夫卡: 流: binder.deserializationExceptionHandler:sendToDlq 绑定: 李...
我正在使用 Spring Boot 和 Micrometer 构建 Kafka Streams 应用程序。我想知道如何收集/公开以下指标: 每个任务从重新平衡到运行所需的延迟。理想...
从 Kafka Streams 向 Spring Beans 有效公开全球商店
我想通过普通的 Spring Beans 查询 Kafka Stream 全局故事,例如通过@Controller。 我找到了两个例子: SO:使用 Spring 访问 Kafka Stream State Store 自动装配