apache-kafka-streams 相关问题

与Apache Kafka的内置流处理引擎相关,称为Kafka Streams,它是一个用于使用Apache Kafka构建分布式流处理应用程序的Java库。

防止 kafka-streams 中基于密钥的重新分区

我有一个有点奇怪的用例,我们的应用程序没有使用标准的 kafka 分区。相反,我们有一个自定义分区策略,我们在复合中使用特定字段......

回答 1 投票 0

使用“几乎”相同的键连接两个 Java KStream(Kafka 主题)

我有两个kafka主题,其中包含与“警告事件”不同的信息。 要知道主题 A 和主题 B 中的哪些条目相互对应,我必须比较序列号、日期和机器...

回答 0 投票 0

消费者函数和服务函数访问同一个kafka流的状态存储是线程安全的吗?

我正在使用 kotlin + Spring Boot + Kafka Streams 和 Spring Cloud Stream。 我有一个服务功能,可以验证客户端的请求并将其发送到 kafka 主题。 要验证请求,需要...

回答 0 投票 0

Kafka 流测试:java.util.NoSuchElementException:未初始化主题:“output_topic_name”

我已经根据https://kafka.apache.org/24/documentation/streams/developer-guide/testing.html为kafka流应用程序编写了一个测试类 ,其代码是 导入 com.EventSerde; 导入组织。

回答 1 投票 0

Kafka 清理策略“COMPACT”:消息未从主题中清除

我有 2 个主题 topic1 和 topic2 我正在将消息生成到测试主题中 topic1的配置如下: 主题:topic1 TopicId: PartitionCount:1 ReplicationFactor...

回答 0 投票 0

Kafka-在复合键上连接 KStream 和 KTable

我有两个主题 - 计划和供应商 plan 主题有一个基于两列的复合键 (avro):planCode + memberAge。 供应商主题以供应商 ID 列为键,它包含 planCo 列...

回答 1 投票 0

Kafka Stream 每次迭代执行多个标点符号

我有一个带有时间表的变压器 context.schedule(scanFrequency, PunctuationType.WALL_CLOCK_TIME, new MyPunctuator(stateStore)); 然后是我的标点符号类 公共类 MyPunctuator 实现

回答 2 投票 0

Kafka Streams 有键值状态存储的保留期概念吗?

我正在使用 Kafka Streams 开发一个项目,我正在使用键值状态存储来为我的应用程序存储一些数据。我需要能够为状态存储配置保留期,以便

回答 0 投票 0

Kafka Streams CDC 处理以生成具有外键表的合并记录

假设两个数据库表正在被 Kafka Connect for CDC 监控: 表A ------ ID 姓名 B_ID (FK) 表B ------ ID 姓名 我有一个 KStream 应用程序,它将从 ...

回答 0 投票 0

如何在 azure event hub 上使用 prem 上的 kafka 消息?

我想在 azure 事件中心中使用来自已经存在的 prem kafka 服务的 kafka 消息。 我有 topic 和 brokerlist 。 我找不到我应该在事件中配置它们的位置......

回答 1 投票 0

Complex 在历史数据上加入 Kafka Streams

我想弄清楚我是否应该将数据发送到数据库并使用数据库执行复杂的连接,或者是否可以避免使用数据库。 考虑具有以下结构的 Kafka 主题:

回答 0 投票 0

如何处理 kafka KStream 并直接写入 API 而不是发送另一个主题

How to Process a kafka KStream and write to API directly instead of send it another topic 未尝试,因为不确定由谁实施

回答 0 投票 0

如何发送时间窗 KTable 的最终 kafka-streams 聚合结果?

我想做的是: 从数字主题(Long's)中消费记录 聚合(计数)每个 5 秒窗口的值 将 FINAL 聚合结果发送到另一个主题 我的代码看起来像...

回答 3 投票 0

如何在使用 Kafka Stream 从 RTopic 读取数据时对外部系统进行 rest 调用

主要需求是从主题中读取数据,并通过rest API将数据发送到外部系统。也有要求以相同的顺序将消息发送到目标系统。 电子...

回答 0 投票 0

卡夫卡流 |有人可以指导如何在使用 Kafka Stream 从 RTopic 读取数据时对外部系统进行休息调用

卡夫卡流 |有人可以指导如何在使用 Kafka Stream 从 RTopic 读取数据时对外部系统进行休息调用 用例:主要需求是从主题中读取数据,然后...

回答 0 投票 0

使用 Kafka Streams 将数据写入外部主题

我想创建一个 Kafka Streams 应用程序,它从内部 Kafka 主题读取数据并写入位于另一个集群上的外部主题。 有什么办法可以写到一个exte...

回答 1 投票 0

通用类型的 Kafka Streams Serdes

我为 Kafka 流写了一个 Serde。我的 Serde 类具有通用类型。当我在 API 方法中显式传递它(覆盖默认值)时它工作正常但当我通过

回答 0 投票 0

我们可以只为 kafka 禁用 log4j 日志吗

我正在使用以下 log4j.properties log4j.rootLogger=调试,标准输出 log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target=System.out log4j.appender.stdout.layout=org.

回答 4 投票 0

Kafka- 使用非键值将 Kafka 流与全局 K 表连接

我看过多个帖子,说明可以使用记录值而不是全局 k 表上的键将 kafka 流与全局 k 表连接起来 https://kafka.apache.org/20/documentation/st...

回答 1 投票 0

如何确定 InvalidProducerEpochException/ProducerFencedException 的根本原因以及如何修复它

我们有一个在 AWS 中运行的 Kafka 流 spring boot 应用程序。 springKafka版本:2.8.7 apacheKafkaClient版本:3.0.2 融合版本:5.5.5 中间的一些性能测试的一部分......

回答 0 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.