kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group webapi-group
上面的命令给了我:
Error: Consumer group 'web-api' does not exist.
这是正确的,因为 web-api 组不存在。我似乎不记得我创建了哪些消费者群体。我怎么看到它?
我也尝试过:
bin\kafka-consumer-groups.sh -bootstrap-server localhost:9092 -list
但是什么也没发生。
编辑: 开火后:
bin\kafka-consumer-groups.sh -bootstrap-server localhost:9092 --list
终端窗口打开几秒钟,然后自行关闭,没有任何输出。
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --all-groups count_errors --describe
使用此命令列出所有组和主题详细信息。拥有消费者群体后,您应该能够看到类似的反应
Consumer group 'order-consumer-group' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
order-consumer-group checkins 0 18 25
7 - - -
上面 @Rohik 的答案对我来说不起作用,因为我的企业有太多 Kafka 组,无法使用单个命令列出所有组而不会超时。我刚刚得到如下堆栈跟踪:
Error: Executing consumer group command failed due to org.apache.kafka.common.errors.TimeoutException: Call(callName=listOffsets(api=METADATA), deadlineMs=1698212646571, tries=1, nextAllowedTryMs=1698212646676) timed out at 1698212646576 after 1 attempt(s)
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=listOffsets(api=METADATA), deadlineMs=1698212646571, tries=1, nextAllowedTryMs=1698212646676) timed out at 1698212646576 after 1 attempt(s)
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.getLogEndOffsets(ConsumerGroupCommand.scala:638)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$collectGroupsOffsets$7(ConsumerGroupCommand.scala:411)
at scala.collection.immutable.List.flatMap(List.scala:293)
at scala.collection.immutable.List.flatMap(List.scala:79)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$collectGroupsOffsets$2(ConsumerGroupCommand.scala:567)
at scala.collection.Iterator$$anon$9.next(Iterator.scala:584)
at scala.collection.mutable.Growable.addAll(Growable.scala:62)
at scala.collection.mutable.Growable.addAll$(Growable.scala:57)
at scala.collection.mutable.HashMap.addAll(HashMap.scala:124)
at scala.collection.mutable.HashMap$.from(HashMap.scala:604)
at scala.collection.mutable.HashMap$.from(HashMap.scala:597)
at scala.collection.MapOps$WithFilter.map(Map.scala:381)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.collectGroupsOffsets(ConsumerGroupCommand.scala:560)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.describeGroups(ConsumerGroupCommand.scala:367)
at kafka.admin.ConsumerGroupCommand$.run(ConsumerGroupCommand.scala:72)
at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:59)
at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=listOffsets(api=METADATA), deadlineMs=1698212646571, tries=1, nextAllowedTryMs=1698212646676) timed out at 1698212646576 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.DisconnectException: Cancelled listOffsets(api=METADATA) request with correlation id 10169 due to node 4 being disconnected
话虽如此,我通过在 Macbook 上的 ZSH 终端上运行的管道命令想出了一个不错的解决方法:
kafka-consumer-groups --bootstrap-server <my-kafka-cluster>:9093 --command-config <my-cfg-file-name> --list \
| xargs -I% kafka-consumer-groups --bootstrap-server <my-kafka-cluster>:9093 --command-config <my-cfg-file-name> --group % --describe 2>/dev/null \
| grep -E <my-topic-name>
基本上,我在第一行列出了所有消费者群体。
在第二行,我通过
xargs
命令描述每个消费者组,因为我无法将消费者组名称作为 shell 输入直接传递到 kafka-consumer-groups
命令中。请注意,我通过 Consumer group '<some-group>' has no active members
附件忽略了不相关的错误输出行,例如 2>/dev/null
。作为输出的一部分,除了第一列中的消费者组名称之外,主题名称还应列在第二列中。
最后,我通过第三行的
grep
过滤掉所有不包含我想要的主题名称的行。
通过使用此输出中的第一列,可以获得所有消费者组的列表。