如何从之前的任务中获取 Xcom 值

问题描述 投票:0回答:2

我正在尝试从传感器任务中获取 Xcom 值。这是传感器任务的结果:

现在,我想检索字符串 'df-00...' 我可以在 InputFilePattern 参数中使用它,如您在以下任务中所见:

    dataflow_gcs_to_bq = DataflowTemplatedJobStartOperator(
    task_id='',
    template='',
    job_name='',
    location='',
    project_id='',
    parameters={
        'javascriptTextTransformFunctionName': '',
        'JSONPath': '',
        'javascriptTextTransformGcsPath': '',
        'inputFilePattern': 'HERE',
        'outputTable': '',
        'bigQueryLoadingTemporaryDirectory': '',
    },        
)

非常感谢您的帮助。

apache google-cloud-platform airflow google-cloud-composer airflow-xcom
2个回答
1
投票

因为 parameters 是模板化的字段,你可以直接用 Jinja 做。

假设前面的

task_id
previous_task
那么代码将是:

dataflow_gcs_to_bq = DataflowTemplatedJobStartOperator(
    ...,
    parameters={
        "inputFilePattern": "{{ ti.xcom_pull(task_ids='previous_task') }}",
        ...,
    },
)

注意:Jinja 默认呈现为字符串。在您的示例中,我看到该值为列表,因此您可能还需要设置

render_template_as_native_obj
以将值呈现为本机 python 对象,请参阅this answer以获取更多信息。


0
投票

除了 Elad 之外,我还提出了一个答案,如果您有自定义处理来应用从上一个任务中检索到的值,使用

xcom

您可以创建一个自定义运算符来扩展

DataflowTemplatedJobStartOperator
,例如:

from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator


class CustomDataflowTemplatedJobStartOperator(DataflowTemplatedJobStartOperator):

    def __init__(
            self,
            task_id,
            template,
            job_name,
            location,
            project_id,
            parameters,
    ) -> None:
        super(CustomDataflowTemplatedJobStartOperator, self) \
            .__init__(
            task_id=task_id,
            template=template,
            job_name=job_name,
            location=location,
            project_id=project_id,
            parameters=parameters)

    def execute(self, context):
        task_instance = context['task_instance']
        input_file_pattern_xcom = task_instance.xcom_pull(task_ids='previous_task_id')

        # Add the input retrieved from xcom and add it to the parameters Dict.
        self.parameters['inputFilePattern'] = input_file_pattern_xcom

        super(CustomDataflowTemplatedJobStartOperator, self).execute(context)

我们重写

execute
方法并通过当前的
xcom
从上一个任务中检索带有
context
的值。

然后,如果需要,您可以对此值应用转换,并将条目添加到当前运算符的 Dict

parameters
中。

您可以像往常一样实例化您的自定义运算符:

CustomDataflowTemplatedJobStartOperator(
    task_id="my_task",
    template="my_template",
    job_name="my_job_name",
    location="my_location",
    project_id="project_id",
    parameters={    
        'param1': 'paramValue1'
    }
)
© www.soinside.com 2019 - 2024. All rights reserved.