我正在尝试读取 kafka 流并将其作为表保存到 Hive。
消费者代码是:
import org.apache.spark.sql.{DataFrame, Dataset, SaveMode, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.streaming.Trigger
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.streaming.Trigger.ProcessingTime
object testH {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("MyApp")
.master("local")
.enableHiveSupport()
.getOrCreate()
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "my-group-id",
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("my-topic")
val schema = new StructType()
.add("id", IntegerType)
.add("name", StringType)
.add("age", IntegerType)
// Read from Kafka topic
val stream: DataFrame = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "my-topic")
.option("startingOffsets", "earliest")
.load()
.selectExpr("CAST(value AS STRING)")
// Parse JSON data and write to Hive table
val query = stream
.select(from_json(col("value"), schema).as("data"))
.selectExpr("data.id", "data.name", "data.age")
.writeStream.
foreachBatch {(batchDF: DataFrame, batchId: Long) =>
batchDF
.write
.format("hive")
.mode(SaveMode.Append)
.saveAsTable("test");
}.start()
query.awaitTermination()
}
}
我的build.sbt:
ThisBuild / version := "0.1.0-SNAPSHOT"
name := "kafka-spark-hive"
version := "1.0"
scalaVersion := "2.12.15"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-hive" % "3.3.2",
"org.apache.spark" %% "spark-streaming-kafka-0-10" % "3.3.2",
"org.apache.spark" %% "spark-sql-kafka-0-10" % "3.3.2",
"org.apache.spark" %% "spark-sql" % "3.3.2",
"org.apache.spark" %% "spark-streaming" % "3.3.2",
"org.apache.kafka" % "kafka-clients" % "3.4.0",
"com.typesafe" % "config" % "1.4.2",
"org.apache.hive" % "hive-exec" % "3.1.3" ,
"org.apache.hive" % "hive-metastore" % "3.1.3" ,
"org.apache.hive" % "hive-common" % "3.1.3" ,
"org.apache.hadoop" % "hadoop-common" % "3.3.2" ,
"org.apache.hadoop" % "hadoop-hdfs" % "3.3.2",
"org.apache.hadoop" % "hadoop-auth" % "3.3.2"
)
我得到那个错误:java.lang.NoSuchMethodError: org.apache.hadoop.hive.common.FileUtils.mkdir(Lorg/apache/hadoop/fs/FileSystem;Lorg/apache/hadoop/fs/Path;ZLorg/apache/ hadoop/conf/配置;)Z
我尝试降级和升级一些依赖项,但我不知道是哪个导致了问题。
看看你应该如何调查这个问题
NoSuchMethodError
https://reflectoring.io/nosuchmethod/
https://www.baeldung.com/java-nosuchmethod-error
https://www.javatpoint.com/java-lang-nosuchmethoderror
https://www.geeksforgeeks.org/how-to-solve-java-lang-nosuchmethoderror-in-java/
方法
org.apache.hadoop.hive.common.FileUtils.mkdir(Lorg/apache/hadoop/fs/FileSystem;Lorg/apache/hadoop/fs/Path;ZLorg/apache/hadoop/conf/Configuration;)Z
来自
hive-common
.
这是
sbt dependencyTree
的输出
$ sbt dependencyTree > output.txt
https://github.com/DmytroMitin/SO-Q75880410-kafka-spark-hive-demo
https://raw.githubusercontent.com/DmytroMitin/SO-Q75880410-kafka-spark-hive-demo/main/output.txt (29 MB)
[info] kafka-spark-hive:kafka-spark-hive_2.12:1.0 [S]
[info] +-...
[info] +-org.apache.spark:spark-hive_2.12:3.3.2
[info] | +-...
[info] | |
[info] | +-org.apache.hive:hive-common:2.3.9 (evicted by: 3.1.3)
[info] | +-org.apache.hive:hive-common:3.1.3
在
hive-common
3.1.3中mkdir
的签名是
public static boolean mkdir(org.apache.hadoop.fs.FileSystem fs,
org.apache.hadoop.fs.Path f,
org.apache.hadoop.conf.Configuration conf)
throws IOException
但是在
hive-common
2.3.9 中签名是
public static boolean mkdir(org.apache.hadoop.fs.FileSystem fs,
org.apache.hadoop.fs.Path f,
boolean inheritPerms,
org.apache.hadoop.conf.Configuration conf)
throws IOException
即有一个额外的参数。
所以你好像在
org.apache.hive:hive-common:3.1.3
和org.apache.spark:spark-hive_2.12:3.3.2
之间有冲突。 spark-hive
3.3.2 预计 hive-common
2.3.9,而不是 3.1.3
https://repo1.maven.org/maven2/org/apache/spark/spark-hive_2.12/3.3.2/spark-hive_2.12-3.3.2.pom
mkDir
的签名在hive-common
3.0.0中更改
所以既然你使用的是 Spark 3.3.2,请尝试降级
"org.apache.hive" % "hive-exec" % "3.1.3" ,
"org.apache.hive" % "hive-metastore" % "3.1.3" ,
"org.apache.hive" % "hive-common" % "3.1.3"
到
"org.apache.hive" % "hive-exec" % "2.3.9" ,
"org.apache.hive" % "hive-metastore" % "2.3.9" ,
"org.apache.hive" % "hive-common" % "2.3.9"
并检查是否不会有其他冲突。