Search for messages by key or value


I want to find a published message by its KEY or VALUE. How can I search for the message and print it?

I have tried the following, but it does not work:

from confluent_kafka import KafkaError, TopicPartition

def filter_messages(self, topic, key_filter=None, value_filter=None, timeout=10.0):
    try:
        # Get the list of partitions for the topic
        partitions = self.consumer.list_topics(topic).topics[topic].partitions
        # Create TopicPartition objects for all partitions
        topic_partitions = [TopicPartition(topic, p) for p in partitions]
        # Assign the consumer to all partitions
        self.consumer.assign(topic_partitions)

        while True:
            # Poll for a new message
            msg = self.consumer.poll(timeout=1.0)
            if msg is None:
                continue  # No new message within the poll interval
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    print("Reached end of partition.")
                    break  # Stop if reached the end of the partition
                else:
                    print(f"Error while consuming message: {msg.error()}")
                    continue

            # Decode the message key and value if present
            key = msg.key().decode('utf-8') if msg.key() else None
            value = msg.value().decode('utf-8') if msg.value() else None

            # Check if both key and value match the target criteria
            if key == key_filter and value == value_filter:
                print(f"Found matching message:\nKey: {key}\nValue: {value}")
                return  # Stop after printing the first match

    except KeyboardInterrupt:
        pass
    finally:
        self.consumer.close()
python apache-kafka confluent-kafka-python
1 Answer

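"using message KEY or VALUE"

You need to use "or" rather than "and" in the match condition.

Otherwise, your code is correct, and you need to debug the two filter parameters.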
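
A minimal sketch of that change, assuming the goal is to match on either field. The helper names matches and decode and the sample records are hypothetical, not part of the original code. A filter left as None is skipped here, so a message without a key does not accidentally match an unset key_filter.

```python
def matches(key, value, key_filter=None, value_filter=None):
    """Return True if the key OR the value equals its filter.

    A filter left as None is ignored, so it never matches a message
    that simply has no key or no value.
    """
    key_hit = key_filter is not None and key == key_filter
    value_hit = value_filter is not None and value == value_filter
    return key_hit or value_hit


def decode(raw):
    """Decode bytes as UTF-8; pass None through unchanged."""
    return raw.decode("utf-8") if raw is not None else None


# Hypothetical sample records standing in for (msg.key(), msg.value()) pairs.
records = [
    (b"user-1", b"login"),
    (None, b"heartbeat"),
    (b"user-2", b"logout"),
]

# Keep every record whose key or value matches; only key_filter is set here.
found = [
    (decode(k), decode(v))
    for k, v in records
    if matches(decode(k), decode(v), key_filter="user-2")
]
print(found)
```

In the loop from the question, the condition would then read: if matches(key, value, key_filter, value_filter).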


You don't need Python, though. Just use kafka-console-consumer and grep.
