如何避免Airflow中的错误(“不能混合列表和非列表、非空值”、“类型对象的列位置转换失败”)?

问题描述 投票:0回答:1

我在我的DAG中得到了这个任务:

@task
def get_result_from_records_api(api, tries: int, result_from_salons_api: list):
    salon_ids_list = result_from_salons_api[1]
    # result is 4 pd.DataFrames
    result_from_records_api = get_data_or_raise_error_with_retry(api.get_records_staff_sales_of_services_goods, tries, salon_ids_list=salon_ids_list)
    # make lists for dfs
    records_lst = []
    staff_from_records_lst = []
    services_sales_lst = []
    good_sales_lst = []
    # put dfs in lists
    records_lst.append(result_from_records_api[0])
    staff_from_records_lst.append(result_from_records_api[1])
    services_sales_lst.append(result_from_records_api[2])
    good_sales_lst.append(result_from_records_api[3])
    # make dict with lists
    result_dict = {
        'records': records_lst
        , 'staff_from_records': staff_from_records_lst
        , 'services_sales': services_sales_lst
        , 'good_sales': good_sales_lst
    }
    return result_dict

此任务返回 4 个 pandas.DataFrames,没有错误(返回值是正确的)。

我尝试过将结果放入列表、元组、字典中,但每次都会出现错误:

{xcom.py:664} ERROR - ('cannot mix list and non-list, non-null values', 'Conversion failed for column position with type object'). If you are using pickle instead of JSON for XCom, then you need to enable pickle support for XCom in your airflow config or make sure to decorate your object with attr.

如何避免这个错误?

P。 S.我已经在docker-compose.yml中启用了enable_xcom_picking:

AIRFLOW__CORE__ENABLE_XCOM_PICKLING=true

气流2.8.2 Python 3.11

python airflow
1个回答
0
投票

我通过将 pd.DataFrames 转换为 dict 解决了问题:

@task
def get_result_from_records_api(api, tries: int, result_from_salons_api: list):
    salon_ids_list = result_from_salons_api[1]
    result_from_records_api = get_data_or_raise_error_with_retry(api.get_records_staff_sales_of_services_goods, tries, salon_ids_list=salon_ids_list)
    records = result_from_records_api[0].to_dict()
    staff_from_records = result_from_records_api[1].to_dict()
    services_sales = result_from_records_api[2].to_dict()
    good_sales = result_from_records_api[3].to_dict()
    result = [records, staff_from_records, services_sales, good_sales]
    return result
© www.soinside.com 2019 - 2024. All rights reserved.