我有一个简单的Kafka 2.4.1(Confluent 5.4.1)安装程序在Docker中本地运行。而且我使用用Java编写的测试生产者和测试使用者。该代码可在GitHub中找到。
单元测试可以:
问题是:使用者的第一次运行将跳过该主题中已经产生的消息。 real问题是那些丢失的消息丢失了(从使用者的角度来看:偏移量已移至主题中的最新位置,并且滞后时间为0 Kafka Tool中都可见)
第一次运行后的结果是:
-------------------------------------------------------
T E S T S
-------------------------------------------------------
Running com.example.TestKafkaProducer
Timestamp: Thu Mar 26 10:26:51 CET 2020
Offset: 0
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.686 sec
Running com.example.TestKafkaConsumer
Record count: 0
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.561 sec
Results :
Tests run: 2, Failures: 0, Errors: 0, Skipped: 0
第二次测试给出:
-------------------------------------------------------
T E S T S
-------------------------------------------------------
Running com.example.TestKafkaProducer
Timestamp: Thu Mar 26 10:28:08 CET 2020
Offset: 1
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.538 sec
Running com.example.TestKafkaConsumer
Record count: 1
offset = 1, key = static-key, value = this is the string message at Thu Mar 26 10:28:08 CET 2020
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.138 sec
Results :
Tests run: 2, Failures: 0, Errors: 0, Skipped: 0
我尝试修改各种变体,并且总是得到相同的结果:
有时我还观察到第二次消费者仍然错过了所产生的事件。
查看GitHub存储库上的代码,似乎您没有设置使用者配置auto.offset.reset
。根据documentation,此设置默认为latest
。这意味着,如果经纪人不知道您的测试主题使用者组,则它将仅等待新的传入消息。因此,TestConsumer无法使用生产者测试之前编写的消息。
documentation很好地解释了Kafka中的“消费群体”概念。