我在我的DAG中得到了这个任务:
@task
def get_result_from_records_api(api, tries: int, result_from_salons_api: list):
salon_ids_list = result_from_salons_api[1]
# result is 4 pd.DataFrames
result_from_records_api = get_data_or_raise_error_with_retry(api.get_records_staff_sales_of_services_goods, tries, salon_ids_list=salon_ids_list)
# make lists for dfs
records_lst = []
staff_from_records_lst = []
services_sales_lst = []
good_sales_lst = []
# put dfs in lists
records_lst.append(result_from_records_api[0])
staff_from_records_lst.append(result_from_records_api[1])
services_sales_lst.append(result_from_records_api[2])
good_sales_lst.append(result_from_records_api[3])
# make dict with lists
result_dict = {
'records': records_lst
, 'staff_from_records': staff_from_records_lst
, 'services_sales': services_sales_lst
, 'good_sales': good_sales_lst
}
return result_dict
此任务返回 4 个 pandas.DataFrames,没有错误(返回值是正确的)。
我尝试过将结果放入列表、元组、字典中,但每次都会出现错误:
{xcom.py:664} ERROR - ('cannot mix list and non-list, non-null values', 'Conversion failed for column position with type object'). If you are using pickle instead of JSON for XCom, then you need to enable pickle support for XCom in your airflow config or make sure to decorate your object with attr.
如何避免这个错误?
P。 S.我已经在docker-compose.yml中启用了enable_xcom_picking:
AIRFLOW__CORE__ENABLE_XCOM_PICKLING=true
气流2.8.2 Python 3.11
我通过将 pd.DataFrames 转换为 dict 解决了问题:
@task
def get_result_from_records_api(api, tries: int, result_from_salons_api: list):
salon_ids_list = result_from_salons_api[1]
result_from_records_api = get_data_or_raise_error_with_retry(api.get_records_staff_sales_of_services_goods, tries, salon_ids_list=salon_ids_list)
records = result_from_records_api[0].to_dict()
staff_from_records = result_from_records_api[1].to_dict()
services_sales = result_from_records_api[2].to_dict()
good_sales = result_from_records_api[3].to_dict()
result = [records, staff_from_records, services_sales, good_sales]
return result