kafka-consumer-api 相关问题

用于与Apache Kafka使用者API相关的问题

如何处理Kafka消费者挂起/滞后和未处理的消息?

我正在使用 Kafka 消费者,该消费者在处理来自分区的消息时偶尔会挂起或滞后。这引起了一些问题,我需要有关最佳实践和配置的指导......

回答 1 投票 0

Kafka 分区重新平衡

当我们的kafka主题中有多个分区时,分区重新平衡是常见的事情吗? 这并不一定意味着我们的应用程序存在一些延迟或问题? 我一直在看日志...

回答 3 投票 0

Kafka 保留设置 - 如果所有消费者组都消费了一个主题,则从队列中删除

假设我有一个kafka队列和一个名为TOPIC的主题,并且我有两个消费者组CONSUMER1和CONSUMER2。我在 TOPIC 中添加了 1000 条数据。 Consumer1 有消费者 800 条数据,CONSUMER2 有

回答 1 投票 0

延迟kafka手动提交偏移量有什么影响?

我们想要手动提交kafka偏移量来控制数据丢失事件,但是我们可能会延迟手动提交,因为我们只想在持久化到数据源之后才执行此操作。 我想了解如何减慢...

回答 1 投票 0

Kafka 生产者超时异常:1 条记录即将过期

我正在使用 Kafka 和 Spring-boot: 卡夫卡生产者类: @服务 公共类 MyKafkaProducer { @Autowired 私人 KafkaTemplate kafkaTemplate; 私人静态

回答 5 投票 0

kafka topic 中理想的分区数是多少?

我正在学习 Kafka 并尝试为我最近的搜索应用程序创建一个主题。假设推送到 kafka 主题的数据数量很大。 我的 kafka 集群有 3 个代理,并且有

回答 3 投票 0

Kafka消费者客户端未在zookeeper上注册消费者组的偏移量

我正在尝试使用 kafka-clients v.0.10.2.1 创建具有不同消费者组的多个消费者到一个 kafka 主题。虽然我无法检索消费者组提交的最后一个偏移量...

回答 2 投票 0

无法使用传入消息调用 Kafka Listener 方法

我正在使用 Spring Boot 应用程序在 Kafka Producer 中将其转换为 toString() 来发送 JSON 数组,但在 Consumer 中收到以下错误: org.springframework.kafka.listener.

回答 2 投票 0

使用 Reactor Kafka 定期轮询消费者指标

我们有一个使用 Reactor Kafka 和 KafkaReceiver 进行消费的 Spring Boot 项目,我们希望收集和发出底层消费者指标。看起来我们可以利用 KafkaRecei...

回答 1 投票 0

具有基于线程并行性的 Kafka Streams 与用于并行处理的 Kafka Parallel Consumers

我们正在为实时协作形式构建事件驱动的架构。我们的解决方案使用 Kafka 作为事件代理,其中事件排序和有状态流处理是关键要求。 ...

回答 1 投票 0

从自定义偏移量恢复 Kafka Stream

我正在尝试找到某种方法来从手动偏移恢复 Kafka Streams。 通过在互联网上查找,我没有找到任何说可以的答案。 有什么办法吗?或者必须回到低水平

回答 1 投票 0

Docker Kafka 与 Python 消费者

我正在使用dockerized Kafka并编写了一个Kafka消费者程序。当我在本地计算机上的 docker 和应用程序中运行 Kafka 时,它运行得很好。但是当我在

回答 4 投票 0

Kafka 中消费者组协调员和消费者组领导者有什么区别?

我看到了对卡夫卡消费者组协调员和消费者组领导者的引用...... 有什么区别? 将集团管理分为两组不同的好处是什么

回答 2 投票 0

如何获取kafka中未提交的消息

我在java中有一个函数,我试图在其中获取未读的消息。例如,如果我在代理中有offSet 0,1,2 的消息已被消费者读取,并且如果我切换...

回答 1 投票 0

通过 Kafka 消费者重试维持订单保证

我正在研究一种在基于 Kafka 的数据处理管道中进行消费者重试的架构。我们正在使用 Kafka 生产者和消费者,并正在考虑重试主题……

回答 3 投票 0

更改 Kafka docker-compose 中的分配策略

我正在使用 docker 在本指南的帮助下运行 Kafka。我运行消费者,但由于错误 *尝试加入组因致命错误而失败:[错误 23]

回答 1 投票 0

Kafka 消息压缩不起作用(代理级别)

我在所有三个代理上的代理级别compression.type=zstd 上添加了日志消息压缩,在代理级别或生产者级别上没有其他更改。 当我尝试从我的 py 读取消息时...

回答 1 投票 0

如何使用Spring Kafka的Acknowledgement.acknowledge()方法进行手动提交

我第一次使用Spring Kafka,我无法在我的消费者代码中使用Acknowledgement.acknowledge()方法进行手动提交,如此处https://docs.spring.io/spring-kafka/referenc所述。 .

回答 4 投票 0

监控分配给分区主题的消费者的延迟

我正在使用 Kafka 0.9.1 新的消费者 API。消费者被手动分配到一个分区。对于这个消费者,我希望看到它的进展(意味着滞后)。由于我添加了组 ID con...

回答 6 投票 0

Kafka:监控分配给分区主题的消费者的延迟

我正在使用 Kafka 0.9.1 新的消费者 API。消费者被手动分配到一个分区。对于这个消费者,我希望看到它的进展(意味着滞后)。由于我添加了组 ID con...

回答 6 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.