如何测试偏移量是否已提交到 Kafka

问题描述 投票:0回答:3

我有一个 Akka Stream Kafka 源,正在读取 Kafka 主题。

我有一个简单的任务,允许禁用消息偏移量的提交。提交通常是调用 commitScaladsl 完成的。

我的问题是我不知道如何测试偏移量是否已提交。

我们通常使用EmbeddedKafka进行测试,但我还没有找到一种方法来询问最后提交的偏移量。

这是我编写的测试示例:

  "KafkaSource" should {
    "consume from a kafka topic and pass the message " in {
      val commitToKafka = true
      val key = "key".getBytes
      val message = "message".getBytes

      withRunningKafka {

        val source = getKafkaSource(commitToKafka)
        val (_, sub) = source
          .toMat(TestSink.probe[CommittableMessage[Array[Byte], Array[Byte], ConsumerMessage.CommittableOffset]])(Keep.both)
          .run()

        val messageOpt = publishAndRequestRetry(topic, key, message, sub, retries)
        messageOpt should not be empty
        messageOpt.get.value shouldBe message
      }
    }

现在我想添加一个检查是否已提交偏移量。

scala unit-testing apache-kafka akka-stream
3个回答
2
投票

我最终使用 ConsumerInterceptor 解决了这个问题,定义为:

class Interceptor extends ConsumerInterceptor[Array[Byte], Array[Byte]] {
  override def onConsume(records: ConsumerRecords[Array[Byte], Array[Byte]]): ConsumerRecords[Array[Byte], Array[Byte]] = records

  override def onCommit(offsets: java.util.Map[TopicPartition, OffsetAndMetadata]): Unit = {
    import scala.collection.JavaConverters._
    OffsetRecorder.add(offsets.asScala)
  }

  override def close(): Unit = {}

  override def configure(configs: java.util.Map[String, _]): Unit = OffsetRecorder.clear

}

onCommit 在提交完成时被调用,在本例中我只是记录它。我使用配置方法在每次测试开始时都有空记录。

然后,在为源创建使用者设置时,我将拦截器添加为属性:

  ConsumerSettings(system, new ByteArrayDeserializer, new ByteArrayDeserializer)
    .withBootstrapServers(s"localhost:${kafkaPort}")
    .withGroupId("group-id")
    .withProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "package.of.my.test.Interceptor")

1
投票

Kafka 通过 TopicName 和 PartitionID 存储偏移量。因此,您可以使用

.committed()
.position
方法来检查 Kafka 消费者最后提交的偏移量或当前位置。

comfilled():获取给定分区的最后提交偏移量(无论提交是由这个进程还是另一个进程发生)。

position():获取将要获取的下一条记录的偏移量(如果存在具有该偏移量的记录)。


0
投票

虽然使用

ConsumerInterceptor
效果很好,但我还建议更通用的方法,以防不可能使用之前答案中的拦截器(例如,在集成或端到端测试中)。

这个想法是轮询消费者组的当前偏移量:

def assertConsumedFromKafka(record: RecordMetadata): Assertion = {
  record.hasOffset shouldBe true

  val topicPartition = new TopicPartition(record.topic, record.partition)
  val groupSpec      = new ListConsumerGroupOffsetsSpec().topicPartitions(Collections.singletonList(topicPartition))
  val groupId = "YOUR-CONSUMER-GROUP-ID-HERE"
  val offsets = kafkaAdmin
    .listConsumerGroupOffsets(Collections.singletonMap(groupId, groupSpec))
    .partitionsToOffsetAndMetadata(groupId)
    .get()
  val offset = offsets.get(topicPartition)

  assert(offset != null, s"No offset for $topicPartition, offsets: ${offsets.asScala}")

  // using +1 here to make sure that offset was committed (moved to the next message)
  assert(offset.offset() >= (record.offset() + 1), "Offset wasn't committed yet")
}

请注意最后一个断言中的

(record.offset() + 1)
,使用 +1 进行比较至关重要,以确保当前消息已处理并且其偏移量已提交(移至下一条消息)。

© www.soinside.com 2019 - 2024. All rights reserved.