Google Dataflow API call fails


I am trying to launch a Dataflow job from Composer (Airflow) using the Dataflow operator, but the call fails with the following error:

googleapiclient.errors.HttpError: <HttpError 400 when requesting https://test returned "Invalid value at 'launch_parameters.parameters' (type.googleapis.com/google.dataflow.v1beta3.LaunchTemplateParameters.ParametersEntry), "{'test1': 'SELECT distinct data\nFROM project.dataset.table1\nWHERE ace_date="2022-05-12"', 'test2': 'SELECT distinct data\nFROM project.dataset.table2\nWHERE ace_date="2022-05-12"', 'priority_data': 'SELECT distinct data\nFROM project.dataset.table3\nWHERE ace_date="2022-05-12"', 'test3': 'SELECT distinct data\nFROM project.dataset.table4\nWHERE ace_date="2022-05-12"', 'test4': 'SELECT distinct data\nFROM project.dataset.table5\nWHERE ace_date="2022-05-12"', 'test5': 'SELECT distinct data\nFROM project.dataset.tabl6 \nWHERE ace_date="2022-05-12"', 'pack_rules': 'SELECT distinct data\nFROM project.dataset.table7\nWHERE ace_date="2022-05-12"', 'test6': 'SELECT distinct row_key_data as data\nFROM peoject.dataset.table7\nWHERE date_of_run="2022-05-16"'}""

Below is the code that calls it from Airflow:

from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator


def dataflow_trigger(
        task,
):
    """
    Dynamic task for calling dataflow job
    """
    # Every templated field below is filled from XCom values pushed by the get_settings task.
    return DataflowTemplatedJobStartOperator(
        task_id=task,
        project_id="{{task_instance.xcom_pull(key='dataflow_settings', task_ids='get_settings')['project']}}",
        job_name="{{task_instance.xcom_pull(key='dataflow_settings', task_ids='get_settings')['job_name']}}",
        template="{{task_instance.xcom_pull(key='dataflow_settings', task_ids='get_settings')['template_path']}}",
        parameters="{{task_instance.xcom_pull(key='parameters', task_ids='get_settings')}}",
        location='europe-west2',
    )
python-3.x airflow google-cloud-dataflow google-cloud-composer
1 Answer

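In a templated field, xcom_pull is rendered to a string by default, so the parameters dict reaches the Dataflow API as a single string rather than a map of key/value pairs, which is what the 400 error is rejecting. Passing render_template_as_native_obj=True to the DAG constructor makes Airflow render the template to a native Python object, and that solved the problem.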
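To make the difference concrete, here is a minimal sketch that uses only the Python standard library. It does not call Airflow. The two helper functions, render_as_text and render_as_native, are hypothetical stand-ins for default and native template rendering, and the parameters dict is an illustrative one-entry version of the one in the question. Treat it as an approximation of the behaviour, not as Airflow's actual implementation.

```python
import ast

# Hypothetical parameters, shaped like the dict in the question (one entry only).
parameters = {
    "test1": 'SELECT distinct data\nFROM project.dataset.table1\nWHERE ace_date="2022-05-12"',
}


def render_as_text(value):
    """Stand-in for default rendering: whatever goes in, text comes out."""
    return str(value)


def render_as_native(value):
    """Stand-in for native rendering: text that parses as a Python literal
    is turned back into the object it describes; anything else stays text."""
    text = str(value)
    try:
        return ast.literal_eval(text)
    except (ValueError, SyntaxError):
        return text


as_text = render_as_text(parameters)
as_native = render_as_native(parameters)

# The text form is one long string, like the quoted "{'test1': ...}" in the error.
print(type(as_text).__name__)
# The native form is a dict again, which is what a map-typed field expects.
print(type(as_native).__name__)
```

In the DAG itself nothing needs to be parsed by hand: the equivalent switch is the render_template_as_native_obj=True argument on the DAG constructor mentioned above, after which the parameters template should evaluate to a dict before the operator sends it.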
