与Apache Kafka的内置流处理引擎相关,称为Kafka Streams,它是一个用于使用Apache Kafka构建分布式流处理应用程序的Java库。
我有一个使用两个状态存储的 Kafka Stream 应用程序。我在 Strimzi 集群 (kafka:0.29.0-kafka-3.1.0) 上的 Openshift 上运行此应用程序时遇到问题。 这意味着当我收到 bp-addr 记录时...
有什么办法可以禁用Kafka流处理摘要信息吗?因为它会占用大量磁盘空间 例如 INFO 21284 --- [-StreamThread-6] o.a.k.s.p.internals.StreamThread :流...
有没有办法限制或定义kafka流应用程序的最大内存使用量?我已经启用了状态存储的缓存,但是当我在 Openshift 中部署时,我的 pod 上出现了 OOM 问题。我有
Kafka Stateless KStream:异常时如何提交直到失败点?
假设 Kafka 流按顺序发出事件 1,2,3,4,5。当事件 5 出错时,kafka 流处理器按预期退出。但是,直到事件 4 为止它都无法提交。当我重新启动时...
在kafka-streams中,流上的branch()方法已被弃用。 我从 COnfluence 中找到了一些代码片段,我现在应该使用 split() 和 Branch() 方法,如下所示: artikelEvents.split() ...
GlobalKTable 作为带有 Kafka Streams Binder 的 QueryableStore
我正在使用 spring-boot-starter-parent 版本 3.1.2 和 spring-cloud-stream-binder-kafka-streams 版本 4.0.3 绝大多数在线示例都显示使用 @Input 注释创建 GlobalKTable...
Kafka StreamsUncaughtExceptionHandler REPLACE_THREAD 与 SHUTDOWN_CLIENT
我有一个旧主题,其中包含损坏的消息,我需要完全重新处理该主题,忽略无法处理的消息。这是正确的未捕获异常处理策略
Kafka Streams 使用手动创建的内部主题,给出 TopicAuthorizationException
我使用 KStreams 来消费主题中的数据,对数据执行一些逻辑并写入 KTable。由于安全原因,应用程序无法获得创建内部主题的权限。 巴...
Quarkus Kafka Streams/反应式消息反序列化异常
嘿,所以我正在尝试使用 Kafka Streams 和 MP Reactive Messaging 来读取 Kafka 主题,然后生成回它。 卡夫卡流错误 - org.apache.kafka.streams.errors.
单元测试 KafkaStreams 给出 IllegalArgumentException:未知主题
我有一个应用程序,使用 KStream 从 Kafka 读取数据,根据标头过滤数据,然后写入 KTable。 公共拓扑 buildTopology() { KStream inputStream = bui...
我正在节点上运行一个 kafka 流应用程序,它耗尽了应用程序的内存, 我想在集群级别运行 kafka 流应用程序 作为国家,我怎样才能实现同样的目标......
Kafka Processor API 中 Header 有什么用?
我正在学习Kafka Processor API并在ProcessorContext中找到一个方法头。 标题() 返回当前输入记录的标题;可能 如果不可用则为 null 我什么...
我是 Kafka 和 Kafka Streams 的新手。虽然我已经了解了 Kafka 和 Kafka Streams 的概念,并且在概念上感到自信,但有一件事让我感到困惑。是的,就是
防止 Kafka Streams Consumer 写入偏移量/等待一个流消耗完所有记录后再启动第二个流
这是一个关于流的二合一问题。 我正在开发一项由两个流组成的服务。一个(第一个)应该消耗整个主题,接收键/值对并存储它们的信息......
我正在寻找 Kafka-streams 来进行事件处理。我尝试为 Kakfa-Streams 添加一个拦截器(针对消费者)。 我添加了一个 RecordInterceptor,如下所示: configMap.put(consumerPrefix(
状态存储中的密钥重新平衡如何在 Kafka Streams 中进行分区扩展?
假设一个有状态运行的 Kafka 流有一个包含 16 个分区的输入主题,实例(或任务)的数量也是 16。据我所知,Kafka 的默认分区器
几乎所有的kafka流文档都说它使用RocksDB作为状态存储。我创建了一个简单的流生成器,下面的示例效果很好。我唯一的困惑是 - 我没有指定任何状态
我有一个流应用程序,它消耗消息的速度通常很快,但有时需要很长时间(最多 30 分钟)。该行为是不确定的,我们在处理消息之前不知道......
Rocksdb 似乎在内存中加载了完整的 Ktable 状态存储
在我的kafka流拓扑中,我用kstream连接一个ktable,但由于内存的原因,它一直崩溃:连接DSL为Ktable创建一个状态存储,当这个状态存储很小时,u...
我利用 kafka 流和 spring-cloud-stream-binder-kafka-streams 来构建一个流,该流从主题接收有关 kubernetes 集群中运行的 pod 的信息,并构建出 i...