My consumer inherits from BasicKafkaConsumerV2. During deployments, when pods are rotated, I lose some messages; this is visible from the offsets printed after the manual commit(). If the offsets were never committed, Kafka should not lose the messages. What could be going wrong here?
Run command:
- name: order-consumer
  image: KUSTOMIZE_PRIMARY
  imagePullPolicy: Always
  command:
    [
      # Invoking wait for pgbouncer script
      "/wait-for.sh",
      "localhost:6432",
      "-s",
      "-t",
      "30",
      "--",
      # Starting main process
      "ddtrace-run",
      "python",
      "manage.py",
      "run_order-consumer",
    ]
Consumer:
# Imports assumed from the surrounding project (Django + kafka-python + ddtrace + sentry_sdk + newrelic);
# logger, convert_tuple_to_dict and LogGuidSetter are project-level helpers.
import json
import time

import newrelic.agent
import sentry_sdk
from ddtrace import tracer
from django.conf import settings
from django.db import InterfaceError, OperationalError
from kafka import KafkaConsumer


class BasicKafkaConsumerV2:
    group_id = None  # str
    consumer_name = None  # str
    newrelic_application = None
    topic_handlers = {}  # dict
    DB_EXCEPTION_RETRY_TIMEOUT = 5  # seconds
    DLQ_TOPIC = None
    def __init__(self, latest_offset=False):
        """Inits the Consumer and subscribes to the topics"""
        self.auto_offset_reset = "latest"
        self.consumer = KafkaConsumer(
            bootstrap_servers=["broker1", "broker2"],
            group_id=self.group_id,
            enable_auto_commit=False,
            auto_offset_reset=self.auto_offset_reset,
        )
        self.topics_list = list(self.topic_handlers.keys())
        self.consumer.subscribe(self.topics_list)
        self.newrelic_application = newrelic.agent.application()
        logger.info(
            f"{[self.consumer_name]} subscribed to {self.topics_list} "
            f"with auto_offset_reset {self.auto_offset_reset}"
        )

    def message_handler_wrapped(
        self,
        topic: str,
        kafka_msg_value: bytes,
        headers: dict,
        consumed_message=None,
    ):
        """Processes the message.

        Also handles any DB exceptions by retrying the event after a period.
        """
        with tracer.trace(
            settings.DD_KAFKA_RESOURCE_NAME,
            service=settings.DD_SERVICE,
            resource=self.group_id,
            span_type="consumer",
        ) as span:
            try:
                json_data = json.loads(kafka_msg_value)
                dict_headers = convert_tuple_to_dict(headers)
                span.set_tag("topic", topic)
                span.set_tag("event", self.get_event_name(json_data))
                self.message_handler(topic, json_data, dict_headers)
            except (InterfaceError, OperationalError) as e:
                # Sleep for some time to allow the DB to heal.
                # This essentially loops forever (further processing of events is blocked).
                logger.info(f"[{self.consumer_name}] DB Exception: {e}")
                span.set_tag("type", "retry")
                span.set_exc_info(type(e), e, e.__traceback__)
                time.sleep(self.DB_EXCEPTION_RETRY_TIMEOUT)
                self.message_handler_wrapped(
                    topic, kafka_msg_value, headers, consumed_message
                )
            except Exception as e:
                logger.exception(f"[{self.consumer_name}] Exception: {e}")
                span.set_tag("type", "error")
                span.set_exc_info(type(e), e, e.__traceback__)
                sentry_sdk.capture_exception(e)

    def message_handler(self, topic: str, data: dict, headers: dict):
        """Handles the message"""
        event = self.get_event_name(data)
        topic_handler = self.topic_handlers.get(topic)
        topic_handler.handle_message(event, data, headers)

    def start_consumer(self):
        """Starts consuming messages on the topics"""
        logger.info(f"Consumer [{self.consumer_name}] is starting to consume")
        for msg in self.consumer:
            with LogGuidSetter() as _:
                self.message_handler_wrapped(
                    msg.topic, msg.value, msg.headers, msg
                )
                self.consumer.commit()
                logger.info(
                    f"[{self.consumer_name}] Consumed message from partition: {msg.partition} "
                    f"offset: {msg.offset} with key: {msg.key}"
                )

    def get_event_name(self, data):
        return data.get("event") or data.get("event_name")

class TopicEventHandler:
    topic = None
    event_handler_mapping = {}  # event name and their fn handlers

    def handle_message(self, event, data, headers):
        """Handles the message"""
        event_handler = getattr(
            self, self.event_handler_mapping.get(event, ""), None
        )
        if event_handler is None:
            logger.info(f"Topic <{self.topic}> unhandled event : {event}")
            return
        event_handler(data, headers)
Check for throttling issues in Grafana. Kafka rejects messages larger than roughly 1 MB by default, so verify that your messages stay below that limit. If a message exceeds 1 MB, apply better compression and split it into ordered chunks, publishing each chunk as its own message; on the consumer side, reassemble the chunks to recover the original message, as sketched below.
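A minimal sketch of that chunk-and-reassemble idea, assuming kafka-python; the header names (chunk_group, chunk_index, chunk_total), the 900 KB chunk size, and gzip as the compression are placeholders to adapt to your own setup:

import gzip
import uuid

from kafka import KafkaProducer

CHUNK_SIZE = 900 * 1024  # stay under the broker's ~1 MB default limit


def publish_in_chunks(producer: KafkaProducer, topic: str, payload: bytes, key: bytes = None):
    """Compress a large payload and publish it as ordered chunks."""
    compressed = gzip.compress(payload)
    chunks = [compressed[i:i + CHUNK_SIZE] for i in range(0, len(compressed), CHUNK_SIZE)]
    group = str(uuid.uuid4()).encode()
    for index, chunk in enumerate(chunks):
        producer.send(
            topic,
            value=chunk,
            key=key,  # same key -> same partition, so chunk order is preserved
            headers=[
                ("chunk_group", group),
                ("chunk_index", str(index).encode()),
                ("chunk_total", str(len(chunks)).encode()),
            ],
        )
    producer.flush()


# Consumer side: buffer chunks per group and reassemble once all have arrived.
_chunk_buffer = {}  # chunk_group -> {chunk_index: bytes}


def reassemble(headers: dict, value: bytes):
    """Return the original payload once every chunk of a group is present, else None."""
    group = headers["chunk_group"]
    total = int(headers["chunk_total"])  # kafka-python delivers header values as bytes; int() accepts that
    parts = _chunk_buffer.setdefault(group, {})
    parts[int(headers["chunk_index"])] = value
    if len(parts) < total:
        return None
    compressed = b"".join(parts[i] for i in range(total))
    del _chunk_buffer[group]
    return gzip.decompress(compressed)

Publishing every chunk with the same key pins the whole group to one partition, which is what keeps the chunks arriving in order for the consumer.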