如何向 DataFrame 添加新的 Struct 列

问题描述 投票:0回答:3

我目前正在尝试从 MongoDB 中提取数据库,并使用 Spark 通过

geo_points
摄取到 ElasticSearch 中。

Mongo 数据库有纬度和经度值,但 ElasticSearch 要求将它们转换为

geo_point
类型。

Spark 有没有办法将

lat
lon
列复制到
array
struct
的新列?

如有任何帮助,我们将不胜感激!

scala elasticsearch apache-spark etl apache-spark-sql
3个回答
63
投票

我假设你从某种类似这样的平面模式开始:

root
 |-- lat: double (nullable = false)
 |-- long: double (nullable = false)
 |-- key: string (nullable = false)

首先让我们创建示例数据:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types._

val rdd = sc.parallelize(
    Row(52.23, 21.01, "Warsaw") :: Row(42.30, 9.15, "Corte") :: Nil)

val schema = StructType(
    StructField("lat", DoubleType, false) ::
    StructField("long", DoubleType, false) ::
    StructField("key", StringType, false) ::Nil)

val df = sqlContext.createDataFrame(rdd, schema)

一个简单的方法是使用 udf 和 case 类:

case class Location(lat: Double, long: Double)
val makeLocation = udf((lat: Double, long: Double) => Location(lat, long))

val dfRes = df.
   withColumn("location", makeLocation(col("lat"), col("long"))).
   drop("lat").
   drop("long")

dfRes.printSchema

我们得到

root
 |-- key: string (nullable = false)
 |-- location: struct (nullable = true)
 |    |-- lat: double (nullable = false)
 |    |-- long: double (nullable = false)

一个困难的方法是转换数据并随后应用模式:

val rddRes = df.
    map{case Row(lat, long, key) => Row(key, Row(lat, long))}

val schemaRes = StructType(
    StructField("key", StringType, false) ::
    StructField("location", StructType(
        StructField("lat", DoubleType, false) ::
        StructField("long", DoubleType, false) :: Nil
    ), true) :: Nil 
)

sqlContext.createDataFrame(rddRes, schemaRes).show

我们得到了预期的输出

+------+-------------+
|   key|     location|
+------+-------------+
|Warsaw|[52.23,21.01]|
| Corte|  [42.3,9.15]|
+------+-------------+

从头开始创建嵌套模式可能很乏味,所以如果可以的话我会推荐第一种方法。如果您需要更复杂的结构,可以轻松扩展它:

case class Pin(location: Location)
val makePin = udf((lat: Double, long: Double) => Pin(Location(lat, long))

df.
    withColumn("pin", makePin(col("lat"), col("long"))).
    drop("lat").
    drop("long").
    printSchema

我们得到了预期的输出:

root
 |-- key: string (nullable = false)
 |-- pin: struct (nullable = true)
 |    |-- location: struct (nullable = true)
 |    |    |-- lat: double (nullable = false)
 |    |    |-- long: double (nullable = false)

不幸的是,您无法控制

nullable
字段,因此如果它对您的项目很重要,则必须指定架构。

终于可以使用1.4中引入的

struct
功能了:

import org.apache.spark.sql.functions.struct

df.select($"key", struct($"lat", $"long").alias("location"))

7
投票

试试这个:

import org.apache.spark.sql.functions._

df.registerTempTable("dt")

dfres = sql("select struct(lat,lon) as colName from dt")

0
投票

对于 PySpark 用户,以下是官方文档中的示例:

df = spark.createDataFrame([("Alice", 2), ("Bob", 5)], ("name", "age"))
df.select(struct('age', 'name').alias("struct")).collect()

df.select(struct([df.age, df.name]).alias("struct")).collect()

© www.soinside.com 2019 - 2024. All rights reserved.