When I try to filter on the broadcast values with the code below, it does not work. What is the correct way to do this?
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("broadcast variable").getOrCreate()
states={"CA":"California" , "NY":"Newyork" , "FL":"Florida"}
broadcaststates = spark.sparkContext.broadcast(states)
print(broadcaststates.value)
data = [("James","Smith","USA","CA"),
        ("Michael","Rose","USA","NY"),
        ("Robert","Williams","USA","CA"),
        ("Maria","Jones","USA","FL")
       ]
columns=["firstname","lastname","country","statename"]
df=spark.createDataFrame(data=data,schema=columns)
df.printSchema()
df.show(truncate=False)
def state_convert(code):
    return broadcaststates.value[code]
result=df.rdd.map(lambda x : (x[0],x[1],x[2],state_convert(x[3]))).toDF(columns)
result.show(truncate=False)
filterDF=df.where(df['states'].isin(broadcaststates.value))
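There are basically two ways to filter with the broadcast dictionary: unpack its key set with the * operator, or convert the key set to a list with list(). Note also that the column is named statename, not states: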
df.where(df['statename'].isin(*broadcaststates.value.keys())).show()
df.where(df['statename'].isin(list(broadcaststates.value.keys()))).show()
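To see why both forms work while passing the dictionary itself does not, here is a minimal pure-Python sketch. It assumes, based on my reading of the PySpark source, that isin expands a single list or set argument and otherwise treats every positional argument as one literal value. The helper normalize_isin_args is hypothetical and only imitates that behaviour; it is not part of PySpark.

```python
def normalize_isin_args(*cols):
    """Hypothetical stand-in imitating how Column.isin treats its arguments."""
    # Assumption: a single list or set is expanded into its elements
    if len(cols) == 1 and isinstance(cols[0], (list, set)):
        return tuple(cols[0])
    # Otherwise each positional argument is kept as one literal value
    return cols

states = {"CA": "California", "NY": "Newyork", "FL": "Florida"}

unpacked = normalize_isin_args(*states.keys())      # three separate arguments
as_list = normalize_isin_args(list(states.keys()))  # one list, expanded
as_dict = normalize_isin_args(states)               # one dict, NOT expanded

print(unpacked)                                  # ('CA', 'NY', 'FL')
print(as_list)                                   # ('CA', 'NY', 'FL')
print(len(as_dict), type(as_dict[0]).__name__)   # 1 dict
```

Under that assumption, the dictionary arrives as a single value that Spark would have to turn into a literal, which is why the filter has to be given the keys explicitly.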
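However, neither of these isin calls actually broadcasts anything. Spark would do exactly the same thing if you used states.keys() directly. Indeed, look at the output of explain: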
df.where(df['statename'].isin(list(broadcaststates.value.keys()))).explain()
== Physical Plan ==
*(1) Filter statename#3 IN (CA,NY,FL)
+- *(1) Scan ExistingRDD[firstname#0,lastname#1,country#2,statename#3]
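The key set is not broadcast to the executors; it is embedded in the execution plan. If states is not too large, that is perfectly fine. If states is very large, however, this may cause an OutOfMemoryError. In that case, it may be more efficient to actually broadcast states and use it inside a UDF, as follows: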
from pyspark.sql import functions as F
from pyspark.sql.types import BooleanType
isin_states = F.udf(lambda x : x in broadcaststates.value, BooleanType())
# the variable is broadcasted and not embedded within the execution plan
df.where(isin_states(df['statename'])).show()
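The predicate inside the UDF is an ordinary Python dictionary membership test, so it can be checked without a Spark session. A minimal sketch, assuming a plain dict stands in for broadcaststates.value; the helper name in_states is hypothetical:

```python
# Plain dict standing in for broadcaststates.value (assumption for illustration)
states = {"CA": "California", "NY": "Newyork", "FL": "Florida"}

def in_states(code, lookup=states):
    # `in` on a dict tests membership among its keys
    return code in lookup

# None is included because a null column value reaches a Python UDF as None
codes = ["CA", "NY", "TX", None]
print([in_states(c) for c in codes])  # [True, True, False, False]
```

Unknown codes and None both give False, so the corresponding rows would simply be dropped by the filter rather than raising an error.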