如何将 celery 任务 stdout \ stderr 保存到文本文件?

问题描述 投票:0回答:2

我将 celery 任务与外部模块一起使用,通过 print() 将信息发送到标准输出。如何将 celery 任务的所有打印保存到预定义文件中?理想情况下,如果每个任务都有自己单独的文件。

main.py
上运行 redis 的示例
localhost:6379

import time

from celery import Celery

def long_run_func():
    print('>>> Start running long_run_func()')
    time.sleep(5)
    print('>>> End running long_run_func()')

celery = Celery('celery_task', broker='redis://localhost:6379')

@celery.task(name="long_run_celery_task")
def long_run_celery_task():
    long_run_func()

long_run_celery_task.delay()

现在运行 Celery 工人:

celery -A main:celery worker --loglevel=INFO

如果你要运行

main.py
,你可以在 long_run_func() 打印的 celery 行中看到:

[2024-01-11 17:30:52,746: WARNING/ForkPoolWorker-7] >>> Start running long_run_func()
[2024-01-11 17:30:57,751: WARNING/ForkPoolWorker-7] >>> End running long_run_func()

是否可以设置

@celery.task
将所有这些日志转储到某个字符串变量或文件?我的意思是,如果我有几个这样的任务并且它们同时运行,那么就能够根据任务分离这些输出,而不是将所有内容混合到一个日志中。

python celery
2个回答
0
投票
  1. 可以设置 @celery.task 将所有这些日志转储到某个字符串 var 或文件吗?

  • 您可以使用
    -f --logfile
    命令选项转储所有日志 到你想要的文件。
  • 示例:
command:
celery -A main:celery worker --loglevel=INFO -f test.log

test.log:
[2024-01-13 09:56:04,119: INFO/MainProcess] Task long_run_celery_task[bae57432-18de-4f00-8227-cdf34856cd15] received
[2024-01-13 09:56:04,121: WARNING/ForkPoolWorker-7] >>> Start running long_run_func()
[2024-01-13 09:56:09,123: WARNING/ForkPoolWorker-7] >>> End running long_run_func()
[2024-01-13 09:56:09,127: INFO/ForkPoolWorker-7] Task long_run_celery_task[bae57432-18de-4f00-8227-cdf34856cd15] succeeded in 5.006108791014412s: None

  1. 如果我有几个这样的任务并且它们同时运行,那么能够根据任务分离这些输出,而不是将所有内容混合到一个日志中。

简单但肮脏的方法

  • 如果你只有一些任务要运行,只需编写一些Python文件,然后在不同的命令中运行它们。
  • 示例:
celery -A main1:celery worker --loglevel=INFO -f test1.log
celery -A main2:celery worker --loglevel=INFO -f test2.log
celery -A main3:celery worker --loglevel=INFO -f test3.log

更好的方法

  • 按名称设置
    logger
    并设置
    handler
  • 示例:
import time
import logging
import os
from celery import Celery

def long_run_func():
    l = custom_logger("long")
    l.info('>>> Start running long_run_func()')
    time.sleep(5)
    l.info('>>> End running long_run_func()')

def short_run_func():
    l = custom_logger("short")
    l.info('>>> Start running short_run_func()')
    time.sleep(3)
    l.info('>>> End running short_run_func()')

celery = Celery(__name__,broker='redis://localhost:6379')

@celery.task(name="long_run_celery_task")
def long_run_celery_task():
    long_run_func()

@celery.task(name="short_run_celery_task")
def short_run_celery_task():
    short_run_func()

def custom_logger(name):
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    handler = logging.FileHandler(os.path.join(name + '.log'), 'w')
    logger.addHandler(handler)
    return logger

long_run_celery_task.delay()
short_run_celery_task.delay()
  • 结果:
long.log
>>> Start running long_run_func()
>>> End running long_run_func()
short.log
>>> Start running short_run_func()
>>> End running short_run_func()
  • 如果你想使用与
    print
    相同的格式,你可以使用:
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') #custom formatter
handler.setFormatter(formatter)

0
投票

您所要做的就是编写一个记录器类并将标准输出和错误文件描述符重定向到该类。我正在使用 pandas lib,所以我推荐它。 代码如下->

import sys
import pandas as pd

class Logger:
    def __init__(self, pipe, suffix: str, interval: str):
        self.terminal = pipe
        self.file_suffix = suffix
        self.interval = interval
        self.log_time = pd.Timestamp.now().floor(self.interval)
        self.log_file = open(self.log_time.__str__() + self.file_suffix, "a")

    def write(self, message):
        self.terminal.write(message)
        self.terminal.flush()
        if pd.Timestamp.now().floor(self.interval) > self.log_time:
            self.log_time = pd.Timestamp.now().floor(self.interval)
            self.log_file = open(self.log_time.__str__() + self.file_suffix, "a")
            self.log_file.write(message)
            self.log_file.flush()
        else:
            self.log_file.write(message)
            self.log_file.flush()

    def flush(self):
        pass


sys.stdout = Logger(sys.stdout, "_system_stdout.log", "15min")
sys.stderr = Logger(sys.stderr, "_system_stderr.log", "15min")

print("test")
© www.soinside.com 2019 - 2024. All rights reserved.