无法使用 YAML 配置文件与 FastAPI 服务器来配置 Celery 日志记录

问题描述 投票:0回答:1

我使用 Celery 作为带有 FastAPI Web 服务器的后台任务处理器。 FastAPI 的日志记录是通过 Uvicorn 使用

logging.yaml
文件配置的,如下所示:

uvicorn syncapp.main:app --workers 4 --host 0.0.0.0 --port 8084 --log-config logging.yaml

我已经配置了多个记录器,这些记录器可以很好地用于 API 服务器。但是,无论我做什么,我都无法让 Celery 使用

logging.yaml
文件中定义的记录器。我尝试过其他解决方案,但似乎没有一个适合我的情况。应用程序的目录结构如下:

myproj
|
|--- syncapp
|    |--- tasks
|    |    |--- copy_data.py  (this is a task)
|    |
|    |--- main.py  (FastAPI application entrypoint)
|    |--- worker.py  (Celery application entrypoint)
|
|--- logging.yaml
|--- pyproject.toml

这是 Celery 主要应用程序:(

file: syncapp/worker.py
)

from celery import Celery

from syncapp.settings import get_settings

settings = get_settings()

app = Celery(
    main=settings.app_name,
    broker=settings.celery_broker_url,
    backend=settings.celery_result_backend,
    include="syncapp.tasks.copy_data",
)

app.conf.update(
    broker_connection_retry_on_startup=True,
    worker_hijack_root_logger=False,
)

这就是任务访问记录器对象的方式(

file: syncapp/tasks/copy_data.py

from celery.utils.log import get_task_logger

from syncapp.commons.enums import WorkerQueue
from syncapp.worker import app

logger = get_task_logger(__name__)


@app.task(queue=WorkerQueue.COPY_DATA.value)
def copy_index_data(index_name: str) -> bool:
    """Copies index data up until the current date."""
    logger.info("Copying data from source index %s", index_name)
    return True

logging.yaml
文件的内容(一些键被删除,因为它们不相关):

version: 1

objects:
  syncapp_queue:
    class: queue.Queue
    maxsize: 1000
  celery_queue:
    class: queue.Queue
    maxsize: 1000

formatters:
  simple:
    format: "[%(asctime)s.%(msecs)03d] [pid %(process)d] [%(levelname)s]: %(message)s"
    datefmt: "%Y-%m-%d %H:%M:%S"
  extended:
    format: "[%(asctime)s.%(msecs)03d] [pid %(process)d] [%(levelname)s] - [%(module)s:%(lineno)d]: %(message)s"
    datefmt: "%Y-%m-%d %H:%M:%S"

handlers:
  console:
    class: logging.StreamHandler
    formatter: simple
    stream: ext://sys.stdout

  syncapp_file_handler:
    class: logging.FileHandler
    filename: logs/syncapp.log
    formatter: extended

  celery_file_handler:
    class: logging.FileHandler
    filename: logs/synctask.log
    formatter: extended

  syncapp_queue_handler:
    class: logging_.handlers.QueueListenerHandler
    handlers:
      - cfg://handlers.console
      - cfg://handlers.syncapp_file_handler
    queue: cfg://objects.syncapp_queue

  celery_queue_handler:
    class: logging_.handlers.QueueListenerHandler
    handlers:
      - cfg://handlers.console
      - cfg://handlers.celery_file_handler
    queue: cfg://objects.celery_queue

loggers:
  syncapp:
    level: DEBUG
    handlers:
      - syncapp_queue_handler
    propagate: no
  syncapp.tasks:
    level: DEBUG
    handlers:
      - celery_queue_handler
    propagate: no
  celery:
    level: INFO
    handlers:
      - celery_queue_handler
    propagate: no

root:
  level: INFO
  handlers:
    - console

我希望 Celery 后端使用

celery
记录器记录其事件,并将任务记录到
syncapp.tasks
记录器。主应用程序日志应通过
syncapp
记录器(确实如此)。

我已经尝试过设置信号,但没有帮助。例如:

@after_setup_logger.connect
def setup_loggers(*args, **kwargs):
    logger = kwargs["logger"]
    for handler in logger.handlers:
        logger.removeHandler(handler)
    with open("logging.yaml", "r") as config_file:
        # Load logging configuration from YAML file
        YAMLConfig(config_file.read(), silent=True)
    logger = logging.getLogger("syncapp.tasks")
    logger.info("Logging configuration loaded. Handlers: %s", logger.handlers)
    kwargs["logger"] = logger

事实上,它会导致 Celery 控制台日志出现错误

syncdata-1  | Exception in thread Thread-1 (_monitor):
syncdata-1  | Traceback (most recent call last):
syncdata-1  |   File "/usr/local/lib/python3.11/threading.py", line 1038, in _bootstrap_inner
syncdata-1  |     self.run()
syncdata-1  |   File "/usr/local/lib/python3.11/threading.py", line 975, in run
syncdata-1  |     self._target(*self._args, **self._kwargs)
syncdata-1  |   File "/usr/local/lib/python3.11/logging/handlers.py", line 1584, in _monitor
syncdata-1  |     self.handle(record)
syncdata-1  |   File "/usr/local/lib/python3.11/logging/handlers.py", line 1563, in handle
syncdata-1  |     process = record.levelno >= handler.level
syncdata-1  |                                 ^^^^^^^^^^^^^
syncdata-1  | AttributeError: 'ConvertingDict' object has no attribute 'level'

这对于 Flask 来说并不是很困难。但是,在这里,我找不到一种方法让它将日志输出到指定的文件。

python logging celery fastapi
1个回答
0
投票

Celery 初始化它自己的日志系统,这就是你发出问题的原因。 您应该在 celery 初始化它自己之前配置日志记录。 所以你可以在你的syncapp/worker.py中做这样的事情

import os
import logging
import logging.config
import yaml

from celery import Celery
from syncapp.settings import get_settings

current_dir= os.path.dirname(os.path.abspath(__file__))
project_root =os.path.dirname(current_dir)
logging_yaml_path = os.path.join(project_root, 'logging.yaml')

with open(logging_yaml_path,'r') as f:
    logging_config = yaml.safe_load(f)
    logging.config.dictConfig(logging_config)

app = Celery(
    ...
)

app.conf.update(
    ...
)

我还注意到您在logging.yaml中使用了logging_。这是一些自定义日志系统吗?如果是这样,你应该在我上面的例子中使用它。

© www.soinside.com 2019 - 2024. All rights reserved.