我有一个源数据框,有一些记录。我想对此数据帧的每一行执行一些操作。为此,使用了rdd.map函数。但是,查看使用累加器记录的日志,看起来像某些行多次调用映射函数。根据文档,它应该只调用一次。
我尝试在一个小脚本中复制问题并注意到相同的行为。该脚本如下所示:
import os
import sys
os.environ['SPARK_HOME'] = "/usr/lib/spark/"
sys.path.append("/usr/lib/spark/python/")
from pyspark.sql import *
from pyspark.accumulators import AccumulatorParam
class StringAccumulatorParam(AccumulatorParam):
def zero(self, initialValue=""):
return ""
def addInPlace(self, s1, s2):
return s1.strip() + " " + s2.strip()
def mapped_func(row, logging_acc):
logging_acc += "Started map"
logging_acc += str(row)
return "test"
if __name__ == "__main__":
spark_session = SparkSession.builder.enableHiveSupport().appName("rest-api").getOrCreate()
sc = spark_session.sparkContext
df = spark_session.sql("select col1, col2, col3, col4, col5, col6 from proj1_db.dw_table where col3='P1'")
df.show()
logging_acc = sc.accumulator("", StringAccumulatorParam())
result_rdd = df.rdd.map(lambda row: Row(row, mapped_func(row, logging_acc)))
result_rdd.toDF().show()
print "logs: " + str(logging_acc.value)
以下是相关的输出:
+----+----+----+----+----+----+
|col1|col2|col3|col4|col5|col6|
+----+----+----+----+----+----+
| 1| 1| P1| 2| 10| 20|
| 3| 1| P1| 1| 25| 25|
+----+----+----+----+----+----+
+--------------------+----+
| _1| _2|
+--------------------+----+
|[1, 1, P1, 2, 10,...|test|
|[3, 1, P1, 1, 25,...|test|
+--------------------+----+
logs: Started map Row(col1=1, col2=1, col3=u'P1', col4=2, col5=10, col6=20) Started map Row(col1=1, col2=1, col3=u'P1', col4=2, col5=10, col6=20) Started map Row(col1=3, col2=1, col3=u'P1', col4=1, col5=25, col6=25)
第一个表是源数据帧,第二个表是在map函数调用后创建的结果数据帧。如图所示,该函数被第一行调用两次。任何人都可以帮助我了解正在发生的事情,以及我们如何确保每行只调用一次映射函数。
根据文档,它应该只调用一次。
事实并非如此。任何转换都可以执行任意次数(通常是在出现故障或支持二级逻辑的情况下)和the documentation says explicitly that:
对于仅在操作内执行的累加器更新,Spark保证每个任务对累加器的更新仅应用一次
因此,每个任务可以多次更新内部转换中使用的隐式累加器(如map
)。
在您的情况下,由于在将RDD
转换为DataFrame
时未提供架构,因此会发生多次执行。在这种情况下,Spark将执行另一次数据扫描以从数据推断模式,即
spark.createDataFrame(result_rdd, schema)
然而,这只会解决这个特定的问题,关于转型和累积器行为的一般观点。