apache-kafka-streams 相关问题

与Apache Kafka的内置流处理引擎相关,称为Kafka Streams,它是一个用于使用Apache Kafka构建分布式流处理应用程序的Java库。

已弃用 KStreams TransformerSupplier 至 ProcessorSupplier

鉴于 flatTransform 已被弃用,我正在尝试按照建议通过流程替换它。 我以前的 TransformerSupplier 看起来像这样: 公共类 MyTransformerSupplier 实现

回答 1 投票 0

六边形架构中的Kafka流

我正在六边形架构中创建服务,该服务消耗来自主题的数据。在同一个项目中,我想使用 kafka 流将几个主题合并为一个主题,然后使用其中的数据......

回答 1 投票 0

当监听器被移除时如何处理kafka主题

我有一个特定主题的卡夫卡消费者。由于一些原因我改变了消费者。现在我不再使用消息,而是使用一个 kafka-stream 来处理主题并生成新消息到

回答 1 投票 0

kafka集群非刷盘模式是否可以对某些主题生效?

如果我想让Kafka集群支持准实时消息转换(< 500 ms), do I need to set the Kafka cluster to the non-flushing disk mode? If I set non-flushing disk mode, Whether non-flu...

回答 1 投票 0

kafka Streams groupBy 内部做了什么?

假设有一个主题,其中不同文件的块全部混合在一起,由一个元组(FileId,Chunk)表示。 同一文件的块也可能有点乱序。 任务是聚合...

回答 1 投票 0

如何以避免从其变更日志主题重新创建状态存储的方式重新启动 KafkaStreams 消费者组

在多个节点托管具有持久状态存储的 KafkaStreams (0.10.2.1) 实例的部署中,重新启动所有节点同时避免重播整个过程的推荐方法是什么...

回答 2 投票 0

具有大型滑动窗口的Kafka Streams

我需要显示过去 3 个月、6 个月和 1 年中任何时间点的使用统计数据。我计划在上述持续时间内使用 KStream 滑动窗口。大多数例子我...

回答 2 投票 0

如何通过检查应用程序的所有实例来从状态存储中删除记录

我的应用程序有多个实例,每个实例都有其状态存储,我们在其中存储(键,值)信息,应用程序将使用这些信息进行功能处理。 我们的要求是删除特定条目

回答 1 投票 0

Do <stream>.toTable().toStream() 返回原始流并删除相同键的旧记录?

我有一个可以包含重复记录的KStream。我可以使用 .groupByKey().reduce() 删除具有相同键的旧记录,但想知道是否可以通过 .toTable().toStream 实现这一点...

回答 1 投票 0

Kafka Streams 状态存储获取操作什么也取不到,即使更改日志主题显示了一个值

我在我的拓扑中添加了一个持久键值存储。在不涉及太多细节的情况下,我将解释我面临问题的部分。我想查询国营商店。 当我收到

回答 1 投票 0

Ksqldb、kafka 流。将主题消息拆分并按条件发布到不同的主题

我有一个主题,比如说“topic_soure”。消息采用 json 格式。 所有消息的顶级字段都是相同的,但“数据”字段可能有不同的模型。 我不知道前...

回答 1 投票 0

我们可以将 KStream 转换为 Apache Kafka 中的全局 KTable 吗?

我正在尝试在 scala 中使用 kafka 流。我想知道是否可以将 KStream 转换为 GlobalKTable?

回答 1 投票 0

架构更改时 Kafka Streams 连接不起作用

我知道 Kafka Streams 使用 Murmur3 来散列消息的值以解决竞争条件。但这也意味着如果我们改变消息的结构,连接就会失败,因为......

回答 1 投票 0

与 num.standby.replicas 相关的线程数

要求确认我的推理是否正确。 我的应用程序有 12 个输入分区,分布在 3 个应用程序实例中。 通常情况下,我希望线程数设置为 4,因为 4

回答 1 投票 0

从 Kafka Streams 读取值并检查错误

我对 kafka 流完全陌生,我正在尝试在 quarkus 应用程序中使用它。 我尝试像数据库表一样使用它。 我想知道这是否是正确的方法以及是否有......

回答 1 投票 0

org.apache.kafka.streams.KafkaStreams#store 方法线程安全吗?

在我的 Kafka Streams 应用程序中,我有以下 2 个线程: 线程 A:这会创建一个 Topology 对象,包括状态存储和所有内容,然后最终调用 KafkaStreams 的构造函数

回答 1 投票 0

KStreams 在 Spring Cloud Stream 中运行速度非常慢

我已经在Spring boot中使用SpringCloudStreams实现了KStreams。 我已准备好从具有 20 个分区的主题写入具有相同分区数的主题。我有 2 个 Pod 正在运行。 平均...

回答 1 投票 0

Spring-boot kafka 应用程序到 GraalVM 本机映像 - org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier 无法找到

我有使用 spring-kafka 和 kafka-streams 的 Java Spring-boot 应用程序。 我正在尝试使用以下命令构建 GraalVM 本机映像 mvn -Pnative spring-boot:build-image 构建成功,但是当我尝试时...

回答 1 投票 0

StreamsBuilderFactoryBeanCustomizer 未自定义 StreamsBuilderFactoryBean

我有带有 Kafka 流的 springboot 应用程序,其中我肯定主要的 bean 如下。 @豆 公共 StreamsBuilderFactoryBeanCustomizer StreamsBuilderFactoryBeanCustomizer(CustomStateListener

回答 1 投票 0

多个分区的Kafka本地状态存储

我正在使用kafka处理器api,我从3个分区的主题创建一个状态存储(我有3个经纪人),我有1个流实例。我想知道当我到达当地的国营商店时,我可以买到所有的东西吗...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.