Kafka consumer loses messages during deployment

Problem description

My consumer inherits from BasicKafkaConsumerV2. During deployments, when the pods are rotated, I lose some messages; this is visible from the offsets logged after the manual commit(). If an offset is never committed, Kafka should not lose the message. What could be going wrong here?

Run command:

      - name: order-consumer
        image: KUSTOMIZE_PRIMARY
        imagePullPolicy: Always
        command:
          [
            # Invoking wait for pgbouncer script
            "/wait-for.sh",
            "localhost:6432",
            "-s",
            "-t",
            "30",
            "--",
            # Starting main process
            "ddtrace-run",
            "python",
            "manage.py",
            "run_order-consumer",
          ]

Consumer:

import json
import logging
import time

import newrelic.agent
import sentry_sdk
from ddtrace import tracer
from django.db import InterfaceError, OperationalError
from kafka import KafkaConsumer

# convert_tuple_to_dict, LogGuidSetter and settings are project-specific
# helpers/config assumed to be importable from the surrounding codebase.

logger = logging.getLogger(__name__)


class BasicKafkaConsumerV2:
    group_id = None  # str
    consumer_name = None  # str
    newrelic_application = None

    topic_handlers = {}  # dict

    DB_EXCEPTION_RETRY_TIMEOUT = 5  # seconds
    DLQ_TOPIC = None

    def __init__(self, latest_offset=False):
        """Inits the Consumer and subscribes to the topics"""
        self.auto_offset_reset = "latest"
        self.consumer = KafkaConsumer(
            bootstrap_servers=["broker1", "broker2"],
            group_id=self.group_id,
            enable_auto_commit=False,
            auto_offset_reset=self.auto_offset_reset,
        )
        self.topics_list = list(self.topic_handlers.keys())
        self.consumer.subscribe(self.topics_list)
        self.newrelic_application = newrelic.agent.application()
        logger.info(
            f"[{self.consumer_name}] subscribed to {self.topics_list} with auto_offset_reset {self.auto_offset_reset}"
        )

    def message_handler_wrapped(
        self,
        topic: str,
        kafka_msg_value: bytes,
        headers: dict,
        consumed_message=None,
    ):
        """Processes the message
        Also handles any DB exceptions by retrying the event after a period
        """
        with tracer.trace(
            settings.DD_KAFKA_RESOURCE_NAME,
            service=settings.DD_SERVICE,
            resource=self.group_id,
            span_type="consumer",
        ) as span:
            try:
                json_data = json.loads(kafka_msg_value)
                dict_headers = convert_tuple_to_dict(headers)

                span.set_tag("topic", topic)
                span.set_tag("event", self.get_event_name(json_data))

                self.message_handler(topic, json_data, dict_headers)

            except (InterfaceError, OperationalError) as e:
                """Sleep for sometime to allow the DB to heal
                This will essentially infinitely loop (further processing of events is blocked)
                """
                logger.info(f"[{self.consumer_name}] DB Exception: {e}")
                span.set_tag("type", "retry")
                span.set_exc_info(type(e), e, e.__traceback__)
                time.sleep(self.DB_EXCEPTION_RETRY_TIMEOUT)
                self.message_handler_wrapped(
                    topic, kafka_msg_value, headers, consumed_message
                )

            except Exception as e:
                logger.exception(f"[{self.consumer_name}] Exception: {e}")
                span.set_tag("type", "error")
                span.set_exc_info(type(e), e, e.__traceback__)
                sentry_sdk.capture_exception(e)

    def message_handler(self, topic: str, data: dict, headers: dict):
        """Handles the message"""

        event = self.get_event_name(data)
        topic_handler = self.topic_handlers.get(topic)
        topic_handler.handle_message(event, data, headers)

    def start_consumer(self):
        """Starts consuming messages on the topic"""

        logger.info(f"Consumer [{self.consumer_name}] is starting consuming")

        for msg in self.consumer:
            with LogGuidSetter() as _:
                self.message_handler_wrapped(
                    msg.topic, msg.value, msg.headers, msg
                )
                self.consumer.commit()
                logger.info(
                    f"[{self.consumer_name}] Consumed message from partition: {msg.partition} offset: {msg.offset} with key: {msg.key}"
                )

    def get_event_name(self, data):
        return data.get("event") or data.get("event_name")


class TopicEventHandler:
    topic = None
    event_handler_mapping = {}  # event name and their fn handlers

    def handle_message(self, event, data, headers):
        """Handles the message"""

        event_handler = getattr(
            self, self.event_handler_mapping.get(event, ""), None
        )
        if event_handler is None:
            logger.info(f"Topic <{self.topic}> unhandled event : {event}")
            return

        event_handler(data, headers)

Tags: python, django, apache-kafka, apache-zookeeper, kafka-python
1 Answer

Check for throttling issues in Grafana. By default, Kafka brokers reject messages larger than roughly 1 MB (message.max.bytes), so verify that your messages stay below that limit. If a message exceeds 1 MB, apply better compression and split it into ordered chunks published as separate messages; on the consumer side, reassemble the chunks to recover the original message (see the sketch below).
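A minimal sketch of that chunking approach, assuming kafka-python on both sides; the header names (message_id, chunk_id, chunk_total), the MAX_CHUNK_BYTES limit, and the publish_in_chunks / reassemble helpers are illustrative and not taken from the question's code:

import uuid

from kafka import KafkaProducer

MAX_CHUNK_BYTES = 900_000  # stay safely below the broker's ~1 MB default message.max.bytes

producer = KafkaProducer(bootstrap_servers=["broker1", "broker2"])


def publish_in_chunks(topic: str, payload: bytes):
    """Split an oversized payload and publish the pieces in order."""
    message_id = str(uuid.uuid4()).encode()
    chunks = [payload[i:i + MAX_CHUNK_BYTES] for i in range(0, len(payload), MAX_CHUNK_BYTES)]
    for idx, chunk in enumerate(chunks):
        producer.send(
            topic,
            value=chunk,
            key=message_id,  # same key -> same partition, so chunk order is preserved
            headers=[
                ("message_id", message_id),
                ("chunk_id", str(idx).encode()),
                ("chunk_total", str(len(chunks)).encode()),
            ],
        )
    producer.flush()


# Consumer side: buffer chunks until every piece of a message_id has arrived.
_chunk_buffers = {}


def reassemble(headers: dict, value: bytes):
    """Return the full payload once all chunks are present, else None."""
    message_id = headers["message_id"]
    total = int(headers["chunk_total"])
    parts = _chunk_buffers.setdefault(message_id, {})
    parts[int(headers["chunk_id"])] = value
    if len(parts) < total:
        return None
    _chunk_buffers.pop(message_id)
    return b"".join(parts[i] for i in range(total))

In this sketch, reassemble would be called from message_handler_wrapped with the header dict produced by convert_tuple_to_dict, and the handler (plus the manual commit) would only run once it returns a complete payload.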
