def kafkaa(self, auto_offset_reset, timeout=500):
group_name = "group name"
config = {"bootstrap.servers": "server",
"schema.registry.url": "url",
"group.id": group_name,
"enable.auto.commit": False,
"auto.offset.reset": False, // True
"sasl.mechanisms": "sasl",
"security.protocol": "protocol",
"sasl.username": "username",
"sasl.password": "pw"}
consumer = AvroConsumer(config)
data_consumed = []
consumer.subscribe(kafkaTopic)
while True:
if time.time() > time.time() + timeout:
break
else:
message = consumer.poll()
if message is not None:
kafka_ms.append(message)
consumer.commit(asynchronous=False)
consumer.close()
return data_consumed
`
使用 auto.offset.reset =latest 时,如果未使用组 ID,则不会返回任何值,因为流并不总是有消息可供使用。
使用 auto.offset.reset =latest 时,组 id 为现有组 id,这会返回偏移后的所有内容,直到超时,但代理会重新启动
基于时间戳
您需要使用
offset_for_times
函数来查找该时间戳的任何主题的偏移量,然后在从该时间戳开始轮询之前向使用者查找这些分区偏移量