我对 Spark 非常陌生,需要对 Spark DataFrame 中的字符串进行处理并创建新列。我目前用 UDF 实现了字符串处理,但出于性能考虑,希望在不使用 UDF 的情况下实现。以下是我的代码和输出,能否帮我用更好的方式实现?
/**
 * Derives two address columns from dot-separated keys using only Spark's
 * built-in column functions (no UDFs). Catalyst can optimise these
 * expressions, and rows do not have to be converted to JVM strings and back
 * as they would for a Scala UDF.
 *
 * For a key such as `bankInfo.SBI.C_1.Kothrud.Pune.displayInfo`:
 *  - "Local Address" drops the first and the last segment: `SBI.C_1.Kothrud.Pune`
 *  - "Address" additionally drops the second-to-last segment of the local
 *    address: `SBI.C_1.Pune`
 */
object Demo2 extends Context {
  import org.apache.spark.sql.functions.{regexp_extract, regexp_replace}

  /**
   * Builds a small sample DataFrame, adds the two derived columns and prints
   * both the input and the output frames.
   *
   * @param args unused command-line arguments
   */
  def main(args: Array[String]): Unit = {
    import sparkSession.sqlContext.implicits._

    // Captures everything between the first and the last '.'.
    // `(?s)` lets `.*` cross line breaks, so the result matches plain
    // indexOf/lastIndexOf slicing on any input.
    val LocalAddressPattern = "(?s)^[^.]*\\.(.*)\\.[^.]*$"
    // Matches the last two segments of a local address and captures the final one,
    // so replacing the match with "$1" removes the second-to-last segment.
    val DropPenultimateSegmentPattern = "\\.[^.]*(\\.[^.]*)$"

    val data = Seq(
      "bankInfo.SBI.C_1.Kothrud.Pune.displayInfo",
      "bankInfo.ICICI.C_2.TilakRoad.Pune.displayInfo",
      "bankInfo.Axis.C_3.Santacruz.Mumbai.displayInfo",
      "bankInfo.HDFC.C_4.Deccan.Pune.displayInfo"
    )
    val df = data.toDF("Key")
    println("Input Dataframe")
    df.show(false)

    // "SBI.C_1.Kothrud.Pune". Keys with fewer than two dots yield "" (regexp_extract's
    // no-match result) instead of throwing like the former UDF did; null keys yield null.
    val localAddress = regexp_extract(df("Key"), LocalAddressPattern, 1)
    // "SBI.C_1.Kothrud.Pune" -> "SBI.C_1.Pune". Local addresses with fewer than
    // two dots are returned unchanged rather than throwing.
    val address = regexp_replace(localAddress, DropPenultimateSegmentPattern, "$1")

    val df2 = df
      .withColumn("Local Address", localAddress)
      .withColumn("Address", address)
    println("Output Dataframe")
    df2.show(false)
  }
}
Input Dataframe
+----------------------------------------------+
|Key |
+----------------------------------------------+
|bankInfo.SBI.C_1.Kothrud.Pune.displayInfo |
|bankInfo.ICICI.C_2.TilakRoad.Pune.displayInfo |
|bankInfo.Axis.C_3.Santacruz.Mumbai.displayInfo|
|bankInfo.HDFC.C_4.Deccan.Pune.displayInfo |
+----------------------------------------------+
Output Dataframe
+----------------------------------------------+-------------------------+---------------+
|Key |Local Address |Address |
+----------------------------------------------+-------------------------+---------------+
|bankInfo.SBI.C_1.Kothrud.Pune.displayInfo |SBI.C_1.Kothrud.Pune |SBI.C_1.Pune |
|bankInfo.ICICI.C_2.TilakRoad.Pune.displayInfo |ICICI.C_2.TilakRoad.Pune |ICICI.C_2.Pune |
|bankInfo.Axis.C_3.Santacruz.Mumbai.displayInfo|Axis.C_3.Santacruz.Mumbai|Axis.C_3.Mumbai|
|bankInfo.HDFC.C_4.Deccan.Pune.displayInfo |HDFC.C_4.Deccan.Pune |HDFC.C_4.Pune |
+----------------------------------------------+-------------------------+---------------+
可以参考下面不使用 UDF 的实现(先用 split 拆分,再用 concat_ws 拼接需要保留的部分):
scala> df.show(truncate = false) // print full values instead of truncating at 20 chars
+----------------------------------------------+
|Key |
+----------------------------------------------+
|bankInfo.SBI.C_1.Kothrud.Pune.displayInfo |
|bankInfo.ICICI.C_2.TilakRoad.Pune.displayInfo |
|bankInfo.Axis.C_3.Santacruz.Mumbai.displayInfo|
|bankInfo.HDFC.C_4.Deccan.Pune.displayInfo |
+----------------------------------------------+
scala> val maxLength = df.select(max(size(split($"key", "\\."))).as("length")).head.getAs[Int](0) // segment count of the longest key
maxLength: Int = 6
scala> val address_except = Seq(0, maxLength - 3, maxLength - 1) // drop "bankInfo", the street segment (third from the end) and "displayInfo"
address_except: Seq[Int] = List(0, 3, 5)
scala> val local_address_except = Seq(0, maxLength - 1) // drop only "bankInfo" and "displayInfo"
local_address_except: Seq[Int] = List(0, 5)
scala> def parse(column: Column, indexes: Seq[Int]): Column = concat_ws(".", (0 until maxLength).filterNot(i => indexes.contains(i)).map(i => column(i)): _*) // join the array elements whose index is not excluded; valid indices only, so it also works with spark.sql.ansi.enabled
parse: (column: org.apache.spark.sql.Column, indexes: Seq[Int])org.apache.spark.sql.Column
scala> df.select(split($"key", "\\.").as("keys")).select($"keys", parse($"keys", local_address_except).as("local_address"), parse($"keys", address_except).as("address")).show(false)
+-----------------------------------------------------+-------------------------+---------------+
|keys |local_address |address |
+-----------------------------------------------------+-------------------------+---------------+
|[bankInfo, SBI, C_1, Kothrud, Pune, displayInfo] |SBI.C_1.Kothrud.Pune |SBI.C_1.Pune |
|[bankInfo, ICICI, C_2, TilakRoad, Pune, displayInfo] |ICICI.C_2.TilakRoad.Pune |ICICI.C_2.Pune |
|[bankInfo, Axis, C_3, Santacruz, Mumbai, displayInfo]|Axis.C_3.Santacruz.Mumbai|Axis.C_3.Mumbai|
|[bankInfo, HDFC, C_4, Deccan, Pune, displayInfo] |HDFC.C_4.Deccan.Pune |HDFC.C_4.Pune |
+-----------------------------------------------------+-------------------------+---------------+