Spring for Apache Kafka(spring-kafka)项目将核心Spring概念应用于基于Kafka的消息传递解决方案的开发。
Spring Kafka NewTopic TopicBuilder,--command-config 选项?
当我使用 kafka-topics.bat 创建主题时,我这样做: kafka-topics.bat --bootstrap-server %host%:%port% --create --topic %%t --partitions %partitions% --replication-factor %replication_factor% --
Spring Kafka:发送消息时无法序列化FailedDeserializationInfo
我正在使用 Spring Kafka 批处理侦听器并使用 ErrorHandlingDeserializer 来处理反序列化错误。当消息遇到 DeserializerException 时,其标头包含 '
尝试使用 KafkaAdmin.NewTopics 创建 kafka 主题,但 KafkaListener 设法在它之前创建一个主题
我正在尝试在本地环境中复制生产主题配置。但我有一个 Kafka 监听器,它设法在 NewTopics bean 之前启动,所以只有我的几个主题正在运行
如何使用 testcontainers 和 spring-kafka 准备测试
我正在尝试为 kafka 消息传递设置集成测试,并从使用嵌入式 Kafka 切换到测试容器。给定 docker-compose 和基类的以下配置...
Kafka 客户与 TestContainer 集成测试的问题
我在测试 Kafka 消费者与 kafkaContainer 的集成时遇到问题。 实际上,当我使用模板发送消息时,不会调用消费者。 你能帮助我吗? 这里是...
Springboot Kafka Consumer不在从主题消息接收到的日志中打印TraceId-SpanId
问题在于消费者日志中的traceid 和spanid 未打印。 我有 2 个应用程序生产者和消费者。两者都在 springboot 3 中。 两者都通过 Kafka 进行通信。两者都有微米勇敢的依赖性......
我正在Spring Cloud + Kafka上创建java微服务应用程序,在开发过程中我问自己一个问题,是否正确创建了kafka主题。我就是这样做的...
Spring Kafka JsonMessageConverter 发送消息时抛出 UnsupportedOperationException
@配置 公共类配置{ @豆 公共 RecordMessageConverter 转换器(){ 返回新的 JsonMessageConverter(); } @豆 公共 BatchMessagingMessageConver...
如何在EmbeddedKafkaBroker中生成消息,容器工厂在Spring Boot中使用EmbeddedKafkaBroker
我在我的@KafkaListener容器工厂中使用了EmbeddedKafkaBroker引导服务器作为本地主机:9092,那么我如何向EmbeddedKafkaBroker生成一条消息,以便我的@KafkaListener注释
@KafkaListener 的 autoStartup 为 false 时 Spring-kafka 重试问题
在我的一个应用程序中,我想在手动启动 @KafkaListener 时使用非阻塞重试,如此处所述。 添加一个像这样的@EventListener: @EventListener(ApplicationReadyEvent.
我创建了一个单元测试来测试 Kafka 监听器,如下所示。 @SpringBootTest @EmbeddedKafka(分区= 1,brokerProperties = {“listeners = PLAINTEXT:// localhost:9092”,“端口= ...
等待kafka消费者轮询后再将消息写入kafka spring boot
我有以下问题。 我想启动一个从 kafka 读取和写入的 Spring Boot 应用程序 使用@KafkaListener。 我想要: 将 kafka 消费者初始化为最新的偏移量。 ...
我的Spring Boot kafka生产者工作正常,我使用邮递员测试了它,发送的消息是所以我使用String序列化器作为键,使用Json序列化器作为值, 在
为Spring Kafka设置authorizationExceptionRetryInterval
任何人都知道如何设置新属性:authorizationExceptionRetryInterval,而无需手动创建ConcurrentKafkaListenerContainerFactory。
Springs Kafka Consumer最佳实践:消费者应该接收什么样的消息
我需要开始使用kafka。我很难弄清楚消费者应该收到什么: 据我了解,我们可以通过多种方式配置消费者: 示例1: @
在 Spring boot 中尝试 10 次后 Kafka 偏移量递增
我们有 Spring Boot 应用程序来消费来自 Kafka 的消息并放入数据库。 如果发生任何异常(例如数据库关闭等),我们不希望在应用程序中执行任何错误处理...
我有一个业务需求,需要减慢 kafka 消费者的消费速度,因为它可能会对其他微服务造成影响。经过一番研究,我找到了解决方案
我希望通过让同一生产者的两个或三个实例向 Kafka 发送事件来扩展我的 Kafka 生产者。但是,由于网络故障或其他问题,可能会产生一些事件...
@Bean 公共 RecordMessageConverter 转换器(){ // 返回新的 JsonMessageConverter(); JsonMessageConverter jsonMessageConverter = new JsonMessageConverter();
尝试了下面的SPEL表达式,但失败了。需要帮忙! @KafkaListener(topics = "#{Arrays.asList(${kafka.topic.helloworld}.split(',')).stream().map(p -> p+envSuffix).toArray(String[]::新...