与Apache Kafka的内置流处理引擎相关,称为Kafka Streams,它是一个用于使用Apache Kafka构建分布式流处理应用程序的Java库。
我想给我的消费者一些时间来重新启动,这样就不会发生不必要的重新平衡。我怎样才能做到这一点? 如果关闭,我希望复制能够出现,并且在一段时间后如果
如何使用挂钟时间而不是 Kafka 流中的事件时间来抑制窗口?
要求是,如果在输入到...的 2 分钟时间窗口内没有收到预期的“最终”事件(从事件负载中的 EvenType 字段标识),则发送警报
出于测试目的,我删除了目标主题,并预计应用程序会在一段时间后超时。然而,经过一些研究,我了解到 Kafka Streams 默认会重试消息
我正在使用 .outerJoin 将两个流连接在一起。预期的行为是我会在输出中获得两个输入中所有记录的记录;但实际上我只得到一个输出,其中 t...
如何控制我的 kafka 流应用程序中用于 leftJoin 操作的内部主题的名称?
我正在一个环境中构建一个kafka流应用程序,该应用程序不允许具有创建内部主题的管理员权限。我可以通过让我的操作具有
我正在尝试创建一个自定义加入消费者来加入多个事件。 我创建了一个拓扑,它有四个子拓扑(subtopology-0、subtopology-1、subtopology-2、subtopology-3),不在
KafkaStreams 和 MockProcessorContext - 获取传感器为空
我正在使用 MockProcessorContext 和此代码进行单元测试 val 属性 = 属性() props[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.String().javaClass 道具[StreamsConfig。
处理 Kafka 流中的异常是类似的问题,但接受的答案仅讨论 productionException。如何处理处理过程中出现的异常从而...
Kafka Streams List Serdes:始终为空
我做错了什么? 我正在编写一个 ProcessorSupplier,用于将 n 条记录聚合为一条。为此,我正在使用 List Serdes ... 我的问题是 ArrayList 总是空的。 使用 Java 21 和 Ka...
如果在处理过程中抛出异常,Kafka-streams 是否会提交偏移量?
我有一个 Kafka 流应用程序,我创建了自定义生产/消费/未捕获的异常处理程序,所有处理程序都返回 CONTINUE 以继续处理,尽管出现异常。 Kafka 流通讯吗...
引起:com.fasterxml.jackson.core.JsonParseException:无法识别的标记“ÿ”:正在等待(JSON字符串,数字,数组,对象或标记“null”,
我正在使用 Spring Boot Apache Kafka 示例并出现以下错误。 原因:com.fasterxml.jackson.core.JsonParseException:无法识别的标记“ÿ”:正在等待(JSON字符串,数字,数组,Obj...
带有 Kafka 流 api 的 Java 代码帮助 WindowBy
我有一个场景,其中有一个 Kafka 生产者流应用程序为某个主题生成 json 有效负载。 我想创建一个流应用程序,它应该对到达的每条消息进行计数...
我使用Kafka Stream来处理我的主题A,并使用inMemoryKeyValueStore。 builder.addStateStore(Stores.keyValueStoreBuilder( //Stores.persistentKeyValueStore("AccurateADCounts"), ...
kafka 流处理器内的异步 http 调用并将响应转发到下一个处理器
我有一个kafka流应用程序(用java springboot编写),它有3个处理器。在第二个处理器中,我想以异步方式调用rest api,当响应到来时,我想...
令我惊讶的是,我意识到“抑制”运算符不会在窗口关闭时发出最后一个事件,而是仅当在分区上发布另一个事件时,流的任务才会发出...
处理并忽略 Kafka Streams 中的 UNKNOWN_TOPIC_OR_PARTITION 错误
我正在使用 Kafka Streams 应用程序,我们使用基于消息头的动态主题确定。在我们的设置中,在应用程序运行时删除主题是正常的。
spring-cloud-stream-binder-kafka-streams 消费者在发生 RuntimeException 时关闭
spring-cloud-stream-binder-kafka-streams 当消费者发生异常时,消费者停止并进入 EMPTY 状态。我想测试重试机制,但它没有按预期工作。 (https://docs.s...
有没有办法同步具有kafka流的应用程序以避免重复的消息处理?
在我的 Spring Boot 应用程序中,我使用了 kafka 流。它首先按键对来自某个主题的消息进行分组,根据一定的时间间隔对它们进行窗口化,使用reduce仅保留最新的m...
我正在尝试使用golang在Go中创建kafka流客户端。据我所知,只有使用 Java 客户端才有可能实现这一点。我做了一些搜索,发现了一些其他第三方库......
使用 Kafka Streams 仅基于密钥过滤和转发 Kafka 消息
我想了解并可能改进我目前正在处理的一些 Kafka Streams 代码。现在我不完全确定反序列化和序列化的完整生命周期