Spring for Apache Kafka(spring-kafka)项目将核心Spring概念应用于基于Kafka的消息传递解决方案的开发。
我正在使用 Kafka 和 Spring-boot: 卡夫卡生产者类: @服务 公共类 MyKafkaProducer { @Autowired 私人 KafkaTemplate kafkaTemplate; 私人静态
如何包含特定版本的spring kafka。 Kafka 版本依赖项是由起始父级导入的 2.5.14。但我想用 2.6.10 版本覆盖它 我该怎么做呢? 这是我的S...
我有一个 springboot 应用程序,它将 kafka 事件发送到具有特定 avro 模式的主题。演示代码如下: @配置 @Slf4j 公共类 KafkaListener { @豆 公共职能 我有一个 springboot 应用程序,它将 kafka 事件发送到具有特定 avro 架构的主题。演示代码如下: @Configuration @Slf4j public class KafkaListener { @Bean public Function<List<DummyClass>, List<DummyClass>> acceptEvent() { return messages -> { List<DummyClass> output = new ArrayList<>(); messages.forEach(msg -> { // do work and add items to the output field }); // some more work return output; }; } } 并且 DummyClass 类是由 avro 模式生成的类(.avsc 文件在 resources folder 中定义并由 avro 插件生成)。 这是一个相当标准的设置。现在,我希望能够将事件发送到 2 个主题,而不是像我现在使用上面的代码那样将事件发送到单个主题。 在网上搜索后,我找到了一个spring文档,说明了使用元组的示例。所以我尝试了一下并实现了除了助焊剂部分之外的所有内容(因为我不想要它)。新代码最终如下: @Configuration @Slf4j public class KafkaListener { @Bean public Function<List<DummyClass>, Tuple2<List<DummyClass>,List<DummyClass>>> acceptEvent() { return messages -> { List<DummyClass> output1 = new ArrayList<>(); List<DummyClass> output2 = new ArrayList<>(); messages.forEach(msg -> { // do work and add items to the output1 and output2 fields }); // some more work return Tuples.of(output1,output2); }; } } 但是,这似乎不起作用,因为我收到以下异常: java.lang.UnsupportedOperationException: At the moment only Tuple-based function are supporting multiple arguments 我怀疑这是因为我没有使用助焊剂。 这个答案表明可能使用了错误的导入,但我验证了我使用的所有内容都来自正确的导入。 稍微不同的实现遇到了本文中指定的类似问题。但是,此人通过将包含输出参数的自定义对象作为字段传递来解决了该问题。我尝试执行相同的操作,但运行应用程序会导致以下异常: Unsupported Avro type. Supported types are null, Boolean, Integer, Long, Float, Double, String, byte[] and IndexedRecord 至此,我现在已经用尽了所有选择。我的问题如下: 如何实现将事件写入多个主题?如果我的实现之一就是这样,我做错了什么?还有其他事情可以尝试吗? - 我见过 KStream 据说可以做我想做的事情,但我似乎找不到好的代码示例。 经过一些来回,我发现streamBridge是实现这一目标的最佳选择。 @Configuration @Slf4j @AllArgsContstractor public class KafkaListener { private final StreamBridge streamBridge; @Bean public Function<List<DummyClass>, List<DummyClass>> acceptEvent() { return messages -> { List<DummyClass> output = new ArrayList<>(); messages.forEach(msg -> { // do work and add items to the output field you want to be sent to topic 1 if(condition) { Message<DummyClass> msgToSendViaStreamBridge = createMsg(); // method to create an event for streamBridge to send to the topic 2 streamBridge.send("acceptEvnet-out-1", msgToSendViaStreamBridge); } }); // some more work return output; }; } } 所以现在我的 Function 将向现有主题(由 acceptEvnet-out-0 文件中的 application.yml 绑定定义)发送事件,并将其放入 output 列表中。我们将事件发送到 acceptEvnet-out-1 绑定中定义的不同主题。这可用于将事件发送到任意数量的主题。
如何包含 spring kafka 的特定版本。 Kafka 版本依赖项是由起始父级导入的 2.5.14。但我想用 2.6.10 版本覆盖它 我该怎么做呢? 这是我的S...
容器化 Kafka Consumer 无法连接到 Kafka 容器
我正在尝试使用docker和spring boot来实现微服务架构。我有一个正在运行的 kafka 容器,以及一个正在运行的消费者容器。还有一个制作人面临着同样的问题
当一起运行测试时,KafkaServer 测试会抛出地址已在使用中
我有一个 Spring 应用程序,它启动本地 kafka 服务器,我想测试它。一项测试检查服务器是否正在运行,另一项测试检查 kafka 主题中是否有消息。 然而,...
如何使用Reactor Kafka KafkaSender API向两个不同的主题(位于两个不同的Kafka集群)发送消息?
我正在尝试使用Reactor Kafka的KafkaSender将消息发送到两个不同的kafka主题。 通过两个不同的卡夫卡主题,我的意思是: 集群中有一个名为“first_topic”的主题...
Spring Kafka - CommonErrorHandler 在 DeserializationException 上被忽略
我需要 Spring Kafka 行家对以下错误处理和行为背后的推理进行一些澄清。 我的目的是将 ExponentionalBackOff 暴露为 de...
我尝试通过本地 Spring Boot API 连接 kafka aws 实例。 我可以连接它,但在收听主题时,它抛出以下异常,但创建了新主题
用例是使用 Spring Integration 的 Kafka ConcurrentMessageListenerContainer 延迟重新处理下游失败消息。比如说,最大重试尝试次数应为 2,固定延迟为 5
Spring Batch - Kafka:KafkaItemReader 始终从头开始读取数据
我愿意使用Spring Batch进行Kafka数据消费。这个 spring-tips 链接有一个基本示例。 这是我的读者: @豆 KafkaItemReader kafkaItemRead...
如何将kafka的ConsumerRecord<K, V>或Spring cloud Stream的元数据暴露给反序列化器?
Spring 云流集成允许用户提供 Function> 实现来处理通过某些预配置的 MQ 实现(例如 kafka)接收到的消息。我...
我在AWS中部署了2个SpringBoot应用程序,APP1正在管理汽车,对于每辆新添加的汽车,它将创建2个仅用于该汽车的kafka主题(主题A用于事件APP1 - > APP2和...
如何将主题信息添加到Spring Kafka Template的metrics中?
我有指标“spring_kafka_template_seconds_count”,但没有主题信息。我使用2.8.11 spring-kafka版本。在文档中 https://docs.spring.io/spring-kafka/reference/kafka/micro...
Kafka Streams - 为什么我不能聚合和总结我的多头?
我是 Kafka Streams 的新手,我正在尝试拼凑我的第一个应用程序。 我想将我的银行交易金额加起来。 @豆 公共 KStream kStream(StreamsBuilder
如何处理spring-kafka中批量监听器的验证和转换失败
我们使用来自 spring-kafka 的批处理侦听器以及自动解析为 Java dtos 的 JSON 负载,因此我们的侦听器方法目前看起来像这样: @KafkaListener(主题 = MY_TO...
使用动态加载的证书在 Spring 中设置与 Kafka 的 mTLS 连接
我正在开发一个 Spring Kafka 项目,该项目使用以下库 org.springframework.kafka:spring-kafka:3.0.12 连接到 Kafka 服务器。它使用来自
我一直在尝试对我的代码库中这段看起来非常丑陋的代码进行一些修改。 原始代码如下所示: 长变量=Optional.of(消息) .map(消息::getPayload) ...
为什么Kafka JsonSerializer无法序列化kafka ProducerRecord?
我正在尝试使用 Spring Kafka JsonSerializer 从生产者发送 JSON 对象,但在发送与 ProducerRecord 序列化相关的消息时遇到异常。我预料到了
AdminClient 的 Kafka Spring 启动测试用例中的问题
我正在为下面的课程编写单元测试用例。我正在尝试模拟管理客户端,以便我可以调用下面的方法创建主题。但出现空指针异常。 @服务 公开课TopicSer...