I am using Celery as the background task processor alongside a FastAPI web server. FastAPI's logging is configured through Uvicorn using a `logging.yaml` file, like so:

```
uvicorn syncapp.main:app --workers 4 --host 0.0.0.0 --port 8084 --log-config logging.yaml
```

I have configured several loggers, and they work well for the API server. However, no matter what I do, I cannot get Celery to use the loggers defined in `logging.yaml`. I have tried other solutions, but none of them seems to fit my case. The application's directory structure is as follows:
```
myproj
|
|--- syncapp
|    |--- tasks
|    |    |--- copy_data.py (this is a task)
|    |
|    |--- main.py (FastAPI application entrypoint)
|    |--- worker.py (Celery application entrypoint)
|
|--- logging.yaml
|--- pyproject.toml
```
This is the main Celery application (file: `syncapp/worker.py`):

```python
from celery import Celery

from syncapp.settings import get_settings

settings = get_settings()

app = Celery(
    main=settings.app_name,
    broker=settings.celery_broker_url,
    backend=settings.celery_result_backend,
    include=["syncapp.tasks.copy_data"],
)

app.conf.update(
    broker_connection_retry_on_startup=True,
    worker_hijack_root_logger=False,
)
```
And this is how a task accesses the logger object (file: `syncapp/tasks/copy_data.py`):

```python
from celery.utils.log import get_task_logger

from syncapp.commons.enums import WorkerQueue
from syncapp.worker import app

logger = get_task_logger(__name__)


@app.task(queue=WorkerQueue.COPY_DATA.value)
def copy_index_data(index_name: str) -> bool:
    """Copies index data up until the current date."""
    logger.info("Copying data from source index %s", index_name)
    return True
```
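For context, with plain stdlib logging a logger named `syncapp.tasks.copy_data` propagates its records up to whatever handlers are configured on `syncapp.tasks`, which is exactly the routing I want. A minimal sketch (the `Collector` handler is only there to make the routing observable):

```python
import logging

# Collect emitted messages so the routing is observable.
records = []

class Collector(logging.Handler):
    def emit(self, record):
        records.append((record.name, record.getMessage()))

# Configure only the parent logger, as logging.yaml does for "syncapp.tasks".
parent = logging.getLogger("syncapp.tasks")
parent.setLevel(logging.DEBUG)
parent.addHandler(Collector())
parent.propagate = False

# A child logger with no handlers of its own still reaches the parent's handler.
child = logging.getLogger("syncapp.tasks.copy_data")
child.info("Copying data from source index %s", "my-index")

print(records)  # [('syncapp.tasks.copy_data', 'Copying data from source index my-index')]
```

Note that `celery.utils.log.get_task_logger` attaches the returned logger under Celery's `celery.task` base logger, so inside a worker the parent chain may differ from this plain-stdlib picture.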
Contents of the `logging.yaml` file (some keys removed because they are not relevant):
```yaml
version: 1

objects:
  syncapp_queue:
    class: queue.Queue
    maxsize: 1000
  celery_queue:
    class: queue.Queue
    maxsize: 1000

formatters:
  simple:
    format: "[%(asctime)s.%(msecs)03d] [pid %(process)d] [%(levelname)s]: %(message)s"
    datefmt: "%Y-%m-%d %H:%M:%S"
  extended:
    format: "[%(asctime)s.%(msecs)03d] [pid %(process)d] [%(levelname)s] - [%(module)s:%(lineno)d]: %(message)s"
    datefmt: "%Y-%m-%d %H:%M:%S"

handlers:
  console:
    class: logging.StreamHandler
    formatter: simple
    stream: ext://sys.stdout
  syncapp_file_handler:
    class: logging.FileHandler
    filename: logs/syncapp.log
    formatter: extended
  celery_file_handler:
    class: logging.FileHandler
    filename: logs/synctask.log
    formatter: extended
  syncapp_queue_handler:
    class: logging_.handlers.QueueListenerHandler
    handlers:
      - cfg://handlers.console
      - cfg://handlers.syncapp_file_handler
    queue: cfg://objects.syncapp_queue
  celery_queue_handler:
    class: logging_.handlers.QueueListenerHandler
    handlers:
      - cfg://handlers.console
      - cfg://handlers.celery_file_handler
    queue: cfg://objects.celery_queue

loggers:
  syncapp:
    level: DEBUG
    handlers:
      - syncapp_queue_handler
    propagate: no
  syncapp.tasks:
    level: DEBUG
    handlers:
      - celery_queue_handler
    propagate: no
  celery:
    level: INFO
    handlers:
      - celery_queue_handler
    propagate: no

root:
  level: INFO
  handlers:
    - console
```
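For reference, what I understand the two `QueueListenerHandler` entries to be doing can be expressed with the standard library's `QueueHandler`/`QueueListener` pair. A minimal stdlib-only sketch of the same pattern, with `io.StringIO` standing in for the console/file sinks:

```python
import io
import logging
import queue
from logging.handlers import QueueHandler, QueueListener

# The queue decouples the logging call site from the (potentially slow) sinks.
log_queue = queue.Queue(maxsize=1000)

# The listener drains the queue in a background thread and fans records
# out to the real handlers (console/file in logging.yaml; StringIO here).
sink = io.StringIO()
listener = QueueListener(log_queue, logging.StreamHandler(sink))
listener.start()

# The logger itself only enqueues records, as celery_queue_handler does.
logger = logging.getLogger("celery")
logger.setLevel(logging.INFO)
logger.addHandler(QueueHandler(log_queue))
logger.propagate = False

logger.info("Copying data from source index %s", "my-index")

listener.stop()  # flushes the queue before the program exits
print(sink.getvalue())  # Copying data from source index my-index
```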
I want the Celery backend to log its events through the `celery` logger, and the tasks to log through the `syncapp.tasks` logger. The main application should log through the `syncapp` logger (which it does).

I have also tried setting up signals, but it did not help. For example:
```python
@after_setup_logger.connect
def setup_loggers(*args, **kwargs):
    logger = kwargs["logger"]
    for handler in logger.handlers:
        logger.removeHandler(handler)
    with open("logging.yaml", "r") as config_file:
        # Load logging configuration from YAML file
        YAMLConfig(config_file.read(), silent=True)
    logger = logging.getLogger("syncapp.tasks")
    logger.info("Logging configuration loaded. Handlers: %s", logger.handlers)
    kwargs["logger"] = logger
```
In fact, it causes an error in the Celery console logs:
```
syncdata-1 | Exception in thread Thread-1 (_monitor):
syncdata-1 | Traceback (most recent call last):
syncdata-1 |   File "/usr/local/lib/python3.11/threading.py", line 1038, in _bootstrap_inner
syncdata-1 |     self.run()
syncdata-1 |   File "/usr/local/lib/python3.11/threading.py", line 975, in run
syncdata-1 |     self._target(*self._args, **self._kwargs)
syncdata-1 |   File "/usr/local/lib/python3.11/logging/handlers.py", line 1584, in _monitor
syncdata-1 |     self.handle(record)
syncdata-1 |   File "/usr/local/lib/python3.11/logging/handlers.py", line 1563, in handle
syncdata-1 |     process = record.levelno >= handler.level
syncdata-1 |               ^^^^^^^^^^^^^
syncdata-1 | AttributeError: 'ConvertingDict' object has no attribute 'level'
```
This was not particularly difficult with Flask. Here, however, I cannot find a way to make Celery write its logs to the specified files.
Celery initializes its own logging system, which is why you are seeing this problem. You should configure logging before Celery gets the chance to set up its own. So you can do something like this in your `syncapp/worker.py`:
```python
import os
import logging
import logging.config

import yaml
from celery import Celery

from syncapp.settings import get_settings

# Resolve logging.yaml relative to the project root, not the working directory.
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(current_dir)
logging_yaml_path = os.path.join(project_root, "logging.yaml")

# Configure logging at import time, before the Celery app is created.
with open(logging_yaml_path, "r") as f:
    logging_config = yaml.safe_load(f)
logging.config.dictConfig(logging_config)

app = Celery(
    ...
)

app.conf.update(
    ...
)
```
I also noticed that you use `logging_` in your `logging.yaml`. Is that some custom logging package? If so, you should use it in my example above.