Below is a simple program that writes a Spark DataFrame to HBase using the Spark-HBase connector. The `df.write` call throws a NullPointerException and nothing gets written to HBase. Reading from HBase through the same connector works fine, however. The problem has already been discussed in the links below, but the suggested solutions did not help:
https://github.com/hortonworks-spark/shc/issues/278
https://github.com/hortonworks-spark/shc/issues/46
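For context, the build pulls in the shc-core connector, roughly via this sbt dependency (the version string and resolver URL below are just examples, not necessarily what your environment needs):

```scala
// build.sbt (sketch): Hortonworks shc-core connector.
// Version and resolver are examples; adjust them to your Spark/HBase setup.
resolvers += "Hortonworks Repository" at "http://repo.hortonworks.com/content/groups/public/"
libraryDependencies += "com.hortonworks" % "shc-core" % "1.1.1-2.1-s_2.11"
```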
```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog

object SparkHbaseHW {

  case class Employee(key: String, fName: String, lName: String,
                      mName: String, addressLine: String, city: String,
                      state: String, zipCode: String)

  def main(args: Array[String]): Unit = {

    // Catalog mapping the DataFrame columns to the HBase table `employee`:
    // a string row key plus two column families, `person` and `address`.
    def catalog =
      s"""{
         |"table":{"namespace":"default", "name":"employee"},
         |"rowkey":"key",
         |"columns":{
         |"key":{"cf":"rowkey", "col":"key", "type":"string"},
         |"fName":{"cf":"person", "col":"firstName", "type":"string"},
         |"lName":{"cf":"person", "col":"lastName", "type":"string"},
         |"mName":{"cf":"person", "col":"middleName", "type":"string"},
         |"addressLine":{"cf":"address", "col":"addressLine", "type":"string"},
         |"city":{"cf":"address", "col":"city", "type":"string"},
         |"state":{"cf":"address", "col":"state", "type":"string"},
         |"zipCode":{"cf":"address", "col":"zipCode", "type":"string"}
         |}
         |}""".stripMargin

    val data = Seq(
      Employee("1", "Abby", "Smith", "K", "3456 main", "Orlando", "FL", "45235"),
      Employee("2", "Amaya", "Williams", "L", "123 Orange", "Newark", "NJ", "27656"),
      Employee("3", "Alchemy", "Davis", "P", "Warners", "Sanjose", "CA", "34789"))

    val spark: SparkSession = SparkSession.builder()
      .master("local[1]")
      .appName("SparkHbaseWrite")
      .getOrCreate()

    import spark.implicits._
    val df = spark.sparkContext.parallelize(data).toDF

    // HBaseTableCatalog.newTable -> "4" asks the connector to create the
    // table with 4 regions if it does not exist. This save() is the call
    // that throws the NullPointerException.
    df.write
      .options(Map(HBaseTableCatalog.tableCatalog -> catalog,
                   HBaseTableCatalog.newTable -> "4"))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .save()
  }
}
```
The exception:

```
Exception in thread "main" java.lang.NullPointerException
at org.apache.hadoop.hbase.security.UserProvider.instantiate(UserProvider.java:124)
at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:202)
at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:114)
at org.apache.hadoop.hbase.mapreduce.TableOutputFormat.checkOutputSpecs(TableOutputFormat.java:179)
at org.apache.spark.internal.io.HadoopMapReduceWriteConfigUtil.assertConf(SparkHadoopWriter.scala:391)
at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:71)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1083)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1081)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1081)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1081)
at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.insert(HBaseRelation.scala:230)
at org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:61)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:656)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
at com.test.HBaseSparkHW$.main(HbaseSparkHW.scala:55)
at com.test.HBaseSparkHW.main(HbaseSparkHW.scala)
20/05/18 16:59:50 INFO SparkContext: Invoking stop() from shutdown hook
```
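For contrast, reading from HBase through the connector works with no extra configuration. A minimal sketch of the read path (assuming it runs inside the same `main`, so `spark` and `catalog` are in scope):

```scala
// Read the same table back through the connector; this path does not throw.
val readDf = spark.read
  .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .load()

readDf.show(false)
```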
As discussed here, I made an additional configuration change to the SparkSession builder and the exception went away. But I don't understand the root cause, or why this change fixes it. Hoping someone can explain:
```scala
val spark: SparkSession = SparkSession.builder()
  .master("local[1]")
  .appName("HbaseSparkWrite")
  // Skip Spark's pre-write output-spec validation; the NPE above is thrown
  // from inside that check (TableOutputFormat.checkOutputSpecs).
  .config("spark.hadoop.validateOutputSpecs", false)
  .getOrCreate()
```
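My reading of the stack trace so far: `spark.hadoop.validateOutputSpecs` is Spark's documented switch for output-spec validation. When it is true (the default), the write path calls the output format's `checkOutputSpecs` before the job starts, and that is exactly the frame where the NPE originates (`TableOutputFormat.checkOutputSpecs` → `ConnectionFactory.createConnection` → `UserProvider.instantiate`). Setting it to false makes Spark skip that pre-flight check, so the write goes through. A small sketch to confirm the flag actually lands in the Spark conf (assuming the session built above):

```scala
// The flag defaults to true; with the builder config above it reads back false.
val validate = spark.sparkContext.getConf
  .getBoolean("spark.hadoop.validateOutputSpecs", true)
println(s"spark.hadoop.validateOutputSpecs = $validate")
```

What I still don't understand is why `TableOutputFormat` ends up creating its HBase connection from a null configuration in the first place.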