有没有办法以编程方式获取气流日志并将其存储在某个变量中? (以字符串的形式) 我想使用该变量中存储的日志来使用气流中错误日志记录级别中存在的所有消息。
最初,我尝试获取日志并使用logging.info()打印它们
我尝试做:
def fetch_logs(context):
ti=context['ti']
logs=ti.log
logging.info(logs)
我期待它为我打印日志,但实际上已经打印了一个记录器对象 -
<Logger airflow.task (INFO) >
我尝试使用 str() 构造函数将记录器对象转换为字符串,但它只是将
我尝试迭代 Logger Object ,但似乎也没有帮助。
有人可以帮我解决这个问题吗? 预先感谢!
您使用的是哪个 Airflow 版本? 您需要什么类型的日志?
您可以使用@KamilCuk Airflow API 已经提到的。
您可以通过 DAGrun 进行交互或使用上下文获取所需的信息:
def check_dump_run(db:str) -> str:
dag_run = DagRun.find(dag_id='dump_db')
dag_run = filter(lambda x: x.start_date.strftime(
'%Y-%m-%d') == date.today().strftime('%Y-%m-%d'), dag_run)
for run in dag_run:
ti_dump = run.get_task_instance(
task_id='dump_db_to_base')
if ti_dump == None:
continue
elif ti_dump.current_state() == 'success':
return 'success'
elif ti_dump.current_state() in ['running','scheduled','up_for_retry']:
return 'running'
return None
或者您可以直接从元数据数据库获取日志,例如airflow.task_instance表。
SELECT state FROM airflow.task_instance
WHERE dag_id = 'test'
AND task_id = 'fooBar'
AND DATE(execution_date) = DATE(NOW())
AND state = 'success'