I am working with a DataFrame that has the following schema:
df.printSchema()
root
|-- _id: long (nullable = true)
|-- d: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- k: string (nullable = true)
| | |-- v: string (nullable = true)
|-- c: string (nullable = true)
For example, 5 rows look like this:
df.show(5)
_id|d |c
-----------------------------------------------------------------------
1 |[[k1,v1][k2,v2][k3,v3][k4,v4] |c_1
-----------------------------------------------------------------------
2 |[[k5,v5][k1,v1][k3,v31][k6,v6] |c_2
-----------------------------------------------------------------------
3 |[[k5,v51][k1,v13][k7,v7][k8,v8][[k9,v9][k10,v10][k3,v3][k4,v41] |c_3
-----------------------------------------------------------------------
4 |[[k11,v11][k1,v1][k4,v4][k2,v28][[k9,v92][k10,v101][k3,v32]] |c_1
-----------------------------------------------------------------------
5 |[[k8,v81][k13,v31][k6,v66][k2,v2][[k9,v9][k15,v155][k4,v4]] |c_4
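For each [k*,v*] pair, I want to collect its occurrences within each class c_* as well as its total occurrences across all classes. For example, for c_1:
key_value |c_1_occ|total_occ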
[k1,v1] |[1,4] |[1,2,4]
[k2,v2] |[1] |[1,5]
[k3,v3] |[1] |[1,3]
[k4,v4] |[1,4] |[1,4,5]
[k5,v5] |[] |[2]
[k3,v31] |[] |[2]
[k6,v6] |[] |[2]
[k5,v51] |[] |[3]
[k1,v13] |[] |[3]
[k7,v7] |[] |[3]
[k8,v8] |[] |[3]
[k9,v9] |[] |[3,5]
[k10,v10] |[] |[3]
[k4,v41] |[] |[3]
[k11,v11] |[4] |[4]
[k2,v28] |[4] |[4]
[k9,v92] |[4] |[4]
[k10,v101]|[4] |[4]
[k3,v32] |[4] |[4]
.
.
.
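c_1_occ and total_occ are the lists of _id values in which a given [k*,v*] pair appears: within class c_1, and across all classes (c_1, c_2, c_3, c_4, ...), respectively. Thanks in advance.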
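You can explode the array of (k, v) structs and then use groupBy/agg to dynamically build collect_list aggregations over id: one for the total, plus one for each value in the list of distinct c_* classes, as shown in the following example: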
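// Needed for explode, collect_list, when, and the $"..." / toDF syntax.
// Assumes a SparkSession named spark (spark-shell provides one).
import org.apache.spark.sql.functions._
import spark.implicits._
case class kv(k: String, v: String)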
val df = Seq(
(1, Array(kv("k1", "v1"), kv("k2", "v2"), kv("k3", "v3")), "c_1"),
(2, Array(kv("k1", "v1"), kv("k3", "v3"), kv("k5", "v5")), "c_2"),
(3, Array(kv("k2", "v2"), kv("k4", "v4")), "c_1")
).toDF("id", "d", "c")
// +---+---------------------------+---+
// |id |d |c |
// +---+---------------------------+---+
// |1 |[[k1,v1], [k2,v2], [k3,v3]]|c_1|
// |2 |[[k1,v1], [k3,v3], [k5,v5]]|c_2|
// |3 |[[k2,v2], [k4,v4]] |c_1|
// +---+---------------------------+---+
// Assemble a list of distinct c_*
val cList = df.select($"c").map(_.getString(0)).collect.toList.distinct
// cList: List[String] = List(c_1, c_2)
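// Explode d into one row per (k, v) struct, then collect ids per key_value
df.withColumn("key_value", explode($"d")).
  groupBy($"key_value").agg(
    collect_list($"id").as("total_occ"),
    // when(...) without otherwise yields null for rows of other classes,
    // and collect_list skips nulls, so each c_*_occ keeps only that class's ids
    cList.map(x => collect_list(when($"c" === x, $"id")).as(s"${x}_occ")): _*
  ).
  orderBy($"key_value").
  show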
// +---------+---------+-------+-------+
// |key_value|total_occ|c_1_occ|c_2_occ|
// +---------+---------+-------+-------+
// | [k1,v1]| [1, 2]| [1]| [2]|
// | [k2,v2]| [1, 3]| [1, 3]| []|
// | [k3,v3]| [1, 2]| [1]| [2]|
// | [k4,v4]| [3]| [3]| []|
// | [k5,v5]| [2]| []| [2]|
// +---------+---------+-------+-------+
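To check what the explode / groupBy / collect_list steps compute without starting Spark, the same logic can be sketched with plain Scala collections on the three sample rows above. This is only an illustration under assumptions: the names OccurrenceSketch, KV, Rec, exploded, totalOcc and classOcc are hypothetical and not part of any Spark API, and an in-memory Seq stands in for the DataFrame.

```scala
object OccurrenceSketch {
  // Hypothetical stand-ins for the struct element and the DataFrame row.
  final case class KV(k: String, v: String)
  final case class Rec(id: Int, d: Seq[KV], c: String)

  // Same sample data as in the Spark example above.
  val rows: Seq[Rec] = Seq(
    Rec(1, Seq(KV("k1", "v1"), KV("k2", "v2"), KV("k3", "v3")), "c_1"),
    Rec(2, Seq(KV("k1", "v1"), KV("k3", "v3"), KV("k5", "v5")), "c_2"),
    Rec(3, Seq(KV("k2", "v2"), KV("k4", "v4")), "c_1")
  )

  // Distinct class labels in first-seen order (the analogue of cList).
  val classes: Seq[String] = rows.map(_.c).distinct

  // "explode": one (key_value, id, class) triple per array element.
  val exploded: Seq[(KV, Int, String)] =
    for (r <- rows; kv <- r.d) yield (kv, r.id, r.c)

  // "groupBy + collect_list(id)": every id in which a key_value appears.
  val totalOcc: Map[KV, Seq[Int]] =
    exploded.groupBy(_._1).map { case (kv, triples) => kv -> triples.map(_._2) }

  // "collect_list(when(c === x, id))": ids restricted to a single class.
  def classOcc(kv: KV, c: String): Seq[Int] =
    exploded.collect { case (`kv`, id, `c`) => id }

  def main(args: Array[String]): Unit =
    totalOcc.toSeq.sortBy { case (kv, _) => (kv.k, kv.v) }.foreach { case (kv, ids) =>
      val perClass = classes.map(c => s"${c}_occ=${classOcc(kv, c).mkString("[", ", ", "]")}")
      println(s"[${kv.k},${kv.v}] total_occ=${ids.mkString("[", ", ", "]")} ${perClass.mkString(" ")}")
    }
}
```

Note that the sample data names the column id, while the question's DataFrame uses _id, so the column reference in the Spark code would need adjusting accordingly.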