我试图找到每批 Spark Streaming 作业中数据帧的大小。我能够成功地找到批处理作业的大小,但是当涉及到流式传输时,我无法做到这一点。
我一直在 databricks 上开发 Spark 应用程序,并在流作业中尝试了“df.queryExecution.optimizedPlan.stats.sizeInBytes”。 但出现以下异常: 流式源的查询必须使用 writeStream.start();;
执行我尝试将“df.queryExecution.optimizedPlan.stats.sizeInBytes”放入 forEachBatch() 函数中:
data.writeStream.foreachBatch { (df: DataFrame, batchId: Long) =>
df.persist()
println("The size of the read is : " + df.queryExecution.optimizedPlan.stats.sizeInBytes)
}.start.option("checkpointLocation", outpath + "/_checkpoint")
但这将创建一个新的流,由于一些限制,我们需要避免它。
val data = spark.readStream
.format("kafka")
.option(....)
.load()
println("The size of the read is : " + data.queryExecution.optimizedPlan.stats.sizeInBytes)
是否有任何 hack 或任何 api 调用可以在不使用“forEachBatch()”或不创建新流的情况下返回流中数据帧的大小?
您可以尝试关注
进口量少
import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd
import org.apache.spark.util.SizeEstimator
计算RDD的大小
def calcRDDSize(rdd: RDD[String]): Long = {
rdd.map(_.getBytes("UTF-8").length.toLong)
.reduce(_+_) //add the sizes together
}
尺寸计算方式为
val rdd1 = df.rdd.map(_.toString())
calcRDDSize(rdd1)
其中 df 是您的数据框。它将估计大小(以字节为单位)。
希望这有帮助:)
无法发表评论,所以在这里询问你是否找到了解决方案?