我有一个 python 服务,旨在运行长任务作业(有些可能需要 5 分钟)。
我运行长任务,最后我发送确认。
我发现工作时间越长,我就越有可能得到错误:
connection_lost: "StreamLostError: (\"Stream connection lost: RxEndOfFile(-1, 'End of input stream (EOF)')\",)"
这是我用来启动 RabbitMQ 连接的服务: bin/RabbitMqService.py
import pika
import json
from src.bin.LogService import LogService
logger = LogService.init()
from src.config.RabbitMqConfig import RabbitMqConfig
class RabbitMqService:
def __init__(self) -> None:
if not RabbitMqConfig.endpoint.strip():
logger.warn('[HL] [AMQP] No endpoint configured')
return None
self.input_connection = pika.BlockingConnection(pika.URLParameters(RabbitMqConfig.endpoint))
self.channel = self.input_connection.channel()
self.channel.basic_qos(prefetch_count=1) # handling one message at a time
def listen_input(self, queue, callback_function):
self.channel.basic_consume(queue=queue, on_message_callback=callback_function)
def start_consuming(self):
self.channel.start_consuming()
def acknowledge(channel, delivery_tag):
channel.basic_ack(delivery_tag=delivery_tag)
logger.debug('[HL] [AMQP] Message acknowledged with delivery_tag ' + str(delivery_tag))
def send_output(channel, output_exchange, message_body):
logger.debug('[HL] [AMQP] Publishing message to output queue')
channel.basic_publish(exchange=output_exchange, routing_key='', body=json.dumps(message_body))
logger.info('[HL] [AMQP] Message published top queue %s', output_exchange)
这是主要文件(为帖子做了简化):
import argparse
import json
import sys
from src.bin.LogService import LogService
logger = LogService.init()
from datetime import datetime as dt
from src.bin.MinioService import MinioService
from src.bin.RabbitMqService import RabbitMqService
from src.config.MinioConfig import MinioConfig
from src.config.RabbitMqConfig import RabbitMqConfig
from src.lib.SmoothMeasures import run_smoothing
from src.lib.CountCollects import run_collects
from src.config.SentryConfig import SentryConfig
from src.bin.SentryService import SentryService
minio_service = None
def smoothing_message_handler(channel, method_frame, properties, body):
if not RabbitMqConfig.smoothing_output_exchange.strip():
logger.warn('[HL] [AMQP] No output configured for smoothing')
return None
message_handler('smoothing_measures', channel, RabbitMqConfig.smoothing_output_exchange, method_frame, properties, body)
def collect_message_handler(channel, method_frame, properties, body):
if not RabbitMqConfig.collect_output_exchange.strip():
logger.warn('[HL] [AMQP] No output configured for collects')
return None
message_handler('count_collects', channel, RabbitMqConfig.collect_output_exchange, method_frame, properties, body)
def get_blob_info(json_body):
[...]
return {'path': path, 'bucket': bucket}
def message_handler(type, channel, output_exchange, method_frame, properties, body):
try:
blob_info = get_blob_info(json.loads(body))
except:
RabbitMqService.acknowledge(channel, method_frame.delivery_tag) # avoid testing same message again
raise ValueError("Received body from rabbitMq is not a JSON / have not the correct properties")
result = process_input_message(type, blob_info)
RabbitMqService.send_output(channel, output_exchange, result)
RabbitMqService.acknowledge(channel, method_frame.delivery_tag)
def process_input_message(type: str, blob_info: dict):
logger.debug('[HL] Handle message, bucketName: %s, blobPath: %s', blob_info['path'], blob_info['bucket'])
path = blob_info['path']
bucket = blob_info['bucket']
if path.count('input') != 1:
raise Exception("Missing 'input' keyword in blob_path: %s", path)
input_blob_content = minio_service.get_blob(bucket, path)
# Here is the long process
res = run_kpi(type, input_blob_content, path)
return {'bucket': bucket, 'path': res['output_blob_path']}
def run_kpi(type: str, input_blob_content: dict, path: str):
[...]
if __name__ == "__main__":
SentryService.init(config=SentryConfig)
logger.info("[HL] Parsing arguments ...")
parser = argparse.ArgumentParser(description='Description du script')
parser.add_argument('--smoothing', dest='smoothing_input_listener', action='store_true', help='Activer le listener pour le queue de smoothing')
parser.add_argument('--collect', dest='collect_input_listener', action='store_true', help='Activer le listener pour le queue de collect')
args = parser.parse_args()
logger.info("[HL] Executing main with arguments %s ...", args)
logger.info("[HL] Creating RabbitMq client and Minio Client ...")
rabbit_smoothing_service = RabbitMqService()
minio_service = MinioService(config=MinioConfig)
try:
if args.smoothing_input_listener:
rabbit_smoothing_service.listen_input(queue=RabbitMqConfig.smoothing_input_queue, callback_function=smoothing_message_handler)
logger.info("[HL] Smoothing Client is listening the queue : %s", RabbitMqConfig.smoothing_input_queue)
if args.collect_input_listener:
rabbit_smoothing_service.listen_input(queue=RabbitMqConfig.collect_input_queue, callback_function=collect_message_handler)
logger.info("[HL] Collect Client is listening the queue : %s", RabbitMqConfig.collect_input_queue)
rabbit_smoothing_service.start_consuming()
except KeyboardInterrupt:
sys.exit(0)
我做错了什么/理解错了吗?