我想使用新的“作业即任务”功能(如在这个SO答案中提到的),但我在将值传递到该作业时遇到问题。
场景
entity_ids
。如果手动启动,此工作流程自行运行完全正常。对于这个场景,我想添加一些逻辑来决定将执行多少工作流程:
如果 Task_A 在其表中找到特定信息,它应该启动 Task_B 中的工作流程,并根据该信息为其提供几个参数(在本示例中:
entity_ids
的列表)。如果未找到该信息,工作流程应正常结束并等待下一个间隔。
我的问题: 如何将(多个)值传递到 Task_B 中引用的作业中?
我曾尝试在 Task_A 中使用
dbutils.jobs.taskValues.set("entity_id", "[1, 2]")
进行设置,并在 Task_B 中工作流程的第一个笔记本中使用 dbutils.jobs.taskValues.get("Task_A", "entity_ids", debugValue="[]" )
进行读取,但这会在嵌套作业中引发错误:
Task key does not exist in run: Task_A.
我的猜测是,Task_B 中的嵌套工作流不知道父工作流,并且可能在不同的上下文中运行,因此找不到
taskKey == "Task_A"
。
为了验证我的假设,我尝试设置一个(仅测试)笔记本,仅使用
entity_ids
函数读取 get()
。
在这两种情况下,它始终是完全相同的笔记本。
您无法在运行作业类型的后续任务中获取在一项任务中设置的任务值。要获取这些值,您需要使用动态值将它们作为参数传递,并使用笔记本中的以下代码访问它们,该代码位于另一个工作流程作业中。
dbutils.widgets.get("entity_id")
单击浏览动态值了解更多信息。
{{tasks.[task_name].values.[value_name]}}
是获取特定任务和给定键的任务值的动态表达式。
就你而言,是
{{tasks.Task_A.values.entity_id}}
输出:
这里,B_tasks是另一项工作。
我尝试了您的方法,并且可以确认任务变量在“运行作业”任务中使用时的行为如您所说。
请检查以下替代方案是否适合您?
在您的
Task_B
(输入“运行作业”)中,创建一个名为 entity_id
的小部件
在实际工作流程中,在
Task_A
之后,添加一个if then else
块并检查任务值。请参阅文档了解更多信息。
真相流后面可以是
Run Job
任务,您可以将任务值 {{tasks.Task_A.values.entity_ids}}
作为参数传递给小部件。我可以确认这有效。
替代建议:
但是考虑到你的参数数量,看看你是否可以设置一个标志作为任务值,将entity_ids写入DBFS或云中的文件,并在任务值标志上使用条件,然后读取Task_B中的文件 - 也有可能是任务值/小部件可以接受的字符数的限制。