如何在Spark数据框的列值中进行字符串处理

问题描述 投票:0回答:1

我对Spark非常陌生，我需要执行字符串操作并在Spark数据帧中创建新列。我创建了用于字符串操作的UDF函数，但出于性能考虑，我想在不使用UDF的情况下实现同样的功能。以下是我的代码和输出。请问如何以更好的方式实现？


object Demo2 extends Context {

  // Native column functions instead of UDFs: UDFs force a serialization
  // round-trip per row and are opaque to the Catalyst optimizer.
  import org.apache.spark.sql.functions._

  def main(args: Array[String]): Unit = {
    import sparkSession.sqlContext.implicits._

    val data = Seq(
      ("bankInfo.SBI.C_1.Kothrud.Pune.displayInfo"),
      ("bankInfo.ICICI.C_2.TilakRoad.Pune.displayInfo"),
      ("bankInfo.Axis.C_3.Santacruz.Mumbai.displayInfo"),
      ("bankInfo.HDFC.C_4.Deccan.Pune.displayInfo")
    )
    val df = data.toDF("Key")
    println("Input Dataframe")
    df.show(false)

    // Split the key once into its dot-separated segments, then derive both
    // columns from that array with built-in SQL functions (Spark 2.4+).
    // "Local Address" = all segments except the first and the last.
    // "Address"       = "Local Address" minus its second-to-last segment,
    //                   i.e. segments 2..(n-3) followed by segment (n-1),
    //                   using 1-based SQL slice() indexing.
    val df2 = df
      .withColumn("parts", split(col("Key"), "\\."))
      .withColumn(
        "Local Address",
        expr("array_join(slice(parts, 2, size(parts) - 2), '.')")
      )
      .withColumn(
        "Address",
        expr(
          "array_join(concat(slice(parts, 2, size(parts) - 4), " +
            "slice(parts, size(parts) - 1, 1)), '.')"
        )
      )
      .drop("parts") // keep the output schema identical to the UDF version

    println("Output Dataframe")
    df2.show(false)

  }
}

Input Dataframe
+----------------------------------------------+
|Key                                           |
+----------------------------------------------+
|bankInfo.SBI.C_1.Kothrud.Pune.displayInfo     |
|bankInfo.ICICI.C_2.TilakRoad.Pune.displayInfo |
|bankInfo.Axis.C_3.Santacruz.Mumbai.displayInfo|
|bankInfo.HDFC.C_4.Deccan.Pune.displayInfo     |
+----------------------------------------------+

Output Dataframe
+----------------------------------------------+-------------------------+---------------+
|Key                                           |Local Address            |Address        |
+----------------------------------------------+-------------------------+---------------+
|bankInfo.SBI.C_1.Kothrud.Pune.displayInfo     |SBI.C_1.Kothrud.Pune     |SBI.C_1.Pune   |
|bankInfo.ICICI.C_2.TilakRoad.Pune.displayInfo |ICICI.C_2.TilakRoad.Pune |ICICI.C_2.Pune |
|bankInfo.Axis.C_3.Santacruz.Mumbai.displayInfo|Axis.C_3.Santacruz.Mumbai|Axis.C_3.Mumbai|
|bankInfo.HDFC.C_4.Deccan.Pune.displayInfo     |HDFC.C_4.Deccan.Pune     |HDFC.C_4.Pune  |
+----------------------------------------------+-------------------------+---------------+
apache-spark apache-spark-sql bigdata
1个回答
0
投票

检查下面的代码。

scala> df.show(false)
+----------------------------------------------+
|Key                                           |
+----------------------------------------------+
|bankInfo.SBI.C_1.Kothrud.Pune.displayInfo     |
|bankInfo.ICICI.C_2.TilakRoad.Pune.displayInfo |
|bankInfo.Axis.C_3.Santacruz.Mumbai.displayInfo|
|bankInfo.HDFC.C_4.Deccan.Pune.displayInfo     |
+----------------------------------------------+


scala> val maxLength = df.select(split($"key","\\.").as("keys")).withColumn("length",size($"keys")).select(max($"length").as("length")).map(_.getAs[Int](0)).collect.head
maxLength: Int = 6

scala> val address_except = Seq(0,3,maxLength-1)
address_except: Seq[Int] = List(0, 3, 5)

scala> val local_address_except = Seq(0,maxLength-1)
local_address_except: Seq[Int] = List(0, 5)

scala> def parse(column: Column,indexes:Seq[Int]) = (0 until maxLength).filter(i => !indexes.contains(i)).map(i => column(i)).reduce(concat_ws(".",_,_))
parse: (column: org.apache.spark.sql.Column, indexes: Seq[Int])org.apache.spark.sql.Column

scala> df.select(split($"key","\\.").as("keys")).withColumn("local_address",parse($"keys",local_address_except)).withColumn("address",parse($"keys",address_except)).show(false)
+-----------------------------------------------------+-------------------------+---------------+
|keys                                                 |local_address            |address        |
+-----------------------------------------------------+-------------------------+---------------+
|[bankInfo, SBI, C_1, Kothrud, Pune, displayInfo]     |SBI.C_1.Kothrud.Pune     |SBI.C_1.Pune   |
|[bankInfo, ICICI, C_2, TilakRoad, Pune, displayInfo] |ICICI.C_2.TilakRoad.Pune |ICICI.C_2.Pune |
|[bankInfo, Axis, C_3, Santacruz, Mumbai, displayInfo]|Axis.C_3.Santacruz.Mumbai|Axis.C_3.Mumbai|
|[bankInfo, HDFC, C_4, Deccan, Pune, displayInfo]     |HDFC.C_4.Deccan.Pune     |HDFC.C_4.Pune  |
+-----------------------------------------------------+-------------------------+---------------+
© www.soinside.com 2019 - 2024. All rights reserved.