与Apache Kafka的内置流处理引擎相关,称为Kafka Streams,它是一个用于使用Apache Kafka构建分布式流处理应用程序的Java库。
注册 Avro schema 时出错:写入 Kafka 存储时注册 schema 操作失败;错误代码:50001
我在发送从 .avsc 文件生成的对象时遇到了一个奇怪的问题。 在我的本地上,没有问题,但在开发服务器上,它会抛出异常 我使用生产者发送这种类型的回复...
带有更改日志主题的 Kafka Streams KTable 存储与日志压缩源主题
我正在一个输入主题上构建 KTable,并在两个 Kafka Stream 应用程序实例上加入 KStream。 KTable 的输入主题已经是日志压缩主题。所以当我的一个应用程序...
从 PySpark 结构化流将聚合数据写入 MongoDB 的问题
我在尝试从 PySpark 结构化流作业将聚合数据写入 MongoDB 时遇到问题。这是我的设置: 我有一个 Kafka 主题,我正在其中使用 JSON 消息。 我是...
如何在运行时重新配置 Kafka Streams 应用程序中的过滤器(无需停止并重新启动应用程序)? 过滤内容的配置最好在 Kafka 配置主题中找到。 ...
我想给我的消费者一些时间来重新启动,这样就不会发生不必要的重新平衡。我怎样才能做到这一点? 如果关闭,我希望复制能够出现,并且在一段时间后如果
如何使用挂钟时间而不是 Kafka 流中的事件时间来抑制窗口?
要求是,如果在输入到...的 2 分钟时间窗口内没有收到预期的“最终”事件(从事件负载中的 EvenType 字段标识),则发送警报
出于测试目的,我删除了目标主题,并预计应用程序会在一段时间后超时。然而,经过一些研究,我了解到 Kafka Streams 默认会重试消息
我正在使用 .outerJoin 将两个流连接在一起。预期的行为是我会在输出中获得两个输入中所有记录的记录;但实际上我只得到一个输出,其中 t...
如何控制我的 kafka 流应用程序中用于 leftJoin 操作的内部主题的名称?
我正在一个环境中构建一个kafka流应用程序,该应用程序不允许具有创建内部主题的管理员权限。我可以通过让我的操作具有
我正在尝试创建一个自定义加入消费者来加入多个事件。 我创建了一个拓扑,它有四个子拓扑(subtopology-0、subtopology-1、subtopology-2、subtopology-3),不在
KafkaStreams 和 MockProcessorContext - 获取传感器为空
我正在使用 MockProcessorContext 和此代码进行单元测试 val 属性 = 属性() props[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.String().javaClass 道具[StreamsConfig。
处理 Kafka 流中的异常是类似的问题,但接受的答案仅讨论 productionException。如何处理处理过程中出现的异常从而...
Kafka Streams List Serdes:始终为空
我做错了什么? 我正在编写一个 ProcessorSupplier,用于将 n 条记录聚合为一条。为此,我正在使用 List Serdes ... 我的问题是 ArrayList 总是空的。 使用 Java 21 和 Ka...
如果在处理过程中抛出异常,Kafka-streams 是否会提交偏移量?
我有一个 Kafka 流应用程序,我创建了自定义生产/消费/未捕获的异常处理程序,所有处理程序都返回 CONTINUE 以继续处理,尽管出现异常。 Kafka 流通讯吗...
引起:com.fasterxml.jackson.core.JsonParseException:无法识别的标记“ÿ”:正在等待(JSON字符串,数字,数组,对象或标记“null”,
我正在使用 Spring Boot Apache Kafka 示例并出现以下错误。 原因:com.fasterxml.jackson.core.JsonParseException:无法识别的标记“ÿ”:正在等待(JSON字符串,数字,数组,Obj...
带有 Kafka 流 api 的 Java 代码帮助 WindowBy
我有一个场景,其中有一个 Kafka 生产者流应用程序为某个主题生成 json 有效负载。 我想创建一个流应用程序,它应该对到达的每条消息进行计数...
我使用Kafka Stream来处理我的主题A,并使用inMemoryKeyValueStore。 builder.addStateStore(Stores.keyValueStoreBuilder( //Stores.persistentKeyValueStore("AccurateADCounts"), ...
kafka 流处理器内的异步 http 调用并将响应转发到下一个处理器
我有一个kafka流应用程序(用java springboot编写),它有3个处理器。在第二个处理器中,我想以异步方式调用rest api,当响应到来时,我想...
令我惊讶的是,我意识到“抑制”运算符不会在窗口关闭时发出最后一个事件,而是仅当在分区上发布另一个事件时,流的任务才会发出...
处理并忽略 Kafka Streams 中的 UNKNOWN_TOPIC_OR_PARTITION 错误
我正在使用 Kafka Streams 应用程序,我们使用基于消息头的动态主题确定。在我们的设置中,在应用程序运行时删除主题是正常的。