根据列表行键创建Spark DataFrame

问题描述 投票:0回答:1

我有一个形式为Array[Row]的HBase行键列表,并希望使用这些RowKey从HBase提取的行中创建一个Spark DataFrame

正在考虑类似的内容:

def getDataFrameFromList(spark: SparkSession, rList : Array[Row]): DataFrame = {

  val conf = HBaseConfiguration.create()
  val mlRows : List[RDD[String]] = new ArrayList[RDD[String]]

  conf.set("hbase.zookeeper.quorum", "dev.server")
  conf.set("hbase.zookeeper.property.clientPort", "2181")
  conf.set("zookeeper.znode.parent","/hbase-unsecure")
  conf.set(TableInputFormat.INPUT_TABLE, "hbase_tbl1")

  rList.foreach( r => {
    var rStr = r.toString()
    conf.set(TableInputFormat.SCAN_ROW_START, rStr)
    conf.set(TableInputFormat.SCAN_ROW_STOP, rStr + "_")
    // read one row
    val recsRdd = readHBaseRdd(spark, conf)
    mlRows.append(recsRdd)
  })

  // This works, but it is only one row
  //val resourcesDf = spark.read.json(recsRdd) 

  var resourcesDf = <Code here to convert List[RDD[String]] to DataFrame>
  //resourcesDf
  spark.emptyDataFrame
}

我可以在for循环中执行recsRdd.collect()并将其转换为字符串,然后将该json附加到ArrayList[String,但不确定其效率如何,在这样的for循环中调用collect()

[readHBaseRdd正在使用newAPIHadoopRDD从HBase获取数据

def readHBaseRdd(spark: SparkSession, conf: Configuration) = {
    val hBaseRDD = spark.sparkContext.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])

    hBaseRDD.map {
      case (_: ImmutableBytesWritable, value: Result) =>
          Bytes.toString(value.getValue(Bytes.toBytes("cf"),
                                            Bytes.toBytes("jsonCol")))
        }
    }
  }
scala apache-spark apache-spark-sql hbase rdd
1个回答
0
投票

使用spark.union([mainRdd, recsRdd])代替列表或RDD(mlRows)

而且为什么只从HBase读取一行?尝试具有最大的间隔。

始终避免调用collect(),仅在调试/测试时使用。

© www.soinside.com 2019 - 2024. All rights reserved.