Why do I get "_pickle.PicklingError: Can't pickle" when using PythonVirtualenvOperator in Airflow on GCP?


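I'm trying to use PythonVirtualenvOperator in Airflow to create a virtual environment for one specific task to run in. To start, I'm simply copying the example given in the Airflow documentation.

My main module and my DAG file are two separate files.

Main.py looks like this: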

```python
from airflow.decorators import task

@task.virtualenv(
    task_id="virtualenv_python", requirements=["colorama==0.4.0"], system_site_packages=True
)
def Main_func():
    from time import sleep
    from colorama import Back, Fore, Style

    print(Fore.RED + "some red text")
    print(Back.GREEN + "and with a green background")
    print(Style.DIM + "and in dim text")
    print(Style.RESET_ALL)
    for _ in range(4):
        print(Style.DIM + "Please wait...", flush=True)
        sleep(1)
    print("Finished")
```
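A minimal pure-Python sketch of what the decorator in Main.py does to the name `Main_func`. `FakeTaskDecorator` and `fake_virtualenv_task` are hypothetical stand-ins, not Airflow's real classes; the assumption, matching how Airflow 2.x TaskFlow decorators behave, is that the decorator returns a wrapper object that stores the original function rather than returning a function.

```python
import inspect


class FakeTaskDecorator:
    """Hypothetical stand-in for the object @task.virtualenv returns (not Airflow's class)."""

    def __init__(self, function):
        self.function = function  # the original, undecorated function is stored here


def fake_virtualenv_task(**operator_kwargs):
    """Hypothetical decorator factory; operator_kwargs are accepted but ignored."""

    def decorator(function):
        # Return a wrapper object instead of a function (assumed TaskFlow-like behaviour).
        return FakeTaskDecorator(function)

    return decorator


@fake_virtualenv_task(task_id="virtualenv_python", requirements=["colorama==0.4.0"])
def Main_func():
    print("Finished")


# The module-level name now points at the wrapper, not at a function...
print(inspect.isfunction(Main_func))    # False
# ...but the stored function still reports its qualified name as "Main_func".
print(Main_func.function.__qualname__)  # Main_func
```

This is the precondition for the error: looking up `Main.Main_func` yields the wrapper, while the real function object still claims the name `Main_func`.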

task_dag.py looks like this:

```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
from Main import Main_func

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'max_active_runs': 1,
}

Main_Mod_dag = DAG(
    'Main_Mod_Run',
    catchup=False,
    default_args=default_args,
    description='Main Module Run',
    schedule_interval='48 11 * * 3',
    start_date=datetime(2022, 12, 11),
    tags=['Main_Mod'],
)

Main_Mod_Func = PythonOperator(task_id='Main_Mod', python_callable=Main_func, dag=Main_Mod_dag)

Main_Mod_Func
```
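The log line `Returned value was: {{ task_instance.xcom_pull(...) }}` suggests that when PythonOperator called `Main_func()`, it got back a reference to a task result (in Airflow 2.x, an `XComArg`) instead of running the function body. The sketch below imitates that flow with hypothetical `DeferredResult`, `FakeTaskDecorator` and `run_like_python_operator` helpers; it is a simplification under that assumption, not PythonOperator's actual implementation.

```python
class DeferredResult:
    """Hypothetical placeholder, loosely modelled on the XComArg a TaskFlow call returns."""

    def __init__(self, function):
        self.function = function  # still references the original function

    def __str__(self):
        # Mimics the template string printed in the task log.
        return "{{ task_instance.xcom_pull(task_ids='virtualenv_python', key='return_value') }}"


class FakeTaskDecorator:
    """Hypothetical stand-in for the object the decorator binds to Main_func."""

    def __init__(self, function):
        self.function = function

    def __call__(self, *args, **kwargs):
        # Calling the decorated name returns a reference to a future result;
        # the function body itself does not run here.
        return DeferredResult(self.function)


body_runs = []  # records whether the real function body ever executes


def Main_func():
    body_runs.append("ran")
    return "Finished"


Main_func = FakeTaskDecorator(Main_func)  # what the @decorator line does


def run_like_python_operator(python_callable):
    """Hypothetical simplification: call the callable, return what would go to XCom."""
    return python_callable()


result = run_like_python_operator(Main_func)
print(f"Returned value was: {result}")
print(body_runs)  # []
```

The returned placeholder still references the original function (in Airflow, presumably via the operator it points to), so pushing it to XCom means serializing that function as well.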

Expected result: the DAG is scheduled and runs successfully, and all the print statements appear in the logs.

**Error:**

```
[2023-05-10, 10:47:13 UTC] {python.py:177} INFO - Done. Returned value was: {{ task_instance.xcom_pull(task_ids='virtualenv_python', dag_id='adhoc_airflow', key='return_value') }}
[2023-05-10, 10:47:13 UTC] {taskinstance.py:1853} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 72, in wrapper
    return func(*args, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 2381, in xcom_push
    XCom.set(
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 72, in wrapper
    return func(*args, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/xcom.py", line 206, in set
    value = cls.serialize_value(
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/xcom.py", line 595, in serialize_value
    return pickle.dumps(value)
_pickle.PicklingError: Can't pickle <function Main_func at 0x7fe5c18dba60>: it's not the same object as Main.Main_func
```
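The final `PicklingError` comes from how `pickle` handles plain functions: it stores them by reference (module plus qualified name) and, before doing so, checks that looking that name up in the module returns the very same object. Because the decorator rebound `Main.Main_func` to a wrapper, that check fails. The sketch below reproduces this in a single script; `TaskWrapper` and `pickle_error` are hypothetical names used only for illustration.

```python
import pickle


class TaskWrapper:
    """Hypothetical stand-in for the object the decorator binds to Main_func."""

    def __init__(self, function):
        self.function = function  # the original, undecorated function


def Main_func():
    return "hello"


original_func = Main_func               # keep a handle on the real function object
Main_func = TaskWrapper(original_func)  # the same rebinding a decorator performs


def pickle_error(obj):
    """Return the PicklingError message for obj, or None if it pickles fine."""
    try:
        pickle.dumps(obj)
    except pickle.PicklingError as exc:
        return str(exc)
    return None


# pickle looks up <module>.Main_func, finds the wrapper instead of original_func,
# and refuses to store the function by reference.
message = pickle_error(original_func)
print(message)
```

Run as a script, it should print a message of the same form as the last line of the traceback, with `__main__` in place of `Main`.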

python google-cloud-platform airflow virtualenv google-cloud-composer