我有包含 2 个任务的 Airflow DAG。 第一个任务是 list_gcs_files ,其定义如下:
list_gcs_files = PythonOperator(
task_id='list_gcs_files',
python_callable=get_file_list,
op_kwargs={'execution_date': '{{ execution_date }}'},
provide_context=True,
)
Python函数,list_gcs_files正在使用,看起来像这样:
def get_file_list(execution_date, **kwargs):
bucket_name = 'some-dummy-bucket'
execution_date = pendulum.parse(execution_date).date()
prefix = f"customers/{execution_date.strftime('%Y-%m')}-{execution_date.day}/"
hook = GCSHook(google_cloud_storage_conn_id='google_cloud_default')
blobs = hook.list(bucket_name=bucket_name, prefix=prefix)
files = [str(blob) for blob in blobs]
return files
第二个任务是 GCSToBigQueryOperator ,它应该将此数据移动到青铜层。
load_to_bronze = GCSToBigQueryOperator(
task_id='load_to_bronze',
bucket='some-dummy-bucket',
source_objects="{{ task_instance.xcom_pull(task_ids='list_gcs_files') }}",
destination_project_dataset_table='bronze.customers',
schema_fields=[
# SOME SCHEMA IS DEFINED HERE
],
source_format='CSV',
skip_leading_rows=1,
write_disposition='WRITE_TRUNCATE',
)
在 Airflow 上的 xcom 中第一个任务(成功完成)我看到返回值,它看起来像这样:
['客户/2022-08-2/2022-08-1_part2__customers.csv','客户/2022-08-2/2022-08-2_part1__customers.csv']
第二个任务失败,但出现以下异常:
源 URI 不得包含“,”字符:gs://some-dummy-bucket/['customers/2022-08-2/2022-08-1_part2__customers.csv', 'customers/2022-08-2/ 2022-08-2_part1__customers.csv']
我尝试过的:
我更改了返回xcom数据的类型。我返回由“,”连接的这些文件名的字符串,然后在第二个任务中我将该字符串拆分回列表,因为我认为这是数据类型问题。没有帮助。
我尝试过的另一件事是硬编码的source_objects,它很有帮助。 当我手动将 xcom 的列表提供给此参数时,它正在工作
source_objects=["客户/2022-08-2/2022-08-1_part2__customers.csv", “客户/2022-08-2/2022-08-2_part1__customers.csv”]
我不明白为什么当列表取自 xcom 时它不起作用。
问题在于
xcom_pull
返回列表的字符串表示形式,而不是列表本身。您需要将其解析回 source_objects
参数中的列表。您可以使用 Python 函数来处理此转换。
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago
from airflow import DAG
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
import json
def get_file_list(execution_date, **kwargs):
bucket_name = 'some-dummy-bucket'
execution_date = pendulum.parse(execution_date).date()
prefix = f"customers/{execution_date.strftime('%Y-%m')}-{execution_date.day}/"
hook = GCSHook(google_cloud_storage_conn_id='google_cloud_default')
blobs = hook.list(bucket_name=bucket_name, prefix=prefix)
files = [str(blob) for blob in blobs]
return files
def parse_xcom_list(**kwargs):
ti = kwargs['ti']
files = ti.xcom_pull(task_ids='list_gcs_files')
return json.loads(files)
with DAG(dag_id='example_dag', start_date=days_ago(1), schedule_interval='@daily') as dag:
list_gcs_files = PythonOperator(
task_id='list_gcs_files',
python_callable=get_file_list,
op_kwargs={'execution_date': '{{ execution_date }}'},
provide_context=True,
)
parse_files = PythonOperator(
task_id='parse_files',
python_callable=parse_xcom_list,
provide_context=True,
)
load_to_bronze = GCSToBigQueryOperator(
task_id='load_to_bronze',
bucket='some-dummy-bucket',
source_objects="{{ task_instance.xcom_pull(task_ids='parse_files') }}",
destination_project_dataset_table='bronze.customers',
schema_fields=[
# SOME SCHEMA IS DEFINED HERE
],
source_format='CSV',
skip_leading_rows=1,
write_disposition='WRITE_TRUNCATE',
)
list_gcs_files >> parse_files >> load_to_bronze