获取并存储气流日志

问题描述 投票:0回答:1

有没有办法以编程方式获取气流日志并将其存储在某个变量中? (以字符串的形式) 我想使用该变量中存储的日志来使用气流中错误日志记录级别中存在的所有消息。

最初,我尝试获取日志并使用logging.info()打印它们

我尝试做:

def fetch_logs(context):
    ti=context['ti']
    logs=ti.log
    logging.info(logs)

我期待它为我打印日志,但实际上已经打印了一个记录器对象 -

<Logger airflow.task (INFO) > 

我尝试使用 str() 构造函数将记录器对象转换为字符串,但它只是将 转换为字符串,而不是将日志转换为字符串

我尝试迭代 Logger Object ,但似乎也没有帮助。

有人可以帮我解决这个问题吗? 预先感谢!

python airflow
1个回答
0
投票

您使用的是哪个 Airflow 版本? 您需要什么类型的日志?

您可以使用@KamilCuk Airflow API 已经提到的。

您可以通过 DAGrun 进行交互或使用上下文获取所需的信息:

def check_dump_run(db:str) -> str:
    dag_run = DagRun.find(dag_id='dump_db')
    dag_run = filter(lambda x: x.start_date.strftime(
                '%Y-%m-%d') == date.today().strftime('%Y-%m-%d'), dag_run)
    for run in dag_run:
        ti_dump = run.get_task_instance(
            task_id='dump_db_to_base')
        if ti_dump == None:
            continue
        elif ti_dump.current_state()  == 'success':
            return 'success'
        elif ti_dump.current_state() in ['running','scheduled','up_for_retry']:
            return 'running'
    return None

或者您可以直接从元数据数据库获取日志,例如airflow.task_instance表。

SELECT state FROM airflow.task_instance 
WHERE dag_id = 'test'
AND task_id = 'fooBar' 
AND DATE(execution_date) = DATE(NOW()) 
AND state = 'success'
© www.soinside.com 2019 - 2024. All rights reserved.