与Apache Kafka的内置流处理引擎相关,称为Kafka Streams,它是一个用于使用Apache Kafka构建分布式流处理应用程序的Java库。
已弃用 KStreams TransformerSupplier 至 ProcessorSupplier
鉴于 flatTransform 已被弃用,我正在尝试按照建议通过流程替换它。 我以前的 TransformerSupplier 看起来像这样: 公共类 MyTransformerSupplier 实现
我正在六边形架构中创建服务,该服务消耗来自主题的数据。在同一个项目中,我想使用 kafka 流将几个主题合并为一个主题,然后使用其中的数据......
我有一个特定主题的卡夫卡消费者。由于一些原因我改变了消费者。现在我不再使用消息,而是使用一个 kafka-stream 来处理主题并生成新消息到
如果我想让Kafka集群支持准实时消息转换(< 500 ms), do I need to set the Kafka cluster to the non-flushing disk mode? If I set non-flushing disk mode, Whether non-flu...
假设有一个主题,其中不同文件的块全部混合在一起,由一个元组(FileId,Chunk)表示。 同一文件的块也可能有点乱序。 任务是聚合...
如何以避免从其变更日志主题重新创建状态存储的方式重新启动 KafkaStreams 消费者组
在多个节点托管具有持久状态存储的 KafkaStreams (0.10.2.1) 实例的部署中,重新启动所有节点同时避免重播整个过程的推荐方法是什么...
我需要显示过去 3 个月、6 个月和 1 年中任何时间点的使用统计数据。我计划在上述持续时间内使用 KStream 滑动窗口。大多数例子我...
我的应用程序有多个实例,每个实例都有其状态存储,我们在其中存储(键,值)信息,应用程序将使用这些信息进行功能处理。 我们的要求是删除特定条目
Do <stream>.toTable().toStream() 返回原始流并删除相同键的旧记录?
我有一个可以包含重复记录的KStream。我可以使用 .groupByKey().reduce() 删除具有相同键的旧记录,但想知道是否可以通过 .toTable().toStream 实现这一点...
Kafka Streams 状态存储获取操作什么也取不到,即使更改日志主题显示了一个值
我在我的拓扑中添加了一个持久键值存储。在不涉及太多细节的情况下,我将解释我面临问题的部分。我想查询国营商店。 当我收到
Ksqldb、kafka 流。将主题消息拆分并按条件发布到不同的主题
我有一个主题,比如说“topic_soure”。消息采用 json 格式。 所有消息的顶级字段都是相同的,但“数据”字段可能有不同的模型。 我不知道前...
我们可以将 KStream 转换为 Apache Kafka 中的全局 KTable 吗?
我正在尝试在 scala 中使用 kafka 流。我想知道是否可以将 KStream 转换为 GlobalKTable?
我知道 Kafka Streams 使用 Murmur3 来散列消息的值以解决竞争条件。但这也意味着如果我们改变消息的结构,连接就会失败,因为......
要求确认我的推理是否正确。 我的应用程序有 12 个输入分区,分布在 3 个应用程序实例中。 通常情况下,我希望线程数设置为 4,因为 4
我对 kafka 流完全陌生,我正在尝试在 quarkus 应用程序中使用它。 我尝试像数据库表一样使用它。 我想知道这是否是正确的方法以及是否有......
org.apache.kafka.streams.KafkaStreams#store 方法线程安全吗?
在我的 Kafka Streams 应用程序中,我有以下 2 个线程: 线程 A:这会创建一个 Topology 对象,包括状态存储和所有内容,然后最终调用 KafkaStreams 的构造函数
KStreams 在 Spring Cloud Stream 中运行速度非常慢
我已经在Spring boot中使用SpringCloudStreams实现了KStreams。 我已准备好从具有 20 个分区的主题写入具有相同分区数的主题。我有 2 个 Pod 正在运行。 平均...
我有使用 spring-kafka 和 kafka-streams 的 Java Spring-boot 应用程序。 我正在尝试使用以下命令构建 GraalVM 本机映像 mvn -Pnative spring-boot:build-image 构建成功,但是当我尝试时...
StreamsBuilderFactoryBeanCustomizer 未自定义 StreamsBuilderFactoryBean
我有带有 Kafka 流的 springboot 应用程序,其中我肯定主要的 bean 如下。 @豆 公共 StreamsBuilderFactoryBeanCustomizer StreamsBuilderFactoryBeanCustomizer(CustomStateListener
我正在使用kafka处理器api,我从3个分区的主题创建一个状态存储(我有3个经纪人),我有1个流实例。我想知道当我到达当地的国营商店时,我可以买到所有的东西吗...