我正在尝试从传感器任务中获取 Xcom 值。这是传感器任务的结果:
现在,我想检索字符串 'df-00...' 我可以在 InputFilePattern 参数中使用它,如您在以下任务中所见:
dataflow_gcs_to_bq = DataflowTemplatedJobStartOperator(
task_id='',
template='',
job_name='',
location='',
project_id='',
parameters={
'javascriptTextTransformFunctionName': '',
'JSONPath': '',
'javascriptTextTransformGcsPath': '',
'inputFilePattern': 'HERE',
'outputTable': '',
'bigQueryLoadingTemporaryDirectory': '',
},
)
非常感谢您的帮助。
因为 parameters 是模板化的字段,你可以直接用 Jinja 做。
假设前面的
task_id
是previous_task
那么代码将是:
dataflow_gcs_to_bq = DataflowTemplatedJobStartOperator(
...,
parameters={
"inputFilePattern": "{{ ti.xcom_pull(task_ids='previous_task') }}",
...,
},
)
注意:Jinja 默认呈现为字符串。在您的示例中,我看到该值为列表,因此您可能还需要设置
render_template_as_native_obj
以将值呈现为本机 python 对象,请参阅this answer以获取更多信息。
除了 Elad 之外,我还提出了一个答案,如果您有自定义处理来应用从上一个任务中检索到的值,使用
xcom
。
您可以创建一个自定义运算符来扩展
DataflowTemplatedJobStartOperator
,例如:
from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator
class CustomDataflowTemplatedJobStartOperator(DataflowTemplatedJobStartOperator):
def __init__(
self,
task_id,
template,
job_name,
location,
project_id,
parameters,
) -> None:
super(CustomDataflowTemplatedJobStartOperator, self) \
.__init__(
task_id=task_id,
template=template,
job_name=job_name,
location=location,
project_id=project_id,
parameters=parameters)
def execute(self, context):
task_instance = context['task_instance']
input_file_pattern_xcom = task_instance.xcom_pull(task_ids='previous_task_id')
# Add the input retrieved from xcom and add it to the parameters Dict.
self.parameters['inputFilePattern'] = input_file_pattern_xcom
super(CustomDataflowTemplatedJobStartOperator, self).execute(context)
我们重写
execute
方法并通过当前的 xcom
从上一个任务中检索带有 context
的值。
然后,如果需要,您可以对此值应用转换,并将条目添加到当前运算符的 Dict
parameters
中。
您可以像往常一样实例化您的自定义运算符:
CustomDataflowTemplatedJobStartOperator(
task_id="my_task",
template="my_template",
job_name="my_job_name",
location="my_location",
project_id="project_id",
parameters={
'param1': 'paramValue1'
}
)