将 xcom 传递给 GCSToBigQueryOperator Airflow 中的 source_objects 参数

问题描述 投票:0回答:1

我有包含 2 个任务的 Airflow DAG。 第一个任务是 list_gcs_files ,其定义如下:

    list_gcs_files = PythonOperator(
        task_id='list_gcs_files',
        python_callable=get_file_list,
        op_kwargs={'execution_date': '{{ execution_date }}'},
        provide_context=True,
    )

Python函数,list_gcs_files正在使用,看起来像这样:

def get_file_list(execution_date, **kwargs):
    bucket_name = 'some-dummy-bucket'
    execution_date = pendulum.parse(execution_date).date()
    prefix = f"customers/{execution_date.strftime('%Y-%m')}-{execution_date.day}/"

    hook = GCSHook(google_cloud_storage_conn_id='google_cloud_default')
    blobs = hook.list(bucket_name=bucket_name, prefix=prefix)

    files = [str(blob) for blob in blobs]

    return files

第二个任务是 GCSToBigQueryOperator ,它应该将此数据移动到青铜层。

    load_to_bronze = GCSToBigQueryOperator(
        task_id='load_to_bronze',
        bucket='some-dummy-bucket',
        source_objects="{{ task_instance.xcom_pull(task_ids='list_gcs_files') }}",
        destination_project_dataset_table='bronze.customers',
        schema_fields=[
              # SOME SCHEMA IS DEFINED HERE
        ],
        source_format='CSV',
        skip_leading_rows=1,
        write_disposition='WRITE_TRUNCATE',
    )

在 Airflow 上的 xcom 中第一个任务(成功完成)我看到返回值,它看起来像这样:

['客户/2022-08-2/2022-08-1_part2__customers.csv','客户/2022-08-2/2022-08-2_part1__customers.csv']

第二个任务失败,但出现以下异常:

源 URI 不得包含“,”字符:gs://some-dummy-bucket/['customers/2022-08-2/2022-08-1_part2__customers.csv', 'customers/2022-08-2/ 2022-08-2_part1__customers.csv']

我尝试过的:

我更改了返回xcom数据的类型。我返回由“,”连接的这些文件名的字符串,然后在第二个任务中我将该字符串拆分回列表,因为我认为这是数据类型问题。没有帮助。

我尝试过的另一件事是硬编码的source_objects,它很有帮助。 当我手动将 xcom 的列表提供给此参数时,它正在工作

source_objects=["客户/2022-08-2/2022-08-1_part2__customers.csv", “客户/2022-08-2/2022-08-2_part1__customers.csv”]

我不明白为什么当列表取自 xcom 时它不起作用。

python google-cloud-platform google-bigquery google-cloud-storage airflow
1个回答
0
投票

问题在于

xcom_pull
返回列表的字符串表示形式,而不是列表本身。您需要将其解析回
source_objects
参数中的列表。您可以使用 Python 函数来处理此转换。

from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago
from airflow import DAG
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
import json

def get_file_list(execution_date, **kwargs):
    bucket_name = 'some-dummy-bucket'
    execution_date = pendulum.parse(execution_date).date()
    prefix = f"customers/{execution_date.strftime('%Y-%m')}-{execution_date.day}/"

    hook = GCSHook(google_cloud_storage_conn_id='google_cloud_default')
    blobs = hook.list(bucket_name=bucket_name, prefix=prefix)

    files = [str(blob) for blob in blobs]

    return files


def parse_xcom_list(**kwargs):
    ti = kwargs['ti']
    files = ti.xcom_pull(task_ids='list_gcs_files')
    return json.loads(files)

with DAG(dag_id='example_dag', start_date=days_ago(1), schedule_interval='@daily') as dag:
    list_gcs_files = PythonOperator(
        task_id='list_gcs_files',
        python_callable=get_file_list,
        op_kwargs={'execution_date': '{{ execution_date }}'},
        provide_context=True,
    )

    parse_files = PythonOperator(
        task_id='parse_files',
        python_callable=parse_xcom_list,
        provide_context=True,
    )

    load_to_bronze = GCSToBigQueryOperator(
        task_id='load_to_bronze',
        bucket='some-dummy-bucket',
        source_objects="{{ task_instance.xcom_pull(task_ids='parse_files') }}",
        destination_project_dataset_table='bronze.customers',
        schema_fields=[
              # SOME SCHEMA IS DEFINED HERE
        ],
        source_format='CSV',
        skip_leading_rows=1,
        write_disposition='WRITE_TRUNCATE',
    )

    list_gcs_files >> parse_files >> load_to_bronze
© www.soinside.com 2019 - 2024. All rights reserved.