与Apache Kafka的内置流处理引擎相关,称为Kafka Streams,它是一个用于使用Apache Kafka构建分布式流处理应用程序的Java库。
spring-cloud-stream-binder-kafka-streams 消费者在发生 RuntimeException 时关闭
spring-cloud-stream-binder-kafka-streams 当消费者发生异常时,消费者停止并进入 EMPTY 状态。我想测试重试机制,但它没有按预期工作。 (https://docs.s...
有没有办法同步具有kafka流的应用程序以避免重复的消息处理?
在我的 Spring Boot 应用程序中,我使用了 kafka 流。它首先按键对来自某个主题的消息进行分组,根据一定的时间间隔对它们进行窗口化,使用reduce仅保留最新的m...
我正在尝试使用golang在Go中创建kafka流客户端。据我所知,只有使用 Java 客户端才有可能实现这一点。我做了一些搜索,发现了一些其他第三方库......
使用 Kafka Streams 仅基于密钥过滤和转发 Kafka 消息
我想了解并可能改进我目前正在处理的一些 Kafka Streams 代码。现在我不完全确定反序列化和序列化的完整生命周期
在没有 Kafka-Streams 的情况下在 Kafka 上进行共同分区
我很好奇 kafka 是否默认在消费者组内进行共同分区,或者这是否是 kafka-streams 添加的功能。 例如:假设我有一个消费者组 group-1 并且该组消费
当事件时间在 ts 到 ts -1 分钟之间的所有消息中满足条件时,kafka 会生成消息
我想要处理kafka消息,考虑到eventTime,我在其中接收非线性格式的数据 我想在收到以下 JSON 格式的消息后,如果我得到一个
我有两个包含一些指标事件的主题。每个指标都是在特定时间戳生成的。我需要合并一些指标;唯一的共同键是时间戳。 我创建了两个 KStream wi...
我们在其中一个 Kafka 流应用程序中看到这样的错误: org.apache.kafka.streams.errors.StreamsException:进程中捕获异常。 taskId=0_6,处理器=KSTREAM-SOURCE-0000000000,主题=
我使用 JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(24)) 进行左连接时遇到问题
我在使用 JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(24)) 进行左连接时遇到问题 该方法是否应该将左流中的所有记录保留 24 小时以查找匹配
Kafka Streams:通过两个或多个外键重新设置 KTable
假设有一个(简化的)A类,如下所示: A类{ 私人长ID; 私有字符串一些内容; 私人长fk1; 私人长fk2; // 相应的 getter 和 setter } fk1 和 fk2 ...
假设 Quarkus Kafka Streams 应用程序通过物化进行某种重新键入和聚合,如下所示: 最终 KTable groupedCommunications =
Kafka Streams 主题保留问题:动态主题配置和未发布的数据
我在与 Spring Cloud Stream 集成的 Kafka Streams 应用程序方面遇到问题。问题围绕着一个特定的主题,它的大小不断增加并且从不释放数据:
Kafka Streams KTable-KTable FK join 分区主题 - 分区路由问题?
给定两个实体代理和配置: 类代理机构{ 长 ID; // PK UUID配置Id; // FK -> 配置.id // ... } 类配置{ UUID ID; // PK // ... }
我试图根据条件限制我对某个主题的消耗率。 例如: 如果消息包含:“foo” 我想每秒消耗不超过 5 条消息(全球 - 意思是...
声明检查模式可以与 Kafka Streams 一起使用吗?
在大多数围绕 kafka 流的准备工作中,不建议您在 kafka 流应用程序中请求/响应,或者更一般地在流中与外部系统同步交互
直播1: 键:A,值:1 键:B,值:2 键:A,值:3 键:B,值:4 直播2: 键:A,值:X 键:B,值:Y 我得到的输出: 值 1:1,值 2:X 值 1:2,值 2:Y 值 1:3,值...
BindingsEndpoint kafka Stream spring boot
我猜。它在本地电脑上运行良好,但在云 k8s 上运行不正常,请帮忙,谢谢 在此输入图像描述 'org.springframework.cloud.stream.endpoint.BindingsEndpoint' 不能...
如何使用Spring Cloud Stream中的Kafka Streams绑定器将同一主题绑定到多个函数?
使用 Spring Cloud Stream 和 Kafka Streams 绑定器,我想在另一个函数中处理函数的输出,如下所示: @豆 公共函数,KStream 使用 Spring Cloud Stream 和 Kafka Streams 绑定器,我想在另一个函数中处理函数的输出,如下所示: @Bean public Function<KStream<String, Double>, KStream<String, Double>> sqrt() { return numbers -> numbers.mapValues(Math::sqrt); } @Bean public Consumer<KStream<String, Double>> log() { return sqrt -> sqrt.foreach((key, value) -> log.info("{}: {}", key, value)); } 其中 sqrt() 输出数字的平方根,然后用 log() 记录。 application.yaml因此看起来像这样: spring: cloud: stream: function: bindings: sqrt-in-0: numbers sqrt-out-0: sqrt-numbers log-in-0: sqrt-numbers kafka: streams: bindings: sqrt: consumer: application-id: sqrtApplicationId log: consumer: application-id: logApplicationId 启动应用程序时,出现以下错误: The bean 'sqrt-numbers' could not be registered. A bean with that name has already been defined and overriding is disabled. Action: Consider renaming one of the beans or enabling overriding by setting spring.main.allow-bean-definition-overriding=true 当然,现在将 definition-overriding 设置为 true 并不是一个正确的解决方案,并且它会失败并显示 IllegalStateException。 我该如何解决这个问题? 问题的重现可以在这里找到:https://github.com/cedric-schaller/dltawareprocessor-type-error 假设您有两个名为 numbers 和 sqrt-numbers 的 Kafka 主题,则以下配置应该有效。 spring: cloud: stream: bindings: sqrt-in-0: destination: numbers sqrt-out-0: destination: sqrt-numbers log-in-0: destination: sqrt-numbers kafka: streams: bindings: sqrt-in-0: consumer: application-id: sqrtApplicationId log-in-0: consumer: application-id: logApplicationId 您可以使用 spring.cloud.stream.function.bindings.. 覆盖默认绑定名称。例如,如果您想将绑定名称从 sqrt-in-0 更改为 input,您可以像 spring.cloud.stream.function.bindings.sqrt-in0-0: input 那样进行操作。不过,您仍然需要在覆盖的绑定上设置 destination(通过 spring.cloud.stream.bindings.input.destination)。 您遇到的特定异常是因为您试图重用已创建的绑定名称 - sqrt-numbers。
Kafka GroupTable 测试使用 ProcessorTopologyTestDriver 时生成额外消息
我编写了一个流,它接收消息并发送出已出现的键表。如果出现某些东西,它将显示计数为 1。这是我的生产 c 的简化版本...
为什么我的 Kafka Streams 拓扑无法正确重放/重新处理?
我有一个如下所示的拓扑: KTable users = topology.table(USERS); KStream joinRequests = topology.stream(JOIN_REQUESTS) .地图...