I use celery tasks together with external modules that send information to stdout via print(). How can I save everything a celery task prints to a predefined file? Ideally, each task would get its own separate file.
Example main.py (with Redis running on localhost:6379):
import time
from celery import Celery

def long_run_func():
    print('>>> Start running long_run_func()')
    time.sleep(5)
    print('>>> End running long_run_func()')

celery = Celery('celery_task', broker='redis://localhost:6379')

@celery.task(name="long_run_celery_task")
def long_run_celery_task():
    long_run_func()

long_run_celery_task.delay()
Now run the Celery worker:
celery -A main:celery worker --loglevel=INFO
If you then run main.py, you can see the lines printed by long_run_func() in the celery worker output:
[2024-01-11 17:30:52,746: WARNING/ForkPoolWorker-7] >>> Start running long_run_func()
[2024-01-11 17:30:57,751: WARNING/ForkPoolWorker-7] >>> End running long_run_func()
Is it possible to configure @celery.task to dump all of these logs to some string variable or file? I mean, if I have several such tasks running at the same time, I want to be able to separate the output per task instead of having everything mixed into one log.
Is it possible to configure @celery.task to dump all of these logs to some string var or file?

You can use the -f / --logfile command-line option to dump all logs to whatever file you want. Command:
celery -A main:celery worker --loglevel=INFO -f test.log
test.log:
[2024-01-13 09:56:04,119: INFO/MainProcess] Task long_run_celery_task[bae57432-18de-4f00-8227-cdf34856cd15] received
[2024-01-13 09:56:04,121: WARNING/ForkPoolWorker-7] >>> Start running long_run_func()
[2024-01-13 09:56:09,123: WARNING/ForkPoolWorker-7] >>> End running long_run_func()
[2024-01-13 09:56:09,127: INFO/ForkPoolWorker-7] Task long_run_celery_task[bae57432-18de-4f00-8227-cdf34856cd15] succeeded in 5.006108791014412s: None
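As an aside, the reason the print() lines show up at WARNING level is that Celery captures stdout/stderr and feeds it into its own logger. My understanding is that this behaviour is controlled by the worker_redirect_stdouts settings, so a sketch of tuning it in main.py would be:

celery.conf.worker_redirect_stdouts = True          # default: capture print() output into the log
celery.conf.worker_redirect_stdouts_level = 'INFO'  # record captured output at INFO instead of WARNING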
You can see all the available options with --help, e.g. celery --help.
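If I remember right, --logfile also understands node-name format expansions such as %n (node name) and %I (prefork child process index), so each worker, or even each child process, can get its own file; check celery worker --help for what your version actually supports:

celery -A main:celery worker --loglevel=INFO --logfile=%n%I.log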
If I have several such tasks and they run simultaneously, then be able to separate these outputs based on the task instead of mixing everything into one log.

If the tasks live in separate modules, you can run one worker per module, each with its own log file:
celery -A main1:celery worker --loglevel=INFO -f test1.log
celery -A main2:celery worker --loglevel=INFO -f test2.log
celery -A main3:celery worker --loglevel=INFO -f test3.log
Alternatively, you can create a logger per task function and attach a file handler to it:
import time
import logging
import os
from celery import Celery

def long_run_func():
    l = custom_logger("long")
    l.info('>>> Start running long_run_func()')
    time.sleep(5)
    l.info('>>> End running long_run_func()')

def short_run_func():
    l = custom_logger("short")
    l.info('>>> Start running short_run_func()')
    time.sleep(3)
    l.info('>>> End running short_run_func()')

celery = Celery(__name__, broker='redis://localhost:6379')

@celery.task(name="long_run_celery_task")
def long_run_celery_task():
    long_run_func()

@celery.task(name="short_run_celery_task")
def short_run_celery_task():
    short_run_func()

def custom_logger(name):
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    handler = logging.FileHandler(os.path.join(name + '.log'), 'w')
    logger.addHandler(handler)
    return logger

long_run_celery_task.delay()
short_run_celery_task.delay()
long.log:
>>> Start running long_run_func()
>>> End running long_run_func()
short.log:
>>> Start running short_run_func()
>>> End running short_run_func()
If you want these lines to have the same format as the celery output instead of the bare print-style lines above, you can set a formatter on the handler:

formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')  # custom formatter
handler.setFormatter(formatter)
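If you really want one file per task run (the "each task has its own separate file" part of the question) rather than one file per function, a minimal sketch is to bind the task and name the logger and the file after the unique task id (self.request.id). The helper name per_task_logger below is just an illustration, not an existing API:

import logging
from celery import Celery

celery = Celery(__name__, broker='redis://localhost:6379')

def per_task_logger(task_id):
    # One logger and one file per task invocation, named after the task id
    logger = logging.getLogger(task_id)
    if not logger.handlers:  # avoid attaching duplicate handlers
        logger.setLevel(logging.INFO)
        logger.addHandler(logging.FileHandler(task_id + '.log', 'w'))
    return logger

@celery.task(name="long_run_celery_task", bind=True)
def long_run_celery_task(self):
    log = per_task_logger(self.request.id)  # self.request.id is unique for every run
    log.info('>>> Start running long_run_celery_task()')
    # ... do the actual work here ...
    log.info('>>> End running long_run_celery_task()')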
All you have to do is write a logger class and redirect the stdout and stderr file descriptors to it. I am already using the pandas lib, so I recommend it (here it is only used for the timestamps). The code is below:
import sys
import pandas as pd

class Logger:
    def __init__(self, pipe, suffix: str, interval: str):
        self.terminal = pipe
        self.file_suffix = suffix
        self.interval = interval
        # Round the current time down to the interval; used to name the log file
        self.log_time = pd.Timestamp.now().floor(self.interval)
        self.log_file = open(str(self.log_time) + self.file_suffix, "a")

    def write(self, message):
        # Always echo to the original stream
        self.terminal.write(message)
        self.terminal.flush()
        # Roll over to a new file once the interval has elapsed
        if pd.Timestamp.now().floor(self.interval) > self.log_time:
            self.log_time = pd.Timestamp.now().floor(self.interval)
            self.log_file.close()
            self.log_file = open(str(self.log_time) + self.file_suffix, "a")
        self.log_file.write(message)
        self.log_file.flush()

    def flush(self):
        pass

sys.stdout = Logger(sys.stdout, "_system_stdout.log", "15min")
sys.stderr = Logger(sys.stderr, "_system_stderr.log", "15min")

print("test")