Spring for Apache Kafka(spring-kafka)项目将核心Spring概念应用于基于Kafka的消息传递解决方案的开发。
DefaultErrorHandler 不可配置如果 @RetryableTopic 用于重试和 DLT 处理程序
春季启动版本:2.7.6 春季卡夫卡版本:2.8.11 问题: 我试图处理代码中的反序列化问题。为了在代码中处理此类问题,我通过扩展创建了自己的类
KafkaTemplate.send() 既不返回也不抛出异常
我正在使用 SpringKafka(Springboot 版本 3.1.7)将消息发布到 Azure EventHub 命名空间。生产者应用程序也部署在 Azure 中。直到上周一切都运转良好,但最后
如何在Spring XML + Kafka App中进行错误处理?我正在使用 JSON 来生成和消费消息,但是当消费者获取垃圾数据时,它会运行无限循环。 这是我用过的 spring.xml ...
我想扩展这里面临的进一步问题:设置构造函数时的 org.springframework.kafka.support.serializer.ErrorHandlingDeserializ' of type serializer.ErrorHandlingDeserializer]
在java项目中使用kafka进行更新和插入操作是不是一个好主意?
在java项目中使用kafka进行更新和插入操作是一个好主意吗?我们正在将遗留批处理项目从 Perl 迁移到 Spring Boot 微服务,并且我们计划使用 kafka 作为 DB
每个任务的 KafkaProducer 或单例 KafkaProducer
我有几个并行运行的任务(ExecutorService 和 Runnable 任务)。对所有这些都使用一个 KafkaProducer 实例(Spring singleton bean)是一个好的选择吗?在这种情况下,如何
通过参考:How to handle the exception at Consumer in Spring XMl App?,我试图在 xml bean 中转换 Java bean 依赖关系并面临以下错误。 有人可以解释一下...
我在卡夫卡消费者端遇到了下面的异常。令人惊讶的是,这个问题与旧版本的代码不一致(具有完全相同的配置,但有一些新的不相关...
Spring XMl App中consumer端如何处理异常?
如何在Spring XML + Kafka App中进行错误处理?我正在使用 JSON 来生成和消费消息,但是当消费者获取垃圾数据时,它会运行无限循环。 已在此处上传代码:https://
Spring boot kafka - 如何告诉 JsonDeserializer 忽略类型标头?
Spring 的 Kafka 生产者将类型标头嵌入到消息中,该消息指定消费者应将消息反序列化到哪个类。当生产者不使用 Spring Kafka 时,这是一个问题...
我想将这段Java代码迁移到Spring Boot 3: 公共无效发送(字符串主题,K键,V消息,BiConsumer,Throwable>回调){ 完整的未来 我想将此 Java 代码迁移到 Spring Boot 3: public void send(String topic, K key, V message, BiConsumer<SendResult<K, V>, Throwable> callback) { CompletableFuture<SendResult<K, V>> future = kafkaTemplate.send(topic, key, message); if (Objects.nonNull(callback)) { future.whenComplete(callback); } } ...... // call kafka method kafkaProducer().send("topic", "key", "messge", listenableFutureCallback("12345")); ...... private ListenableFutureCallback listenableFutureCallback(String userId) { return new ListenableFutureCallback() { @Override public void onFailure(Throwable ex) { ....... } @Override public void onSuccess(Object result) { ........ } }; } 我在这一行遇到错误:listenableFutureCallback("12345") Required type: BiConsumer <org.springframework.kafka.support.SendResult<java.lang.String,java.lang.String>, java.lang.Throwable> Provided: ListenableFutureCallback 你知道我应该如何迁移/替换 ListenableFutureCallback 以便从 BiConsumer Java 方法返回 listenableFutureCallback 吗? 这似乎是 BiConsumer 导入的问题。您应该使用“java.util.function.BiConsumer”中的 BiConsumer,而不是“org.apache.kafka.commonKafkaFuture.BiConsumer”。 PS:据我了解,ListenableFutureCallback 在 Spring 6.0 中已被弃用。请尝试从您的代码中删除它并使用 CompletableFuture 代替。 https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/util/concurrent/ListenableFutureCallback.html
Spring boot @KafkaListener 不使用框架中提到的 SPEL 监听主题名称
我开发了一个kafka监听器。以下是我设计的不同课程: 类 KafkaProperties { 公共字符串主题ZZZ; 公共字符串组ID; ... } 类 KafkaConfig { ...
java.lang.RuntimeException:不可为 null 的字段 authBytes 从 Spring Boot 序列化为 null 到 Azure 事件中心
我正在使用 v2.7.5 开发一个简单的 Spring Boot 应用程序,并尝试将消息发送到事件中心。有人可以指导我吗? 错误: java.lang.RuntimeException:不可为 null 的字段 authBytes 为
@SpringBootApplication 公共类 SpringKafkaApplication { 公共静态无效主(字符串[] args){ SpringApplication.run(SpringKafkaApplication.class, args); } @Autowired
当密码包含“:”字符时,JaasConfig 会错误地解析应用配置
我有 SASL 身份验证问题,该身份验证使用 JaasConfig 配置来构建登录连接字符串 我的连接字符串看起来像 required username= pass...
在 Docker-compose 中使用 Kafka 运行 Spring Boot 应用程序
我有应用程序(kafka客户端)。我有下一个属性: spring.kafka.bootstrap-servers=127.0.0.1:29092 spring.kafka.consumer.group-id=mc3 我在 Docker-compose 中有 kafka: 卡夫卡: 图片:
我们正在考虑在我们的消息传递中使用 Kafka,我们的应用程序是使用 Spring 开发的。所以,我们打算使用spring-kafka。 生产者将消息作为 HashMap 对象放入...
KafkaTemplate 和 KafkaProducer 发送方法的区别?
我的问题是在使用kafka的Spring Boot微服务中什么适合使用KafkaTemplate.send()或KafkaProducer.send() 我使用 KafkaConsumer 而不是 KafkaListener 来轮询记录
如何在 Kubernetes multipod 部署中使用 spring kafka 处理 Kafka 容器生命周期
我正在使用 Spring kafka 实现,我需要通过 REST API 启动和停止我的 kafka 消费者。为此,我正在使用 KafkaListenerEndpointRegistry endpointRegistry 端点注册表。
某些消息随机获取 IAMClientCallbackHandler 类未找到异常
在我们目前的用例中,我们一条一条地发送消息[不是批量]以维持传入的顺序顺序,同时一条一条地发送大约 5000 多条消息或使用