我正在oracle数据库中执行更新事务,更新完成后,应在相应主题中发布一条汇合的kafka消息。 现在我想使用消息 KEY 或 VALUE 找到已发布的消息。我如何搜索消息并打印它。请有人帮助我举一些例子
我已经尝试过以下方法,但没有任何效果:
def filter_messages(self, topic, key_filter=None,value_filter = None, timeout=10.0):
try:
# Get the list of partitions for the topic
partitions = self.consumer.list_topics(topic).topics[topic].partitions
# Create TopicPartition objects for all partitions
topic_partitions = [TopicPartition(topic, p) for p in partitions]
# Assign the consumer to all partitions
self.consumer.assign(topic_partitions)
while True:
# Poll for a new message
msg = self.consumer.poll(timeout=1.0)
if msg is None:
continue # No new message within the poll interval
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
print("Reached end of partition.")
break # Stop if reached the end of the partition
else:
print(f"Error while consuming message: {msg.error()}")
continue
# Decode the message key and value if present
key = msg.key().decode('utf-8') if msg.key() else None
value = msg.value().decode('utf-8') if msg.value() else None
# Check if both key and value match the target criteria
if key == key_filter and value == value_filter:
print(f"Found matching message:\nKey: {key}\nValue: {value}")
return # Stop after printing the first match
except KeyboardInterrupt:
pass
finally:
self.consumer.close()
使用消息 KEY 或 VALUE
您需要使用
or
,而不是 and
否则,您的代码是正确的,您需要调试两个过滤器参数