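I am trying to use the PythonVirtualenvOperator in Airflow to create a virtual environment for a specific task to run in. As a first step, I simply copied the example given in the Airflow documentation.
My Main.py and my DAG file are two separate files.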
Main.py looks like this:

    from airflow.decorators import task

    @task.virtualenv(
        task_id="virtualenv_python", requirements=["colorama==0.4.0"], system_site_packages=True
    )
    def Main_func():
        # Imports live inside the function because it runs in a separate virtualenv.
        from time import sleep
        from colorama import Back, Fore, Style

        print(Fore.RED + "some red text")
        print(Back.GREEN + "and with a green background")
        print(Style.DIM + "and in dim text")
        print(Style.RESET_ALL)
        for _ in range(4):
            print(Style.DIM + "Please wait...", flush=True)
            sleep(1)
        print("Finished")
task_dag.py looks like this:

    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from datetime import datetime, timedelta
    from Main import Main_func

    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'max_active_runs': 1,
    }

    Main_Mod_dag = DAG(
        'Main_Mod_Run',
        catchup=False,
        default_args=default_args,
        description='Main Module Run',
        schedule_interval='48 11 * * 3',
        start_date=datetime(2022, 12, 11),
        tags=['Main_Mod'],
    )

    # The already-decorated Main_func is passed to a PythonOperator as its callable.
    Main_Mod_Func = PythonOperator(task_id='Main_Mod', python_callable=Main_func, dag=Main_Mod_dag)
    Main_Mod_Func
Expected result: the DAG is scheduled successfully -> it runs -> all the print statements appear in the log.
**Error:**

    [2023-05-10, 10:47:13 UTC] {python.py:177} INFO - Done. Returned value was: {{ task_instance.xcom_pull(task_ids='virtualenv_python', dag_id='adhoc_airflow', key='return_value') }}
    [2023-05-10, 10:47:13 UTC] {taskinstance.py:1853} ERROR - Task failed with exception
    Traceback (most recent call last):
      File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 72, in wrapper
        return func(*args, **kwargs)
      File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 2381, in xcom_push
        XCom.set(
      File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 72, in wrapper
        return func(*args, **kwargs)
      File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/xcom.py", line 206, in set
        value = cls.serialize_value(
      File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/xcom.py", line 595, in serialize_value
        return pickle.dumps(value)
    _pickle.PicklingError: Can't pickle <function Main_func at 0x7fe5c18dba60>: it's not the same object as Main.Main_func
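
The `PicklingError` on the last line can be reproduced without Airflow, which may help narrow down where it comes from. The snippet below is only a minimal sketch. It assumes the decorator replaces the module-level name with a wrapper object that still holds the original function. `Wrapper`, `keep_original`, `main_func` and `try_pickle` are hypothetical stand-ins for illustration, not Airflow APIs, and this may not be exactly what happens inside Airflow.

```python
import pickle


class Wrapper:
    """Hypothetical stand-in for a decorator object that keeps the original function."""

    def __init__(self, function):
        self.function = function

    def __call__(self, *args, **kwargs):
        return self.function(*args, **kwargs)


def keep_original(function):
    # Returns a Wrapper, so the decorated module-level name no longer
    # refers to the plain function object.
    return Wrapper(function)


@keep_original
def main_func():
    return "Finished"


def try_pickle(obj):
    """Return the PicklingError raised while pickling obj, or None on success."""
    try:
        pickle.dumps(obj)
    except pickle.PicklingError as exc:
        return exc
    return None


# pickle stores functions by reference (module name plus qualified name) and
# checks that looking the name up again yields the very same object. Here the
# name `main_func` now points at a Wrapper, so the check fails for the
# original function held inside it.
error = try_pickle(main_func.function)
print(type(error).__name__, error)
```

If this reading applies, the error would come from handing the already-decorated `Main_func` to a `PythonOperator` whose return value is then pushed to XCom, rather than from the virtualenv or from `colorama`.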