apache-kafka-streams 相关问题

与Apache Kafka的内置流处理引擎相关,称为Kafka Streams,它是一个用于使用Apache Kafka构建分布式流处理应用程序的Java库。

如何使用branch()方法获取下游

在kafka-streams中,流上的branch()方法已被弃用。 我从 COnfluence 中找到了一些代码片段,我现在应该使用 split() 和 Branch() 方法,如下所示: artikelEvents.split() ...

回答 1 投票 0

GlobalKTable 作为带有 Kafka Streams Binder 的 QueryableStore

我正在使用 spring-boot-starter-parent 版本 3.1.2 和 spring-cloud-stream-binder-kafka-streams 版本 4.0.3 绝大多数在线示例都显示使用 @Input 注释创建 GlobalKTable...

回答 1 投票 0

Kafka StreamsUncaughtExceptionHandler REPLACE_THREAD 与 SHUTDOWN_CLIENT

我有一个旧主题,其中包含损坏的消息,我需要完全重新处理该主题,忽略无法处理的消息。这是正确的未捕获异常处理策略

回答 1 投票 0

Kafka Streams 使用手动创建的内部主题,给出 TopicAuthorizationException

我使用 KStreams 来消费主题中的数据,对数据执行一些逻辑并写入 KTable。由于安全原因,应用程序无法获得创建内部主题的权限。 巴...

回答 1 投票 0

Quarkus Kafka Streams/反应式消息反序列化异常

嘿,所以我正在尝试使用 Kafka Streams 和 MP Reactive Messaging 来读取 Kafka 主题,然后生成回它。 卡夫卡流错误 - org.apache.kafka.streams.errors.

回答 1 投票 0

单元测试 KafkaStreams 给出 IllegalArgumentException:未知主题

我有一个应用程序,使用 KStream 从 Kafka 读取数据,根据标头过滤数据,然后写入 KTable。 公共拓扑 buildTopology() { KStream inputStream = bui...

回答 1 投票 0

Kafka Streams 节点上内存不足

我正在节点上运行一个 kafka 流应用程序,它耗尽了应用程序的内存, 我想在集群级别运行 kafka 流应用程序 作为国家,我怎样才能实现同样的目标......

回答 1 投票 0

Kafka Processor API 中 Header 有什么用?

我正在学习Kafka Processor API并在ProcessorContext中找到一个方法头。 标题() 返回当前输入记录的标题;可能 如果不可用则为 null 我什么...

回答 2 投票 0

Kafka Streams 中的状态存储与 Ktable

我是 Kafka 和 Kafka Streams 的新手。虽然我已经了解了 Kafka 和 Kafka Streams 的概念,并且在概念上感到自信,但有一件事让我感到困惑。是的,就是

回答 1 投票 0

防止 Kafka Streams Consumer 写入偏移量/等待一个流消耗完所有记录后再启动第二个流

这是一个关于流的二合一问题。 我正在开发一项由两个流组成的服务。一个(第一个)应该消耗整个主题,接收键/值对并存储它们的信息......

回答 1 投票 0

Kafka-Streams 消费者的记录拦截器

我正在寻找 Kafka-streams 来进行事件处理。我尝试为 Kakfa-Streams 添加一个拦截器(针对消费者)。 我添加了一个 RecordInterceptor,如下所示: configMap.put(consumerPrefix(

回答 2 投票 0

状态存储中的密钥重新平衡如何在 Kafka Streams 中进行分区扩展?

假设一个有状态运行的 Kafka 流有一个包含 16 个分区的输入主题,实例(或任务)的数量也是 16。据我所知,Kafka 的默认分区器

回答 1 投票 0

kafka 流 - 默认状态存储在哪里

几乎所有的kafka流文档都说它使用RocksDB作为状态存储。我创建了一个简单的流生成器,下面的示例效果很好。我唯一的困惑是 - 我没有指定任何状态

回答 1 投票 0

Kafka Streams - 暂停和恢复

我有一个流应用程序,它消耗消息的速度通常很快,但有时需要很长时间(最多 30 分钟)。该行为是不确定的,我们在处理消息之前不知道......

回答 1 投票 0

Rocksdb 似乎在内存中加载了完整的 Ktable 状态存储

在我的kafka流拓扑中,我用kstream连接一个ktable,但由于内存的原因,它一直崩溃:连接DSL为Ktable创建一个状态存储,当这个状态存储很小时,u...

回答 1 投票 0

KStream-KStream Join 不触发左连接

我利用 kafka 流和 spring-cloud-stream-binder-kafka-streams 来构建一个流,该流从主题接收有关 kubernetes 集群中运行的 pod 的信息,并构建出 i...

回答 1 投票 0

防止 kafka-streams 中基于密钥的重新分区

我有一个有点奇怪的用例,我们的应用程序没有使用标准的 kafka 分区。相反,我们有一个自定义分区策略,我们在复合中使用特定字段......

回答 1 投票 0

使用“几乎”相同的键连接两个 Java KStream(Kafka 主题)

我有两个kafka主题,其中包含与“警告事件”不同的信息。 要知道主题 A 和主题 B 中的哪些条目相互对应,我必须比较序列号、日期和机器...

回答 0 投票 0

消费者函数和服务函数访问同一个kafka流的状态存储是线程安全的吗?

我正在使用 kotlin + Spring Boot + Kafka Streams 和 Spring Cloud Stream。 我有一个服务功能,可以验证客户端的请求并将其发送到 kafka 主题。 要验证请求,需要...

回答 0 投票 0

Kafka 流测试:java.util.NoSuchElementException:未初始化主题:“output_topic_name”

我已经根据https://kafka.apache.org/24/documentation/streams/developer-guide/testing.html为kafka流应用程序编写了一个测试类 ,其代码是 导入 com.EventSerde; 导入组织。

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.