我正在 azure databricks 中运行以下代码。
使用 erp_bu 创建查找字典,它是具有两列“erp_code”和“bu”的行对象列表
广播查找字典
尝试在 udf 中查找值
如果这不正确,还有其他方法吗? 我收到一条错误消息,指出广播变量未加载。我尝试将广播变量传递给 udf,但它只接受字符串或列。
erp_bu_dic = {}
for row in erp_bu:
if row['erp_code']:
erp_bu_dic[row['erp_code']] = row['bu']
broadcast_erp_bu_dic = sc.broadcast(erp_bu_dic)
j06 = j06.withColumn('business_unit', udf(lambda x : broadcast_erp_bu_dic.value[x])("entity_source_id"))
display(j06)```
原则上你的方法应该有效,答案是肯定的,可以广播字典并将其用作查找。
如果没有更多细节,很难确定您的问题是什么,但我尝试模拟一个尽可能接近您的描述的场景,并且下面的代码运行良好。
from pyspark.sql.functions import udf
from pyspark.sql import Row
#mock the j06 dataframe
j06_data = [("1YZ", "meh"), ("X2Z", "blah"), ("XY3", "ha")]
df_j06 = spark.createDataFrame(j06_data, ["entity_source_id", "other_col"])
#mock the business unit dictionary
bu_rows = [Row("1YZ", "BU One"), Row("X2Z", "BU Two"), Row("XY3", "BU Three")]
erp_bu_dict = {}
for row in bu_rows:
erp_bu_dict[row[0]] = row[1]
#broadcast the dict
broadcast_bu_dict = spark.sparkContext.broadcast(erp_bu_dict)
#define mapping udf
udf_map_bu = udf(lambda x : broadcast_bu_dict.value[x])
#map business unit
df_j06 = df_j06.withColumn('business_unit', udf_map_bu('entity_source_id'))
display(df_j06)
我推测问题出现在您的代码中较早的位置,并且与您的“erp_bu”行列表相关。
我还会考虑将“erp_bu”转换为数据帧并运行连接而不是当前的方法。