apache-kafka-streams 相关问题

与Apache Kafka的内置流处理引擎相关,称为Kafka Streams,它是一个用于使用Apache Kafka构建分布式流处理应用程序的Java库。

KafkaStreams 一开始不读取来自 kafka 的消息

@Bean 公共 KafkaStreams kafkaStreams(KafkaStreamsConfiguration StreamsConfig) { 属性 props = new Properties(); props.put("bootstrap.servers", "localhost:9...

回答 1 投票 0

我可以在 Kafka Streams 拓扑和普通 Kafka 消费者之间使用相同的组 ID 吗?

我的服务是一个 Kafka Streams 应用程序,假设应用程序 ID 设置为“service1”,从主题 A 消费。我相信这个 ID 成为 Kafka Stre 中消费者的组 ID...

回答 1 投票 0

物化状态存储目录中缺少数据

我的应用程序在 GroupedKTable 上进行聚合,然后将其具体化为 PersistentKeyValueStore 我将 state.dir 配置为永久路径(不是默认的 /tmp/kstreams) 我可以看到钥匙-

回答 1 投票 0

Kstream-Ktable 与 CloudEvent 值的连接不起作用

我想在kafka流和ktable之间进行连接。 poc 可以很好地处理流数据。但是,当我使用 CloudEvent 时,我不断遇到与序列化相关的一些或其他问题。 这里...

回答 1 投票 0

无效拓扑:拓扑没有流线程,也没有全局线程,必须订阅至少一个源主题或全局表

启动 Spring Boot 应用程序时出现错误。 org.springframework.context.ApplicationContextException:无法启动bean“defaultKafkaStreamsBuilder”;嵌套异常是 org。

回答 1 投票 0

如何操作KafkaStreams KeyValueStore

我正在使用 KafkaStreams 来处理消息的 kafka 并将累积状态保存在 KeyValueStore 中。 但我还需要通过 API 调用以编程方式影响 KeyValueStore 的状态。 这...

回答 1 投票 0

使用 Kafka Streams 开发时,Lib 上的 UnsatisfiedLinkError 会影响 DB dll

我正在我的开发 Windows 机器上编写 Kafka Streams 应用程序。 如果我尝试使用 Kafka Streams 的 leftJoin 和分支功能,我在执行 jar 应用程序时会收到以下错误:

回答 7 投票 0

将两个kafka主题中的消息排序到另一个主题中

假设我有 2 个 Kafka 主题,每个主题都有一个分区。 我想使用 Kafka Streams、Apache Flink 或 Spring Kafka 等技术通过连接这两个主题将数据写入第三个主题

回答 1 投票 0

流式数据过滤算法

我正在使用 Kafka Streams 开发基于 Kafka 的流数据应用程序。 在这里,我的应用程序将在高峰时段(大约 10 小时)处理大约 400M 的事件,在这里我需要过滤这些事件

回答 1 投票 0

Kafka 状态存储作为本地持久化选项?

我有一个简单的分布式系统架构,其中一个生产者系统将事件写入一个 kafka 主题。这些事件基本上只被一个系统消耗。这位消费者整晚都在加载...

回答 1 投票 0

RocksDB 数据丢失

我有一个禁用日志记录的有状态 Kafka Streams 应用程序。我必须禁用它的原因是因为某些记录的聚合状态可能会变得非常大,并向cha发送更新...

回答 1 投票 0

KafkaStream 在代理滚动升级时达到 ERROR 状态

我正在使用 Kafka 2.8.1 (AWS MSK)。我观察到,每当 AWS 完成一些滚动升级时,我的 Kafka 流应用程序都会达到错误状态,并且在尝试时不断收到以下异常...

回答 1 投票 0

在将记录发送到 kafka-stream 中的主题时设置标头

我需要处理kafka-stream中的请求记录,并且还需要告诉发送者处理的响应。我创建了一个 Rest 端点来接收请求。该端点使用 spring-kafka 的

回答 1 投票 0


Kafka Streams Binder:访问全局状态存储

与此问题相关: 我正在使用 spring-boot-starter-parent 版本 3.1.2 和 spring-cloud-stream-binder-kafka-streams 版本 4.0.3 我使用以下代码创建了一个 GlobalKTable: @豆

回答 1 投票 0

KeyValueStore 如何允许写入操作(Kafka Streams)

我正在尝试了解kafka Stream的Statestore。我了解了它的一些基础知识。我查看了代码。 这是 StateStore 的声明: 公共接口 StateStore { } 其中之一...

回答 1 投票 0

Kafka Streams - 无法搜索状态存储

我有一个使用两个状态存储的 Kafka Stream 应用程序。我在 Strimzi 集群 (kafka:0.29.0-kafka-3.1.0) 上的 Openshift 上运行此应用程序时遇到问题。 这意味着当我收到 bp-addr 记录时...

回答 1 投票 0

kafka 流日志禁用 INFO

有什么办法可以禁用Kafka流处理摘要信息吗?因为它会占用大量磁盘空间 例如 INFO 21284 --- [-StreamThread-6] o.a.k.s.p.internals.StreamThread :流...

回答 1 投票 0

限制Kafka Streams的内存使用

有没有办法限制或定义kafka流应用程序的最大内存使用量?我已经启用了状态存储的缓存,但是当我在 Openshift 中部署时,我的 pod 上出现了 OOM 问题。我有

回答 1 投票 0

Kafka Stateless KStream:异常时如何提交直到失败点?

假设 Kafka 流按顺序发出事件 1,2,3,4,5。当事件 5 出错时,kafka 流处理器按预期退出。但是,直到事件 4 为止它都无法提交。当我重新启动时...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.