用于与Apache Kafka使用者API相关的问题
我正在使用 Kafka 和 Spring-boot: 卡夫卡生产者类: @服务 公共类 MyKafkaProducer { @Autowired 私人 KafkaTemplate kafkaTemplate; 私人静态
我正在学习 Kafka 并尝试为我最近的搜索应用程序创建一个主题。假设推送到 kafka 主题的数据数量很大。 我的 kafka 集群有 3 个代理,并且有
Kafka消费者客户端未在zookeeper上注册消费者组的偏移量
我正在尝试使用 kafka-clients v.0.10.2.1 创建具有不同消费者组的多个消费者到一个 kafka 主题。虽然我无法检索消费者组提交的最后一个偏移量...
我正在使用 Spring Boot 应用程序在 Kafka Producer 中将其转换为 toString() 来发送 JSON 数组,但在 Consumer 中收到以下错误: org.springframework.kafka.listener.
我们有一个使用 Reactor Kafka 和 KafkaReceiver 进行消费的 Spring Boot 项目,我们希望收集和发出底层消费者指标。看起来我们可以利用 KafkaRecei...
具有基于线程并行性的 Kafka Streams 与用于并行处理的 Kafka Parallel Consumers
我们正在为实时协作形式构建事件驱动的架构。我们的解决方案使用 Kafka 作为事件代理,其中事件排序和有状态流处理是关键要求。 ...
我正在尝试找到某种方法来从手动偏移恢复 Kafka Streams。 通过在互联网上查找,我没有找到任何说可以的答案。 有什么办法吗?或者必须回到低水平
我正在使用dockerized Kafka并编写了一个Kafka消费者程序。当我在本地计算机上的 docker 和应用程序中运行 Kafka 时,它运行得很好。但是当我在
我看到了对卡夫卡消费者组协调员和消费者组领导者的引用...... 有什么区别? 将集团管理分为两组不同的好处是什么
我在java中有一个函数,我试图在其中获取未读的消息。例如,如果我在代理中有offSet 0,1,2 的消息已被消费者读取,并且如果我切换...
我正在研究一种在基于 Kafka 的数据处理管道中进行消费者重试的架构。我们正在使用 Kafka 生产者和消费者,并正在考虑重试主题……
更改 Kafka docker-compose 中的分配策略
我正在使用 docker 在本指南的帮助下运行 Kafka。我运行消费者,但由于错误 *尝试加入组因致命错误而失败:[错误 23]
我在所有三个代理上的代理级别compression.type=zstd 上添加了日志消息压缩,在代理级别或生产者级别上没有其他更改。 当我尝试从我的 py 读取消息时...
如何使用Spring Kafka的Acknowledgement.acknowledge()方法进行手动提交
我第一次使用Spring Kafka,我无法在我的消费者代码中使用Acknowledgement.acknowledge()方法进行手动提交,如此处https://docs.spring.io/spring-kafka/referenc所述。 .
我正在使用 Kafka 0.9.1 新的消费者 API。消费者被手动分配到一个分区。对于这个消费者,我希望看到它的进展(意味着滞后)。由于我添加了组 ID con...
我正在使用 Kafka 0.9.1 新的消费者 API。消费者被手动分配到一个分区。对于这个消费者,我希望看到它的进展(意味着滞后)。由于我添加了组 ID con...
KafkaListenerEndpointContainer 无法使用 Spring Kafka 创建 Kafka 事务
我正在使用 spring-Kafka 2.2.2.RELEASE(org.apache.kafka:kafka-clients:jar:2.0.1) 和 spring-boot(2.1.1)。 我无法执行事务,因为我的侦听器无法获取分区...
如何在 Confluence kafka C# 库中获取 Kafka 主题的最新偏移量?
我正在使用 Confluence kafka C# 客户端。如何获取此主题中消耗的最新偏移量?
一个应用程序(生产者)正在发布消息,这些消息正在被另一个应用程序(具有多个消费者)使用。生产者向现场国家发送数据,我们将有更多...
如果我指定一个返回泛型类的方法,我该怎么做才能动态指定泛型类的类型? 例如 尝试 { 类 c =Class.forName(keytype); 类...