How to make the key and value of a map into separate columns in Spark SQL

Question

I have a table with a map column. I want to turn that map into 2 separate columns: 1. a key column, 2. a value column. Calling input.show() prints:

+---------------------+--------------------+--------------+----------------+---------------+--------------+-----------------------+
|addedSkuWithTimestamp|     fbaSKUAdditions|fbaSKURemovals|      merchantId|mfnSKUAdditions|mfnSKURemovals|removedSkuWithTimestamp|
+---------------------+--------------------+--------------+----------------+---------------+--------------+-----------------------+
| [Test1 -> 1234567...|[CLAM_SUNGLASS_CA...|            []|   A4QA5OYD4Y45F|             []|          null|                   null|
|                 null|[CLAM_SUNGLASS_CA...|            []|   A4QA5OYD4Y45F|             []|          null|                   null|
|                 null|[CLAM_SUNGLASS_CA...|            []|   A4QA5OYD4Y45F|             []|          null|                   null|
|                 null|[CLAM_SUNGLASS_CA...|            []|   A4QA5OYD4Y45F|             []|          null|                   null|
|                 null|[CLAM_SUNGLASS_CA...|            []|   A4QA5OYD4Y45F|             []|          null|                   null|
|                 null|[CLAM_SUNGLASS_CA...|            []|   A4QA5OYD4Y45F|             []|          null|                   null|
|                 null|[CLAM_SUNGLASS_CA...|            []|   A4QA5OYD4Y45F|             []|          null|                   null|
|                 null|[CLAM_SUNGLASS_CA...|            []|ANOTHER_MERCHANT|             []|          null|                   null|
|                 null|[CLAM_SUNGLASS_CA...|            []|ANOTHER_MERCHANT|             []|          null|                   null|
+---------------------+--------------------+--------------+----------------+---------------+--------------+-----------------------+

But I want the output to be:

test1  123456789 

Test2  123456780 

How can I get 2 different columns (a key column and a value column) out of the map? This is what I have tried:

Dataset<Row> removed_skus = input
                    .withColumn("sku", functions.explode(input.col("removedSkuWithTimestamp")))
                    .withColumn("skuType", functions.lit("MFN"))
                    .select(input.col("merchantId").alias("merchant_id"), new Column("sku"),
                            new Column("skuType"))
                    .distinct()
                    .groupBy("merchant_id")
                    .agg(functions.collect_list("sku").alias("removedSkus"));
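
For reference, Spark's built-in explode on a MapType column already yields two generated columns named key and value, which matches the desired output above. A minimal Scala sketch of that approach (an illustration, assuming Spark 2.x and the column names from the table above):

import org.apache.spark.sql.functions.{col, explode}

// explode on a map column emits one row per map entry,
// as two generated columns named "key" and "value"
val removedSkus = input
  .select(col("merchantId").alias("merchant_id"), explode(col("removedSkuWithTimestamp")))
  .withColumnRenamed("key", "sku")
  .withColumnRenamed("value", "timestamp")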
Tags: sql, scala, apache-spark, apache-spark-sql
1 Answer

First, let's create some data:

import spark.implicits._   // for toDF; already in scope in spark-shell

val df = Seq(
    (Map("sku1" -> "timestamp1"), "AFN"),
    (Map("sku2" -> "timestamp2"), "AFN"),
    (null, "AFN")
).toDF("addedSkuWithTimestamp", "skuType")

df.show(false)

+---------------------+-------+
|addedSkuWithTimestamp|skuType|
+---------------------+-------+
| [sku1 -> timestamp1]|    AFN|
| [sku2 -> timestamp2]|    AFN|
|                 null|    AFN|
+---------------------+-------+

This has the following schema:

scala> df.printSchema()

root
 |-- addedSkuWithTimestamp: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- skuType: string (nullable = true)

The next snippet uses the mapToTupleUDF UDF to convert the addedSkuWithTimestamp column into a (key, value) tuple (a struct), and then extracts its two fields into the Sku and Timestamp columns:

import org.apache.spark.sql.functions.{udf, when}

// Returns the map's first (key, value) pair as a tuple, or null for a null map
val mapToTupleUDF = udf((sku: Map[String, String]) => if (sku != null) sku.toSeq(0) else null)

df.withColumn("addedSkuWithTimestamp", mapToTupleUDF($"addedSkuWithTimestamp"))
  .withColumn("Sku", when($"addedSkuWithTimestamp".isNotNull, $"addedSkuWithTimestamp._1"))
  .withColumn("Timestamp", when($"addedSkuWithTimestamp".isNotNull, $"addedSkuWithTimestamp._2"))
  .show(false)

+---------------------+-------+----+----------+
|addedSkuWithTimestamp|skuType|Sku |Timestamp |
+---------------------+-------+----+----------+
|[sku1, timestamp1]   |AFN    |sku1|timestamp1|
|[sku2, timestamp2]   |AFN    |sku2|timestamp2|
|null                 |AFN    |null|null      |
+---------------------+-------+----+----------+

Note that we can only access addedSkuWithTimestamp._1 and addedSkuWithTimestamp._2 when addedSkuWithTimestamp is not null, which is why both field accesses are guarded with when($"addedSkuWithTimestamp".isNotNull, ...).
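
Also note that sku.toSeq(0) assumes the map has at least one entry and throws on an empty map. A more defensive variant of the same UDF (a sketch, not part of the original answer) returns null for empty maps as well:

import org.apache.spark.sql.functions.udf

// Returns the map's first (key, value) pair, or null for a null or empty map
val mapToTupleUDF = udf((sku: Map[String, String]) =>
  Option(sku).flatMap(_.headOption).orNull)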
