使用spark数据帧/数据集/ RDD使用内部联接进行更新

问题描述 投票:0回答:1

我正在将ms sql server查询逻辑转换为spark。要转换的查询如下:

Update enc set PrUid=m.OriginalPrUid
FROM CachePatDemo enc 
inner join #MergePreMap m on enc.PrUid=m.NewPrUid
WHERE StatusId is null

我正在使用数据框进行转换,我在两个数据帧中有两个表,我作为内部联接加入。我需要找到一种方法来获取表1的所有列和更新的列(这两列都是通用的)。

我试过用这个:

val result = CachePatDemo.as("df123").
  join(MergePreMap.as("df321"), CachePatDemo("prUid") === MergePreMap("prUid"),"inner").where("StatusId is null")
  select($"df123.pId", 
         $"df321.provFname".as("firstName"), 
         $"df123.lastName", 
         $"df123.prUid")

它似乎没有解决我的问题。有人可以帮忙吗?

sql sql-server apache-spark apache-spark-sql
1个回答
0
投票

在Spark 2.1这有效

case class TestModel(x1: Int, x2: String, x3: Int)

object JoinDataFrames extends App {
  import org.apache.spark.sql.{DataFrame, SparkSession}
  val spark = SparkSession.builder.appName("GroupOperations").master("local[2]").enableHiveSupport.getOrCreate

  import spark.implicits._
  import org.apache.spark.sql.functions._

  val list1 = (3 to 10).toList.map(i => new TestModel(i, "This is df1 " + i, i * 3))
  val list2 = (0 to 5).toList.map(i => new TestModel(i, "This is df2 " + i, i * 13))
  val df1: DataFrame = spark.sqlContext.createDataFrame[TestModel](list1)
  val df2: DataFrame = spark.sqlContext.createDataFrame[TestModel](list2)
  val res = df1.join(df2, Seq("x1"), "inner")
  println("from DF1")
  res.select(df1("x2")).show()
  println("from DF2")
  res.select(df2("x2")).show()
}
© www.soinside.com 2019 - 2024. All rights reserved.