apache-kafka-streams 相关问题

与Apache Kafka的内置流处理引擎相关,称为Kafka Streams,它是一个用于使用Apache Kafka构建分布式流处理应用程序的Java库。

KStreams 外连接的作用类似于内连接

我正在使用 .outerJoin 将两个流连接在一起。预期的行为是我会在输出中获得两个输入中所有记录的记录;但实际上我只得到一个输出,其中 t...

回答 1 投票 0

如何控制我的 kafka 流应用程序中用于 leftJoin 操作的内部主题的名称?

我正在一个环境中构建一个kafka流应用程序,该应用程序不允许具有创建内部主题的管理员权限。我可以通过让我的操作具有

回答 1 投票 0

Kafka 状态存储跨子拓扑共享

我正在尝试创建一个自定义加入消费者来加入多个事件。 我创建了一个拓扑,它有四个子拓扑(subtopology-0、subtopology-1、subtopology-2、subtopology-3),不在

回答 1 投票 0

KafkaStreams 和 MockProcessorContext - 获取传感器为空

我正在使用 MockProcessorContext 和此代码进行单元测试 val 属性 = 属性() props[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.String().javaClass 道具[StreamsConfig。

回答 1 投票 0

Kafka流中处理异常如何处理?

处理 Kafka 流中的异常是类似的问题,但接受的答案仅讨论 productionException。如何处理处理过程中出现的异常从而...

回答 3 投票 0

Kafka Streams List Serdes:始终为空

我做错了什么? 我正在编写一个 ProcessorSupplier,用于将 n 条记录聚合为一条。为此,我正在使用 List Serdes ... 我的问题是 ArrayList 总是空的。 使用 Java 21 和 Ka...

回答 2 投票 0

如果在处理过程中抛出异常,Kafka-streams 是否会提交偏移量?

我有一个 Kafka 流应用程序,我创建了自定义生产/消费/未捕获的异常处理程序,所有处理程序都返回 CONTINUE 以继续处理,尽管出现异常。 Kafka 流通讯吗...

回答 2 投票 0

引起:com.fasterxml.jackson.core.JsonParseException:无法识别的标记“ÿ”:正在等待(JSON字符串,数字,数组,对象或标记“null”,

我正在使用 Spring Boot Apache Kafka 示例并出现以下错误。 原因:com.fasterxml.jackson.core.JsonParseException:无法识别的标记“ÿ”:正在等待(JSON字符串,数字,数组,Obj...

回答 1 投票 0

带有 Kafka 流 api 的 Java 代码帮助 WindowBy

我有一个场景,其中有一个 Kafka 生产者流应用程序为某个主题生成 json 有效负载。 我想创建一个流应用程序,它应该对到达的每条消息进行计数...

回答 1 投票 0

如何从Kafka Stream变更日志主题读取数据?

我使用Kafka Stream来处理我的主题A,并使用inMemoryKeyValueStore。 builder.addStateStore(Stores.keyValueStoreBuilder( //Stores.persistentKeyValueStore("AccurateADCounts"), ...

回答 1 投票 0

kafka 流处理器内的异步 http 调用并将响应转发到下一个处理器

我有一个kafka流应用程序(用java springboot编写),它有3个处理器。在第二个处理器中,我想以异步方式调用rest api,当响应到来时,我想...

回答 1 投票 0

卡夫卡流。如何使用抑制器在聚合窗口中发出最终结果

令我惊讶的是,我意识到“抑制”运算符不会在窗口关闭时发出最后一个事件,而是仅当在分区上发布另一个事件时,流的任务才会发出...

回答 1 投票 0

处理并忽略 Kafka Streams 中的 UNKNOWN_TOPIC_OR_PARTITION 错误

我正在使用 Kafka Streams 应用程序,我们使用基于消息头的动态主题确定。在我们的设置中,在应用程序运行时删除主题是正常的。

回答 1 投票 0

spring-cloud-stream-binder-kafka-streams 消费者在发生 RuntimeException 时关闭

spring-cloud-stream-binder-kafka-streams 当消费者发生异常时,消费者停止并进入 EMPTY 状态。我想测试重试机制,但它没有按预期工作。 (https://docs.s...

回答 1 投票 0

有没有办法同步具有kafka流的应用程序以避免重复的消息处理?

在我的 Spring Boot 应用程序中,我使用了 kafka 流。它首先按键对来自某个主题的消息进行分组,根据一定的时间间隔对它们进行窗口化,使用reduce仅保留最新的m...

回答 1 投票 0

golang 中的 kafka 流

我正在尝试使用golang在Go中创建kafka流客户端。据我所知,只有使用 Java 客户端才有可能实现这一点。我做了一些搜索,发现了一些其他第三方库......

回答 2 投票 0

使用 Kafka Streams 仅基于密钥过滤和转发 Kafka 消息

我想了解并可能改进我目前正在处理的一些 Kafka Streams 代码。现在我不完全确定反序列化和序列化的完整生命周期

回答 1 投票 0

在没有 Kafka-Streams 的情况下在 Kafka 上进行共同分区

我很好奇 kafka 是否默认在消费者组内进行共同分区,或者这是否是 kafka-streams 添加的功能。 例如:假设我有一个消费者组 group-1 并且该组消费

回答 1 投票 0

当事件时间在 ts 到 ts -1 分钟之间的所有消息中满足条件时,kafka 会生成消息

我想要处理kafka消息,考虑到eventTime,我在其中接收非线性格式的数据 我想在收到以下 JSON 格式的消息后,如果我得到一个

回答 1 投票 0

使用 Kafka Stream 按时间戳聚合事件

我有两个包含一些指标事件的主题。每个指标都是在特定时间戳生成的。我需要合并一些指标;唯一的共同键是时间戳。 我创建了两个 KStream wi...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.