Spring for Apache Kafka(spring-kafka)项目将核心Spring概念应用于基于Kafka的消息传递解决方案的开发。
DefaultKafkaHeaderMapper 无法解码 json 类型
我使用的是spring-kafka,我的生产者是org.apache.kafka:kafka-clients:2.3.12,消费者是org.springframework.kafka:spring-kafka:2.1.7.RELEASE 我在消费者上收到以下错误...
Spring Batch 中的远程分区 - 在大量记录的作业完成之前,少量记录的作业无法完成
我正在尝试使用远程分区[主从方法]的Spring Batch。 我有一个主步骤,通过 KAFKA 将记录发送到工作节点。 一切都工作正常,直到并行作业执行......
为什么KafkaItemReader不支持自动重新平衡?如何实施?
我正在使用 Spring Batch 并使用 KafkaItemReader 从 Kafka 主题读取数据。但是,我注意到 KafkaItemReader 需要在
WebSocket 与 Kafka/Active Mq/Rabbit Mq |春天
我的本地应用程序详细信息: 前端 - Reactjs 部署在 4 个虚拟机中 [a、b、c、d] 后端 - Spring 微服务[无状态]部署在 4 个虚拟机中 [w,x,y,z]] 数据库-Sql服务器 负载均衡器 - F5 CDN -
在 spring-cloud-stream-binder-kafka 中配置(spring 集成的)logginghandler
我对这个问题有一个后续问题 有没有办法在 Spring Cloud Stream 中自定义 Spring Integration 模块的 LoggingHandler ?即设置 shouldLogFullMessage 为 false 或
同一个容器多次调用ContainerTestUtils.waitForAssignment时如何使用?
我正在尝试编写 Kafka 监听器的可靠测试。值得一提的是,我使用外部 kafka 容器而不是 @EmbeddedKafka。 我正在努力解决的最大问题是如何确保 Kafka
场景: 我有一个 kafka 生产者,它向主题 A 生成一些消息。该消息有一个名为 correlationID 的专用标头,我想用它来将请求消息与响应关联起来...
我有一个使用spring-kafka的maven项目。如果我运行 mvn dependency:tree 我得到: .... [信息] +- org.springframework.kafka:spring-kafka:jar:3.3.0:compile [信息] | +- org.springframework:spring-
我在不使用 Spring Boot 和 Spring Kafka 的事务性生产者的情况下遇到了 InvalidPidMappingException
GitHub 中的存储库 你好!我希望你做得很好! 我在 kubernetes 环境中工作,有 3 个使用 Spring Boot 和 Kotlin 的 Pod,并且我们使用 Apache Kafka 和 Azure 事件中心。在...
我的团队正在开发一个聊天应用程序,该应用程序使用 Kafka Reactor 在客户端之间传输数据。 我们的问题之一是向用户返回反馈。通过调用 API,例如...
kafka-client 2.7.0 使用 SASL 连接 kafka2.2,第一次发送消息抛出 TimeoutException Topic *** 在 1000 毫秒后不存在于元数据中
kafka客户端版本2.7.0 卡夫卡版本2.2 生产者配置 Map props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "服务器1:19000,服务器2...
我正在使用 Kafka 和 Spring-boot: 卡夫卡生产者类: @服务 公共类 MyKafkaProducer { @Autowired 私人 KafkaTemplate kafkaTemplate; 私人静态
如何包含特定版本的spring kafka。 Kafka 版本依赖项是由起始父级导入的 2.5.14。但我想用 2.6.10 版本覆盖它 我该怎么做呢? 这是我的S...
我有一个 springboot 应用程序,它将 kafka 事件发送到具有特定 avro 模式的主题。演示代码如下: @配置 @Slf4j 公共类 KafkaListener { @豆 公共职能 我有一个 springboot 应用程序,它将 kafka 事件发送到具有特定 avro 架构的主题。演示代码如下: @Configuration @Slf4j public class KafkaListener { @Bean public Function<List<DummyClass>, List<DummyClass>> acceptEvent() { return messages -> { List<DummyClass> output = new ArrayList<>(); messages.forEach(msg -> { // do work and add items to the output field }); // some more work return output; }; } } 并且 DummyClass 类是由 avro 模式生成的类(.avsc 文件在 resources folder 中定义并由 avro 插件生成)。 这是一个相当标准的设置。现在,我希望能够将事件发送到 2 个主题,而不是像我现在使用上面的代码那样将事件发送到单个主题。 在网上搜索后,我找到了一个spring文档,说明了使用元组的示例。所以我尝试了一下并实现了除了助焊剂部分之外的所有内容(因为我不想要它)。新代码最终如下: @Configuration @Slf4j public class KafkaListener { @Bean public Function<List<DummyClass>, Tuple2<List<DummyClass>,List<DummyClass>>> acceptEvent() { return messages -> { List<DummyClass> output1 = new ArrayList<>(); List<DummyClass> output2 = new ArrayList<>(); messages.forEach(msg -> { // do work and add items to the output1 and output2 fields }); // some more work return Tuples.of(output1,output2); }; } } 但是,这似乎不起作用,因为我收到以下异常: java.lang.UnsupportedOperationException: At the moment only Tuple-based function are supporting multiple arguments 我怀疑这是因为我没有使用助焊剂。 这个答案表明可能使用了错误的导入,但我验证了我使用的所有内容都来自正确的导入。 稍微不同的实现遇到了本文中指定的类似问题。但是,此人通过将包含输出参数的自定义对象作为字段传递来解决了该问题。我尝试执行相同的操作,但运行应用程序会导致以下异常: Unsupported Avro type. Supported types are null, Boolean, Integer, Long, Float, Double, String, byte[] and IndexedRecord 至此,我现在已经用尽了所有选择。我的问题如下: 如何实现将事件写入多个主题?如果我的实现之一就是这样,我做错了什么?还有其他事情可以尝试吗? - 我见过 KStream 据说可以做我想做的事情,但我似乎找不到好的代码示例。 经过一些来回,我发现streamBridge是实现这一目标的最佳选择。 @Configuration @Slf4j @AllArgsContstractor public class KafkaListener { private final StreamBridge streamBridge; @Bean public Function<List<DummyClass>, List<DummyClass>> acceptEvent() { return messages -> { List<DummyClass> output = new ArrayList<>(); messages.forEach(msg -> { // do work and add items to the output field you want to be sent to topic 1 if(condition) { Message<DummyClass> msgToSendViaStreamBridge = createMsg(); // method to create an event for streamBridge to send to the topic 2 streamBridge.send("acceptEvnet-out-1", msgToSendViaStreamBridge); } }); // some more work return output; }; } } 所以现在我的 Function 将向现有主题(由 acceptEvnet-out-0 文件中的 application.yml 绑定定义)发送事件,并将其放入 output 列表中。我们将事件发送到 acceptEvnet-out-1 绑定中定义的不同主题。这可用于将事件发送到任意数量的主题。
如何包含 spring kafka 的特定版本。 Kafka 版本依赖项是由起始父级导入的 2.5.14。但我想用 2.6.10 版本覆盖它 我该怎么做呢? 这是我的S...
容器化 Kafka Consumer 无法连接到 Kafka 容器
我正在尝试使用docker和spring boot来实现微服务架构。我有一个正在运行的 kafka 容器,以及一个正在运行的消费者容器。还有一个制作人面临着同样的问题
当一起运行测试时,KafkaServer 测试会抛出地址已在使用中
我有一个 Spring 应用程序,它启动本地 kafka 服务器,我想测试它。一项测试检查服务器是否正在运行,另一项测试检查 kafka 主题中是否有消息。 然而,...
如何使用Reactor Kafka KafkaSender API向两个不同的主题(位于两个不同的Kafka集群)发送消息?
我正在尝试使用Reactor Kafka的KafkaSender将消息发送到两个不同的kafka主题。 通过两个不同的卡夫卡主题,我的意思是: 集群中有一个名为“first_topic”的主题...
Spring Kafka - CommonErrorHandler 在 DeserializationException 上被忽略
我需要 Spring Kafka 行家对以下错误处理和行为背后的推理进行一些澄清。 我的目的是将 ExponentionalBackOff 暴露为 de...
我尝试通过本地 Spring Boot API 连接 kafka aws 实例。 我可以连接它,但在收听主题时,它抛出以下异常,但创建了新主题