Kafka消费者的第一次调查未检索到主题消息。有什么问题吗?

问题描述 投票:0回答:1

我有一个简单的Kafka 2.4.1(Confluent 5.4.1)安装程序在Docker中本地运行。而且我使用用Java编写的测试生产者和测试使用者。该代码可在GitHub中找到。

单元测试可以:

  • 生产者将一个消息发送到单个分区主题
  • 一个用户订阅了该主题,并向Kafka投票以获取可用消息

问题是:使用者的第一次运行将跳过该主题中已经产生的消息。 real问题是那些丢失的消息丢失了(从使用者的角度来看:偏移量已移至主题中的最新位置,并且滞后时间为0 Kafka Tool中都可见)

第一次运行后的结果是:

-------------------------------------------------------
 T E S T S
-------------------------------------------------------
Running com.example.TestKafkaProducer
Timestamp: Thu Mar 26 10:26:51 CET 2020
Offset: 0
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.686 sec
Running com.example.TestKafkaConsumer
Record count: 0
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.561 sec

Results :

Tests run: 2, Failures: 0, Errors: 0, Skipped: 0

第二次测试给出:

-------------------------------------------------------
 T E S T S
-------------------------------------------------------
Running com.example.TestKafkaProducer
Timestamp: Thu Mar 26 10:28:08 CET 2020
Offset: 1
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.538 sec
Running com.example.TestKafkaConsumer
Record count: 1
offset = 1, key = static-key, value = this is the string message at Thu Mar 26 10:28:08 CET 2020
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.138 sec

Results :

Tests run: 2, Failures: 0, Errors: 0, Skipped: 0

我尝试修改各种变体,并且总是得到相同的结果:

  • 在产生第一条消息之前,等待卡夫卡“热身”
  • 在生产和消费之间等待一段时间
  • 在第一次使用前产生几条消息
  • 未经Kafka Tool的检查而生产和消费(以避免任何第三者的未知干扰)

有时我还观察到第二次消费者仍然错过了所产生的事件。

apache-kafka kafka-consumer-api kafka-producer-api
1个回答
0
投票

查看GitHub存储库上的代码,似乎您没有设置使用者配置auto.offset.reset。根据documentation,此设置默认为latest。这意味着,如果经纪人不知道您的测试主题使用者组,则它将仅等待新的传入消息。因此,TestConsumer无法使用生产者测试之前编写的消息。

documentation很好地解释了Kafka中的“消费群体”概念。

© www.soinside.com 2019 - 2024. All rights reserved.