这是我的数据集架构:
request_type | request_body
1 body A
2 body B
3 ...
4
5
6
..
32 body XXX
我的强力解决方案是运行 5 个 Spark sql 查询,然后合并结果。有没有更好/更聪明的选择来做到这一点?
sampleBy 函数 执行类似的操作,但您只能提供每个
request_type
的预期分数,而不是预期行数。因此,手动实施某种分层抽样。
创建一个数据框,其中每个
request_type
包含 5000 到 10000 行之间的随机数量。
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
val spark = ...
import spark.implicits._
val m = 5000
val random = new scala.util.Random
val data: ListBuffer[Int] = ListBuffer()
for( i <- 1 to 32)
data ++= List.fill((m + random.nextInt(m)).toInt)(i)
val df = data.toDF("request_type")
val counts: Array[Row] = df.groupBy("request_type").count().collect()
val fractions: Map[Int, Double] = counts.map(e=>(e.getInt(0),e.getLong(1)))
.toList
.groupBy(e => if(e._1 <=4) e._1 else 5) //1 to 4 are separate buckets, the rest goes into 5
.mapValues(_.map(_._2))
.map{case (key, value) => (key, 100.0/value.sum)} //take 100 rows from each bucket
fraction
现在包含一个地图,其中包含应为每个存储桶保留的行的分数。
例如:
Map(5 -> 4.696290869471292E-4, 1 -> 0.012753475322025252, 2 -> 0.01278281989006775, 3 -> 0.014253135689851768, 4 -> 0.010255358424776945)
这意味着(在这个随机示例中)我们希望保留 request_type = 1 的所有行的 1.2%、request_type >= 5 的所有行的 0.04% 等等。
如果每个 request_type 的行数已知,我们可以跳过此步骤并直接设置fractions-map。
现在将一列具有 0 到 1 之间的随机值添加到数据帧中,并仅保留该值小于或等于步骤 2 中的分数的行。
val fcol: Column = fractions.foldLeft(lit(fractions(5)))
{case (acc, (list, value)) => when('request_type.equalTo(list), value).otherwise(acc)}
val sampledDf = df.withColumn("frac", fcol)
.withColumn("r", rand())
.filter('r <= 'frac)
sampledDf.groupBy("request_type").count().orderBy("request_type").show()
打印
+------------+-----+
|request_type|count|
+------------+-----+
| 1| 94|
| 2| 109|
| 3| 100|
| 4| 112|
| 5| 2|
| 6| 5|
| 7| 4|
| 8| 2|
| 9| 1|
| 10| 5|
...