与Apache Kafka的内置流处理引擎相关,称为Kafka Streams,它是一个用于使用Apache Kafka构建分布式流处理应用程序的Java库。
在kafka-streams中,流上的branch()方法已被弃用。 我从 COnfluence 中找到了一些代码片段,我现在应该使用 split() 和 Branch() 方法,如下所示: artikelEvents.split() ...
GlobalKTable 作为带有 Kafka Streams Binder 的 QueryableStore
我正在使用 spring-boot-starter-parent 版本 3.1.2 和 spring-cloud-stream-binder-kafka-streams 版本 4.0.3 绝大多数在线示例都显示使用 @Input 注释创建 GlobalKTable...
Kafka StreamsUncaughtExceptionHandler REPLACE_THREAD 与 SHUTDOWN_CLIENT
我有一个旧主题,其中包含损坏的消息,我需要完全重新处理该主题,忽略无法处理的消息。这是正确的未捕获异常处理策略
Kafka Streams 使用手动创建的内部主题,给出 TopicAuthorizationException
我使用 KStreams 来消费主题中的数据,对数据执行一些逻辑并写入 KTable。由于安全原因,应用程序无法获得创建内部主题的权限。 巴...
Quarkus Kafka Streams/反应式消息反序列化异常
嘿,所以我正在尝试使用 Kafka Streams 和 MP Reactive Messaging 来读取 Kafka 主题,然后生成回它。 卡夫卡流错误 - org.apache.kafka.streams.errors.
单元测试 KafkaStreams 给出 IllegalArgumentException:未知主题
我有一个应用程序,使用 KStream 从 Kafka 读取数据,根据标头过滤数据,然后写入 KTable。 公共拓扑 buildTopology() { KStream inputStream = bui...
我正在节点上运行一个 kafka 流应用程序,它耗尽了应用程序的内存, 我想在集群级别运行 kafka 流应用程序 作为国家,我怎样才能实现同样的目标......
Kafka Processor API 中 Header 有什么用?
我正在学习Kafka Processor API并在ProcessorContext中找到一个方法头。 标题() 返回当前输入记录的标题;可能 如果不可用则为 null 我什么...
我是 Kafka 和 Kafka Streams 的新手。虽然我已经了解了 Kafka 和 Kafka Streams 的概念,并且在概念上感到自信,但有一件事让我感到困惑。是的,就是
防止 Kafka Streams Consumer 写入偏移量/等待一个流消耗完所有记录后再启动第二个流
这是一个关于流的二合一问题。 我正在开发一项由两个流组成的服务。一个(第一个)应该消耗整个主题,接收键/值对并存储它们的信息......
我正在寻找 Kafka-streams 来进行事件处理。我尝试为 Kakfa-Streams 添加一个拦截器(针对消费者)。 我添加了一个 RecordInterceptor,如下所示: configMap.put(consumerPrefix(
状态存储中的密钥重新平衡如何在 Kafka Streams 中进行分区扩展?
假设一个有状态运行的 Kafka 流有一个包含 16 个分区的输入主题,实例(或任务)的数量也是 16。据我所知,Kafka 的默认分区器
几乎所有的kafka流文档都说它使用RocksDB作为状态存储。我创建了一个简单的流生成器,下面的示例效果很好。我唯一的困惑是 - 我没有指定任何状态
我有一个流应用程序,它消耗消息的速度通常很快,但有时需要很长时间(最多 30 分钟)。该行为是不确定的,我们在处理消息之前不知道......
Rocksdb 似乎在内存中加载了完整的 Ktable 状态存储
在我的kafka流拓扑中,我用kstream连接一个ktable,但由于内存的原因,它一直崩溃:连接DSL为Ktable创建一个状态存储,当这个状态存储很小时,u...
我利用 kafka 流和 spring-cloud-stream-binder-kafka-streams 来构建一个流,该流从主题接收有关 kubernetes 集群中运行的 pod 的信息,并构建出 i...
我有一个有点奇怪的用例,我们的应用程序没有使用标准的 kafka 分区。相反,我们有一个自定义分区策略,我们在复合中使用特定字段......
使用“几乎”相同的键连接两个 Java KStream(Kafka 主题)
我有两个kafka主题,其中包含与“警告事件”不同的信息。 要知道主题 A 和主题 B 中的哪些条目相互对应,我必须比较序列号、日期和机器...
消费者函数和服务函数访问同一个kafka流的状态存储是线程安全的吗?
我正在使用 kotlin + Spring Boot + Kafka Streams 和 Spring Cloud Stream。 我有一个服务功能,可以验证客户端的请求并将其发送到 kafka 主题。 要验证请求,需要...
Kafka 流测试:java.util.NoSuchElementException:未初始化主题:“output_topic_name”
我已经根据https://kafka.apache.org/24/documentation/streams/developer-guide/testing.html为kafka流应用程序编写了一个测试类 ,其代码是 导入 com.EventSerde; 导入组织。