Spring for Apache Kafka(spring-kafka)项目将核心Spring概念应用于基于Kafka的消息传递解决方案的开发。
用例是使用 Spring Integration 的 Kafka ConcurrentMessageListenerContainer 延迟重新处理下游失败消息。比如说,最大重试尝试次数应为 2,固定延迟为 5
Spring Batch - Kafka:KafkaItemReader 始终从头开始读取数据
我愿意使用Spring Batch进行Kafka数据消费。这个 spring-tips 链接有一个基本示例。 这是我的读者: @豆 KafkaItemReader kafkaItemRead...
如何将kafka的ConsumerRecord<K, V>或Spring cloud Stream的元数据暴露给反序列化器?
Spring 云流集成允许用户提供 Function> 实现来处理通过某些预配置的 MQ 实现(例如 kafka)接收到的消息。我...
我在AWS中部署了2个SpringBoot应用程序,APP1正在管理汽车,对于每辆新添加的汽车,它将创建2个仅用于该汽车的kafka主题(主题A用于事件APP1 - > APP2和...
如何将主题信息添加到Spring Kafka Template的metrics中?
我有指标“spring_kafka_template_seconds_count”,但没有主题信息。我使用2.8.11 spring-kafka版本。在文档中 https://docs.spring.io/spring-kafka/reference/kafka/micro...
Kafka Streams - 为什么我不能聚合和总结我的多头?
我是 Kafka Streams 的新手,我正在尝试拼凑我的第一个应用程序。 我想将我的银行交易金额加起来。 @豆 公共 KStream kStream(StreamsBuilder
如何处理spring-kafka中批量监听器的验证和转换失败
我们使用来自 spring-kafka 的批处理侦听器以及自动解析为 Java dtos 的 JSON 负载,因此我们的侦听器方法目前看起来像这样: @KafkaListener(主题 = MY_TO...
使用动态加载的证书在 Spring 中设置与 Kafka 的 mTLS 连接
我正在开发一个 Spring Kafka 项目,该项目使用以下库 org.springframework.kafka:spring-kafka:3.0.12 连接到 Kafka 服务器。它使用来自
我一直在尝试对我的代码库中这段看起来非常丑陋的代码进行一些修改。 原始代码如下所示: 长变量=Optional.of(消息) .map(消息::getPayload) ...
为什么Kafka JsonSerializer无法序列化kafka ProducerRecord?
我正在尝试使用 Spring Kafka JsonSerializer 从生产者发送 JSON 对象,但在发送与 ProducerRecord 序列化相关的消息时遇到异常。我预料到了
AdminClient 的 Kafka Spring 启动测试用例中的问题
我正在为下面的课程编写单元测试用例。我正在尝试模拟管理客户端,以便我可以调用下面的方法创建主题。但出现空指针异常。 @服务 公开课TopicSer...
我正在使用 Kafka 来处理一个有 4 个分区的主题。 Kafka 中消息的保留期 (TTL) 默认设置为 7 天。我正在运行一个非流式批处理作业,处理来自...的数据
使用 spring kafka 在 kafka 事务中进行一次发送有意义吗?
从 kafka 3.0 开始,我们默认获得最强的交付保证(acks=all,enable.idempotence=true)。 (https://www.confluence.io/blog/apache-kafka-3-0-major-improvements-and-new-features/)。 大多数情况下,
我有一个用例,其中有 2 个不同的主题,来自 2 个不同的应用程序/生产者,其中的事件通过键(例如用户 ID)相关。 为了顺序处理......
批处理监听器没有 BackOff 的 DefaultAfterRollbackProcessor
我正在尝试为批处理侦听器配置错误处理,这样,如果单个记录处理失败,则整个批次将被发送到死信主题(带有一些额外的日志记录)...
如何使用Spring Kafka的Acknowledgement.acknowledge()方法进行手动提交
我第一次使用Spring Kafka,我无法在我的消费者代码中使用Acknowledgement.acknowledge()方法进行手动提交,如此处https://docs.spring.io/spring-kafka/referenc所述。 .
我正在编写一个spring-boot应用程序,接收请求并进行一些数据处理,然后将数据写入kafka。 当我进行压力测试时,我发现kafka消息传递非常慢。 数据公关...
Kafka AdminClient 连接问题:Spring 应用程序中“与 localhost/127.0.0.1 的连接已断开”
我在从本地运行的 Spring 应用程序连接到我的 Kafka 集群时遇到问题。应用程序成功将 localhost 解析为 127.0.0.1,但随后立即断开连接...
ErrorHandlingDeserializer 中的 Kafka 消息验证
我不明白应该如何配置它来验证 ErrorHandlingDeserializer 中的消息。 春天: 格式: 使用: 类型: 标题:假 卡夫卡: 消费者: 关键-deseria...
KafkaListenerEndpointContainer 无法使用 Spring Kafka 创建 Kafka 事务
我正在使用 spring-Kafka 2.2.2.RELEASE(org.apache.kafka:kafka-clients:jar:2.0.1) 和 spring-boot(2.1.1)。 我无法执行事务,因为我的侦听器无法获取分区...