apache-kafka-streams 相关问题

与Apache Kafka的内置流处理引擎相关,称为Kafka Streams,它是一个用于使用Apache Kafka构建分布式流处理应用程序的Java库。

在没有 Kafka-Streams 的情况下在 Kafka 上进行共同分区

我很好奇 kafka 是否默认在消费者组内进行共同分区,或者这是否是 kafka-streams 添加的功能。 例如:假设我有一个消费者组 group-1 并且该组消费

回答 1 投票 0

当事件时间在 ts 到 ts -1 分钟之间的所有消息中满足条件时,kafka 会生成消息

我想要处理kafka消息,考虑到eventTime,我在其中接收非线性格式的数据 我想在收到以下 JSON 格式的消息后,如果我得到一个

回答 1 投票 0

使用 Kafka Stream 按时间戳聚合事件

我有两个包含一些指标事件的主题。每个指标都是在特定时间戳生成的。我需要合并一些指标;唯一的共同键是时间戳。 我创建了两个 KStream wi...

回答 1 投票 0

Kafka 流应用程序想要检索错误的模式

我们在其中一个 Kafka 流应用程序中看到这样的错误: org.apache.kafka.streams.errors.StreamsException:进程中捕获异常。 taskId=0_6,处理器=KSTREAM-SOURCE-0000000000,主题=

回答 1 投票 0

我使用 JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(24)) 进行左连接时遇到问题

我在使用 JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(24)) 进行左连接时遇到问题 该方法是否应该将左流中的所有记录保留 24 小时以查找匹配

回答 1 投票 0

Kafka Streams:通过两个或多个外键重新设置 KTable

假设有一个(简化的)A类,如下所示: A类{ 私人长ID; 私有字符串一些内容; 私人长fk1; 私人长fk2; // 相应的 getter 和 setter } fk1 和 fk2 ...

回答 1 投票 0

为物化状态存储设置 lz4 主题压缩

假设 Quarkus Kafka Streams 应用程序通过物化进行某种重新键入和聚合,如下所示: 最终 KTable groupedCommunications =

回答 1 投票 0

Kafka Streams 主题保留问题:动态主题配置和未发布的数据

我在与 Spring Cloud Stream 集成的 Kafka Streams 应用程序方面遇到问题。问题围绕着一个特定的主题,它的大小不断增加并且从不释放数据:

回答 1 投票 0

Kafka Streams KTable-KTable FK join 分区主题 - 分区路由问题?

给定两个实体代理和配置: 类代理机构{ 长 ID; // PK UUID配置Id; // FK -> 配置.id // ... } 类配置{ UUID ID; // PK // ... }

回答 1 投票 0

Kafka Streams 速率限制消耗

我试图根据条件限制我对某个主题的消耗率。 例如: 如果消息包含:“foo” 我想每秒消耗不超过 5 条消息(全球 - 意思是...

回答 1 投票 0

声明检查模式可以与 Kafka Streams 一起使用吗?

在大多数围绕 kafka 流的准备工作中,不建议您在 kafka 流应用程序中请求/响应,或者更一般地在流中与外部系统同步交互

回答 1 投票 0

Kafka KStreams 左连接没有给我预期的输出

直播1: 键:A,值:1 键:B,值:2 键:A,值:3 键:B,值:4 直播2: 键:A,值:X 键:B,值:Y 我得到的输出: 值 1:1,值 2:X 值 1:2,值 2:Y 值 1:3,值...

回答 1 投票 0

BindingsEndpoint kafka Stream spring boot

我猜。它在本地电脑上运行良好,但在云 k8s 上运行不正常,请帮忙,谢谢 在此输入图像描述 'org.springframework.cloud.stream.endpoint.BindingsEndpoint' 不能...

回答 1 投票 0

如何使用Spring Cloud Stream中的Kafka Streams绑定器将同一主题绑定到多个函数?

使用 Spring Cloud Stream 和 Kafka Streams 绑定器,我想在另一个函数中处理函数的输出,如下所示: @豆 公共函数,KStream 使用 Spring Cloud Stream 和 Kafka Streams 绑定器,我想在另一个函数中处理函数的输出,如下所示: @Bean public Function<KStream<String, Double>, KStream<String, Double>> sqrt() { return numbers -> numbers.mapValues(Math::sqrt); } @Bean public Consumer<KStream<String, Double>> log() { return sqrt -> sqrt.foreach((key, value) -> log.info("{}: {}", key, value)); } 其中 sqrt() 输出数字的平方根,然后用 log() 记录。 application.yaml因此看起来像这样: spring: cloud: stream: function: bindings: sqrt-in-0: numbers sqrt-out-0: sqrt-numbers log-in-0: sqrt-numbers kafka: streams: bindings: sqrt: consumer: application-id: sqrtApplicationId log: consumer: application-id: logApplicationId 启动应用程序时,出现以下错误: The bean 'sqrt-numbers' could not be registered. A bean with that name has already been defined and overriding is disabled. Action: Consider renaming one of the beans or enabling overriding by setting spring.main.allow-bean-definition-overriding=true 当然,现在将 definition-overriding 设置为 true 并不是一个正确的解决方案,并且它会失败并显示 IllegalStateException。 我该如何解决这个问题? 问题的重现可以在这里找到:https://github.com/cedric-schaller/dltawareprocessor-type-error 假设您有两个名为 numbers 和 sqrt-numbers 的 Kafka 主题,则以下配置应该有效。 spring: cloud: stream: bindings: sqrt-in-0: destination: numbers sqrt-out-0: destination: sqrt-numbers log-in-0: destination: sqrt-numbers kafka: streams: bindings: sqrt-in-0: consumer: application-id: sqrtApplicationId log-in-0: consumer: application-id: logApplicationId 您可以使用 spring.cloud.stream.function.bindings.. 覆盖默认绑定名称。例如,如果您想将绑定名称从 sqrt-in-0 更改为 input,您可以像 spring.cloud.stream.function.bindings.sqrt-in0-0: input 那样进行操作。不过,您仍然需要在覆盖的绑定上设置 destination(通过 spring.cloud.stream.bindings.input.destination)。 您遇到的特定异常是因为您试图重用已创建的绑定名称 - sqrt-numbers。

回答 1 投票 0

Kafka GroupTable 测试使用 ProcessorTopologyTestDriver 时生成额外消息

我编写了一个流,它接收消息并发送出已出现的键表。如果出现某些东西,它将显示计数为 1。这是我的生产 c 的简化版本...

回答 1 投票 0

为什么我的 Kafka Streams 拓扑无法正确重放/重新处理?

我有一个如下所示的拓扑: KTable users = topology.table(USERS); KStream joinRequests = topology.stream(JOIN_REQUESTS) .地图...

回答 2 投票 0

将 Kafka 输入流动态连接到多个输出流

Kafka Streams 中是否内置了允许将单个输入流动态连接到多个输出流的功能? KStream.branch 允许基于真/假预测进行分支...

回答 1 投票 0

合并多个相同的 Kafka Streams 主题

我有 2 个 Kafka 主题,从不同的来源流式传输完全相同的内容,这样我就可以在其中一个来源发生故障时获得高可用性。 我正在尝试将 2 个主题合并为 1 个输出...

回答 1 投票 0

使用 avro 作为带有 kafka 模式注册表的关键主题

我和我的团队最近遇到了用于主题键的 Avro 架构问题。我们更改了对键的评论,这完全破坏了我们的 Kafka Streams 连接,也破坏了我们主题的压缩......

回答 1 投票 0

Kafka 流卡住分区

我们有一组 kafka 流/spring-boot/spring-kafka 应用程序,在过去几天发生了一个事件,我们注意到一个主题的单个分区有数千个我......

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.