I am trying to save a DataFrame to Phoenix through the DataSource V2 API, using the source code below.
I created a DataFrame and want to write it to Phoenix like this:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SaveMode
import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource

val conf = new SparkConf().setAppName("Spark sql to convert rdd to df")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

// MasterRecordSeq is a Seq of case class instances defined elsewhere.
val MasterDF = MasterRecordSeq.toDF()
MasterDF.write
  .format("phoenix")
  .mode(SaveMode.Overwrite)
  .options(Map("table" -> masterTableName, PhoenixDataSource.ZOOKEEPER_URL -> "phoenix-server:2181"))
  .save()
But the import org.apache.phoenix.spark.datasource.v2.PhoenixDataSource
is not recognized. It fails with the following error:
object datasource is not a member of package org.apache.phoenix.spark
I have searched the internet extensively but cannot figure out what is wrong.
These are the dependencies I added in build.sbt:
libraryDependencies += "org.apache.phoenix" % "phoenix-spark" % "5.0.0-HBase-2.0"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.5"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.5"
libraryDependencies += "org.apache.phoenix" % "phoenix-core" % "5.0.0-HBase-2.0"
And here is the complete build file:
import NativePackagerHelper._
import java.util.Properties
import com.typesafe.sbt.packager.MappingsHelper._
//import sbtrelease.ReleaseStateTransformations._
name := """gavel"""
//scapegoatVersion in ThisBuild := "1.1.0"
//version := sys.env.get("BUILD_NUMBER").getOrElse("3.0-LOCAL")
version := "3.0"
scalaVersion := "2.11.12"
//crossScalaVersions := Seq("2.11.11", "2.12.3")
//scapegoatVersion in ThisBuild := "1.3.5"
scalaBinaryVersion in ThisBuild := "2.12"
javacOptions ++= Seq("-source", "1.6", "-target", "1.6")
scalacOptions ++= Seq("-unchecked", "-deprecation", "-feature")
scalacOptions in (Compile, doc) ++= Seq("-unchecked", "-deprecation", "-diagrams", "-implicits", "-skip-packages", "samples")
lazy val root = (project in file(".")).enablePlugins(PlayScala,sbtdocker.DockerPlugin,JavaAppPackaging).settings(
watchSources ++= (baseDirectory.value / "public/frontend" ** "*").get
)
mainClass := Some("play.core.server.ProdServerStart")
fullClasspath in assembly += Attributed.blank(PlayKeys.playPackageAssets.value)
mappings in Universal ++= directory(baseDirectory.value / "public")
unmanagedBase := baseDirectory.value / "libs"
routesGenerator := InjectedRoutesGenerator
resolvers += "scalaz-bintray" at "https://dl.bintray.com/scalaz/releases"
libraryDependencies ++= Seq(
"com.typesafe" % "config" % "1.3.1",
"mysql" % "mysql-connector-java" % "5.1.34",
"com.typesafe.play" %% "play-slick" % "3.0.0",
"com.typesafe.play" %% "play-slick-evolutions" % "3.0.0",
"com.typesafe.play" %% "play-json" % "2.6.0",
"org.scalatestplus.play" %% "scalatestplus-play" % "3.0.0" % "test",
specs2 % Test,
// "io.rest-assured" % "rest-assured" % "3.0.0" % "test",
// "io.rest-assured" % "scala-support" % "3.0.0" % "test",
// "com.squareup.okhttp" % "mockwebserver" % "2.5.0" % "test",
"javax.mail" % "mail" % "1.4",
"io.swagger" %% "swagger-play2" % "1.6.1",
"com.fasterxml.jackson.core" % "jackson-databind" % "2.4.0",
"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.4.0",
"com.google.code.gson" % "gson" % "1.7.1",
"commons-io" % "commons-io" % "2.4",
"com.typesafe.akka" %% "akka-actor" % "2.4.16",
"com.typesafe.akka" %% "akka-testkit" % "2.4.16" % "test",
"org.typelevel" %% "macro-compat" % "1.1.1",
"org.scala-lang" % "scala-reflect" % scalaVersion.value % "provided",
"org.scalatest" %% "scalatest" % "3.0.0" % "test",
compilerPlugin("org.scalamacros" %% "paradise" % "2.1.0" cross CrossVersion.full),
guice
)
libraryDependencies ++= Seq(
"com.101tec" % "zkclient" % "0.4",
"org.apache.kafka" % "kafka_2.10" % "0.8.1.1"
exclude("javax.jms", "jms")
exclude("com.sun.jdmk", "jmxtools")
exclude("com.sun.jmx", "jmxri")
)
libraryDependencies += ws
libraryDependencies += ehcache
// https://mvnrepository.com/artifact/org.apache.phoenix/phoenix-spark
libraryDependencies += "org.apache.phoenix" % "phoenix-spark" % "5.0.0-HBase-2.0"
libraryDependencies += "com.google.protobuf" % "protobuf-java" % "2.4.0"
libraryDependencies += "org.codehaus.jackson" % "jackson-mapper-asl" % "1.9.13"
libraryDependencies += "com.google.code.gson" % "gson" % "2.3"
libraryDependencies += "org.apache.phoenix" % "phoenix-queryserver-client" % "4.13.1-HBase-1.2"
libraryDependencies += "com.github.takezoe" %% "solr-scala-client" % "0.0.19"
libraryDependencies += "com.squareup.okhttp" % "okhttp" % "2.7.0"
libraryDependencies += "org.threeten" % "threetenbp" % "1.2"
libraryDependencies += "io.gsonfire" % "gson-fire" % "1.0.1"
libraryDependencies += "au.com.bytecode" % "opencsv" % "2.4"
libraryDependencies += "org.simplejavamail" % "simple-java-mail" % "5.0.8"
libraryDependencies += "org.apache.solr" % "solr-solrj" % "6.6.2"
libraryDependencies += "com.jcraft" % "jsch" % "0.1.55"
libraryDependencies += "com.vmware" % "vijava" % "5.1"
libraryDependencies += "com.microsoft.sqlserver" % "mssql-jdbc" % "6.1.0.jre8" % Test
//libraryDependencies += "com.microsoft.sqlserver" % "sqljdbc4" % "4.0"
libraryDependencies += "org.apache.poi" % "poi" % "3.17"
libraryDependencies += "org.apache.poi" % "poi-ooxml" % "3.17"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.5"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.5"
libraryDependencies += "org.apache.phoenix" % "phoenix-core" % "5.0.0-HBase-2.0"
crossSbtVersions := Seq("0.13.17", "1.1.6")
publishTo := {
val isSnapshotValue = isSnapshot.value
val nexus = "https://oss.sonatype.org/"
if(isSnapshotValue) Some("snapshots" at nexus + "content/repositories/snapshots")
else Some("releases" at nexus + "service/local/staging/deploy/maven2")
}
publishMavenStyle := true
publishArtifact in Test := false
parallelExecution in Test := false
dockerfile in docker := {
// The assembly task generates a fat JAR file
val artifact: File = assembly.value
val artifactTargetPath = s"/app/${artifact.name}"
new Dockerfile {
from("java")
from("mysql:5.7")
add(artifact, artifactTargetPath)
entryPoint("java", "-jar", artifactTargetPath)
}
}
val appProperties = settingKey[Properties]("The application properties")
appProperties := {
val prop = new Properties()
IO.load(prop, new File("./conf/database.conf"))
prop
}
javaOptions in Test += "-Dconfig.file=conf/application.test.conf"
resolvers += "Sonatype snapshots" at "http://oss.sonatype.org/content/repositories/snapshots/"
sourceDirectories in (Compile, TwirlKeys.compileTemplates) :=
(unmanagedSourceDirectories in Compile).value
flywayDriver := "com.mysql.jdbc.Driver"
flywayUrl := appProperties.value.getProperty("slick.dbs.default.db.url").replaceAll("\"", "")
flywayUser := appProperties.value.getProperty("slick.dbs.default.db.user")
flywayPassword := appProperties.value.getProperty("slick.dbs.default.db.password").replaceAll("\"", "")
flywayLocations := Seq("filesystem:conf/db/default")
fork in run := true
//coverageEnabled := false
//coverageMinimum := 70
//coverageFailOnMinimum := true
//coverageHighlighting := true
publishArtifact in Test := false
parallelExecution in Test := false
enablePlugins(SbtProguard)
import com.lightbend.sbt.SbtProguard._
javaOptions in (Proguard, proguard) := Seq("-Xmx2G")
proguardOptions in Proguard ++= Seq("-dontnote", "-dontwarn", "-ignorewarnings")
proguardOptions in Proguard += ProguardOptions.keepMain("some.MainClass")
proguardMergeStrategies in Proguard += ProguardMerge.append("*.conf")
proguardMergeStrategies in Proguard ++= Seq(
ProguardMerge.discard("\\.zip$".r),
ProguardMerge.discard("\\.xml$".r),
ProguardMerge.discard("\\.txt$".r),
ProguardMerge.discard("\\.conf$".r),
ProguardMerge.discard("\\.jar$".r)
)
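Unrelated to the Phoenix import, one detail worth double-checking in the build above: scalaVersion is 2.11.12 while scalaBinaryVersion in ThisBuild is pinned to "2.12". Since the %% operator appends the binary version suffix, this combination can make sbt resolve _2.12 artifacts against a 2.11 compiler. A minimal sketch of the consistent form (an assumption about the intended Scala version, not a required change):

// build.sbt sketch: keep the compiler and the %% artifact suffix in sync.
// scalaBinaryVersion is derived from scalaVersion by default, so the explicit
// `scalaBinaryVersion in ThisBuild := "2.12"` override above can simply be removed;
// "org.apache.spark" %% "spark-core" then resolves to spark-core_2.11.
scalaVersion := "2.11.12"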
My Phoenix version is 5.0 and my HBase version is 2.0.2.3.1.0.0-78. Am I missing any configuration?
I managed to solve it by building the phoenix-spark connector from the phoenix-connectors repository on GitHub and copying the resulting jar into the Spark jars directory.
These are the commands I ran to build the jar; I hope they help.
sudo yum install maven
wget https://github.com/apache/phoenix-connectors/archive/master.zip
unzip master.zip
# The GitHub archive unpacks into phoenix-connectors-master/
cd phoenix-connectors-master/phoenix-spark
mvn clean package
# Copy the built connector jar into the Spark jars directory (HDP layout)
cd target/scala-2.12/
cp phoenix-spark-1.0.0-SNAPSHOT.jar /usr/hdp/current/spark2-client/jars
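Copying the jar into the Spark jars directory covers the cluster runtime; for the import to also compile in the sbt project above, the built jar has to be on the compile classpath as well. A sketch, assuming the jar keeps the phoenix-spark-1.0.0-SNAPSHOT.jar name from the last command; alternatively, simply copying the jar into the project's libs/ folder achieves the same thing, since unmanagedBase already points there:

// build.sbt sketch: put the locally built connector jar on the compile classpath.
// The path and jar name are assumptions based on the copy command above.
unmanagedJars in Compile += Attributed.blank(
  file("/usr/hdp/current/spark2-client/jars/phoenix-spark-1.0.0-SNAPSHOT.jar")
)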