与Apache Kafka的内置流处理引擎相关,称为Kafka Streams,它是一个用于使用Apache Kafka构建分布式流处理应用程序的Java库。
KafkaStreams 一开始不读取来自 kafka 的消息
@Bean 公共 KafkaStreams kafkaStreams(KafkaStreamsConfiguration StreamsConfig) { 属性 props = new Properties(); props.put("bootstrap.servers", "localhost:9...
我可以在 Kafka Streams 拓扑和普通 Kafka 消费者之间使用相同的组 ID 吗?
我的服务是一个 Kafka Streams 应用程序,假设应用程序 ID 设置为“service1”,从主题 A 消费。我相信这个 ID 成为 Kafka Stre 中消费者的组 ID...
我的应用程序在 GroupedKTable 上进行聚合,然后将其具体化为 PersistentKeyValueStore 我将 state.dir 配置为永久路径(不是默认的 /tmp/kstreams) 我可以看到钥匙-
Kstream-Ktable 与 CloudEvent 值的连接不起作用
我想在kafka流和ktable之间进行连接。 poc 可以很好地处理流数据。但是,当我使用 CloudEvent 时,我不断遇到与序列化相关的一些或其他问题。 这里...
无效拓扑:拓扑没有流线程,也没有全局线程,必须订阅至少一个源主题或全局表
启动 Spring Boot 应用程序时出现错误。 org.springframework.context.ApplicationContextException:无法启动bean“defaultKafkaStreamsBuilder”;嵌套异常是 org。
如何操作KafkaStreams KeyValueStore
我正在使用 KafkaStreams 来处理消息的 kafka 并将累积状态保存在 KeyValueStore 中。 但我还需要通过 API 调用以编程方式影响 KeyValueStore 的状态。 这...
使用 Kafka Streams 开发时,Lib 上的 UnsatisfiedLinkError 会影响 DB dll
我正在我的开发 Windows 机器上编写 Kafka Streams 应用程序。 如果我尝试使用 Kafka Streams 的 leftJoin 和分支功能,我在执行 jar 应用程序时会收到以下错误:
假设我有 2 个 Kafka 主题,每个主题都有一个分区。 我想使用 Kafka Streams、Apache Flink 或 Spring Kafka 等技术通过连接这两个主题将数据写入第三个主题
我正在使用 Kafka Streams 开发基于 Kafka 的流数据应用程序。 在这里,我的应用程序将在高峰时段(大约 10 小时)处理大约 400M 的事件,在这里我需要过滤这些事件
我有一个简单的分布式系统架构,其中一个生产者系统将事件写入一个 kafka 主题。这些事件基本上只被一个系统消耗。这位消费者整晚都在加载...
我有一个禁用日志记录的有状态 Kafka Streams 应用程序。我必须禁用它的原因是因为某些记录的聚合状态可能会变得非常大,并向cha发送更新...
KafkaStream 在代理滚动升级时达到 ERROR 状态
我正在使用 Kafka 2.8.1 (AWS MSK)。我观察到,每当 AWS 完成一些滚动升级时,我的 Kafka 流应用程序都会达到错误状态,并且在尝试时不断收到以下异常...
在将记录发送到 kafka-stream 中的主题时设置标头
我需要处理kafka-stream中的请求记录,并且还需要告诉发送者处理的响应。我创建了一个 Rest 端点来接收请求。该端点使用 spring-kafka 的
每当我调用此函数时,我都想要特定 id 的旧 kafka 事件。我应该使用 Kafka Stream 还是 kafkaListner
当旧事件数量较少或较多时,这是首选。我正在使用弹簧靴
与此问题相关: 我正在使用 spring-boot-starter-parent 版本 3.1.2 和 spring-cloud-stream-binder-kafka-streams 版本 4.0.3 我使用以下代码创建了一个 GlobalKTable: @豆
KeyValueStore 如何允许写入操作(Kafka Streams)
我正在尝试了解kafka Stream的Statestore。我了解了它的一些基础知识。我查看了代码。 这是 StateStore 的声明: 公共接口 StateStore { } 其中之一...
我有一个使用两个状态存储的 Kafka Stream 应用程序。我在 Strimzi 集群 (kafka:0.29.0-kafka-3.1.0) 上的 Openshift 上运行此应用程序时遇到问题。 这意味着当我收到 bp-addr 记录时...
有什么办法可以禁用Kafka流处理摘要信息吗?因为它会占用大量磁盘空间 例如 INFO 21284 --- [-StreamThread-6] o.a.k.s.p.internals.StreamThread :流...
有没有办法限制或定义kafka流应用程序的最大内存使用量?我已经启用了状态存储的缓存,但是当我在 Openshift 中部署时,我的 pod 上出现了 OOM 问题。我有
Kafka Stateless KStream:异常时如何提交直到失败点?
假设 Kafka 流按顺序发出事件 1,2,3,4,5。当事件 5 出错时,kafka 流处理器按预期退出。但是,直到事件 4 为止它都无法提交。当我重新启动时...