I want to know how to integrate Hive into a Spark Scala program. Here is an example of what I have tried:
I am running a Hadoop cluster with Hive locally in Docker on Windows 11. I used the following docker-compose.yml file (from this repository), with my additional changes highlighted by comments:
version: "3"
services:
  namenode:
    image: bde2020/hadoop-namenode:2.0.0-hadoop2.7.4-java8
    volumes:
      - namenode:/hadoop/dfs/name
    environment:
      - CLUSTER_NAME=test
    env_file:
      - ./hadoop-hive.env
    ports:
      - "50070:50070"
      - "8020:8020" # Add this line to expose HDFS port
  datanode:
    image: bde2020/hadoop-datanode:2.0.0-hadoop2.7.4-java8
    volumes:
      - datanode:/hadoop/dfs/data
    env_file:
      - ./hadoop-hive.env
    environment:
      SERVICE_PRECONDITION: "namenode:50070"
    ports:
      - "50075:50075"
  hive-server:
    image: bde2020/hive:2.3.2-postgresql-metastore
    env_file:
      - ./hadoop-hive.env
    environment:
      HIVE_CORE_CONF_javax_jdo_option_ConnectionURL: "jdbc:postgresql://hive-metastore/metastore"
      SERVICE_PRECONDITION: "hive-metastore:9083"
    ports:
      - "10000:10000"
  hive-metastore:
    image: bde2020/hive:2.3.2-postgresql-metastore
    env_file:
      - ./hadoop-hive.env
    command: /opt/hive/bin/hive --service metastore
    environment:
      SERVICE_PRECONDITION: "namenode:50070 datanode:50075 hive-metastore-postgresql:5432"
    ports:
      - "9083:9083"
  hive-metastore-postgresql:
    image: bde2020/hive-metastore-postgresql:2.3.0
  presto-coordinator:
    image: shawnzhu/prestodb:0.181
    ports:
      - "8080:8080"
volumes:
  namenode:
  datanode:
These are the steps I followed to create and query a table in Hive:
Start the Docker containers:
docker-compose up -d
Check the containers:
docker ps -a
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
7ad6cea6fab8 bde2020/hadoop-datanode:2.0.0-hadoop2.7.4-java8 "/entrypoint.sh /run…" 14 minutes ago Up 14 minutes (healthy) 0.0.0.0:50075->50075/tcp hive-datanode-1
5af113c30d83 shawnzhu/prestodb:0.181 "./bin/launcher run" 14 minutes ago Up 14 minutes 0.0.0.0:8080->8080/tcp hive-presto-coordinator-1
4201d0ddb29b bde2020/hive:2.3.2-postgresql-metastore "entrypoint.sh /opt/…" 14 minutes ago Up 14 minutes 10000/tcp, 0.0.0.0:9083->9083/tcp, 10002/tcp hive-hive-metastore-1
645afa8a33bd bde2020/hive-metastore-postgresql:2.3.0 "/docker-entrypoint.…" 14 minutes ago Up 14 minutes 5432/tcp hive-hive-metastore-postgresql-1
b1b7514736b1 bde2020/hive:2.3.2-postgresql-metastore "entrypoint.sh /bin/…" 14 minutes ago Up 14 minutes 0.0.0.0:10000->10000/tcp, 10002/tcp hive-hive-server-1
6e0956988e0e bde2020/hadoop-namenode:2.0.0-hadoop2.7.4-java8 "/entrypoint.sh /run…" 14 minutes ago Up 14 minutes (healthy) 0.0.0.0:8020->8020/tcp, 0.0.0.0:50070->50070/tcp hive-namenode-1
Connect to the Hive server with Beeline:
docker-compose exec hive-server bash -c "/opt/hive/bin/beeline -u jdbc:hive2://localhost:10000"
In the Beeline shell, create a table and query it:
0: jdbc:hive2://localhost:10000> CREATE TABLE test_table (id INT, name STRING);
No rows affected (0.492 seconds)
0: jdbc:hive2://localhost:10000> INSERT INTO test_table VALUES (1, 'Alice'), (2, 'Bob');
WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
No rows affected (3.968 seconds)
0: jdbc:hive2://localhost:10000> SELECT * FROM test_table;
+----------------+------------------+
| test_table.id | test_table.name |
+----------------+------------------+
| 1 | Alice |
| 2 | Bob |
+----------------+------------------+
2 rows selected (0.176 seconds)
I can successfully find the table in HDFS:
root@b1b7514736b1:/opt# hdfs dfs -cat /user/hive/warehouse/test_table/000000_0
1Alice
2Bob
Problem:
I cannot query the same Hive table from my Spark application. Here is my build.sbt file:
name := "SparkHiveExample"
version := "1.0"
scalaVersion := "2.11.12"
// Spark, Hive, and Hadoop dependencies
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.3.0",
  "org.apache.spark" %% "spark-sql" % "2.3.0",
  "org.apache.spark" %% "spark-hive" % "2.3.0"
)

lazy val root = (project in file("."))
  .settings(
    name := "Test_hive"
  )
Here is my Scala code:
import org.apache.spark.sql.SparkSession

object HiveTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("HiveTest")
      .master("local[*]")
      .config("spark.hadoop.fs.defaultFS", "hdfs://localhost:8020")
      .config("hive.metastore.uris", "thrift://localhost:9083")
      .enableHiveSupport()
      .getOrCreate()

    // Try to read the Hive table
    spark.sql("show databases").show()
    spark.sql("show tables").show()
    spark.sql("SELECT * FROM test_table").show()

    spark.stop()
  }
}
Here is the output:
...
+------------+
|databaseName|
+------------+
| default|
+------------+
...
+--------+----------+-----------+
|database| tableName|isTemporary|
+--------+----------+-----------+
| default|test_table| false|
+--------+----------+-----------+
24/09/07 07:51:54 INFO ContextCleaner: Cleaned accumulator 1
24/09/07 07:51:54 INFO ContextCleaner: Cleaned accumulator 0
24/09/07 07:51:54 INFO CodeGenerator: Code generated in 16.7214 ms
24/09/07 07:51:55 INFO CodeGenerator: Code generated in 36.9971 ms
24/09/07 07:51:55 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 222.0 KB, free 1983.1 MB)
24/09/07 07:51:55 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 20.9 KB, free 1983.1 MB)
24/09/07 07:51:55 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on MY-PC:52273 (size: 20.9 KB, free: 1983.3 MB)
24/09/07 07:51:55 INFO SparkContext: Created broadcast 0 from
24/09/07 07:51:58 INFO FileInputFormat: Total input paths to process : 2
24/09/07 07:51:58 INFO SparkContext: Starting job: show at Main.scala:16
24/09/07 07:51:58 INFO DAGScheduler: Got job 0 (show at Main.scala:16) with 1 output partitions
24/09/07 07:51:58 INFO DAGScheduler: Final stage: ResultStage 0 (show at Main.scala:16)
24/09/07 07:51:58 INFO DAGScheduler: Parents of final stage: List()
24/09/07 07:51:58 INFO DAGScheduler: Missing parents: List()
24/09/07 07:51:58 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[6] at show at Main.scala:16), which has no missing parents
24/09/07 07:51:58 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 11.7 KB, free 1983.1 MB)
24/09/07 07:51:58 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 5.7 KB, free 1983.0 MB)
24/09/07 07:51:58 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on MY-PC:52273 (size: 5.7 KB, free: 1983.3 MB)
24/09/07 07:51:58 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1039
24/09/07 07:51:58 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[6] at show at Main.scala:16) (first 15 tasks are for partitions Vector(0))
24/09/07 07:51:58 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
24/09/07 07:51:58 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, ANY, 7907 bytes)
24/09/07 07:51:58 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
24/09/07 07:51:58 INFO HadoopRDD: Input split: hdfs://namenode:8020/user/hive/warehouse/test_table/000000_0:0+14
24/09/07 07:51:58 INFO CodeGenerator: Code generated in 10.0738 ms
24/09/07 07:52:19 WARN BlockReaderFactory: I/O error constructing remote block reader.
java.net.ConnectException: Connection timed out: no further information
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716)
at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:530)
at org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:3090)
at org.apache.hadoop.hdfs.BlockReaderFactory.nextTcpPeer(BlockReaderFactory.java:778)
at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:693)
at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:354)
at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:617)
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:841)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:889)
at java.io.DataInputStream.read(DataInputStream.java:149)
at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.fillBuffer(UncompressedSplitLineReader.java:62)
at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.readLine(UncompressedSplitLineReader.java:94)
at org.apache.hadoop.mapred.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:208)
at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:246)
at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:48)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:277)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:214)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
24/09/07 07:52:19 WARN DFSClient: Failed to connect to /123.45.6.7:50010 for block, add to deadNodes and continue. java.net.ConnectException: Connection timed out: no further information
java.net.ConnectException: Connection timed out: no further information
...
Answer:
Note this line in your log:
Input split: hdfs://namenode:8020
If you are not running Spark on the same Docker network (that is, inside a container yourself), that connection will fail: the namenode hands back datanode addresses that only resolve inside that network, which is why your log shows the timed-out connection to the datanode on port 50010. If you do run Spark inside a container on that network, you need to change your configuration to use the other containers' hostnames instead of localhost.
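For example, if the Spark driver runs in a container attached to the same docker-compose network, the SparkSession configuration could look like the sketch below; the service names namenode and hive-metastore come straight from your docker-compose.yml, and the rest mirrors your original code:

import org.apache.spark.sql.SparkSession

object HiveTest {
  def main(args: Array[String]): Unit = {
    // Sketch: assumes this runs inside a container on the same Docker network,
    // so the compose service names resolve to the right containers.
    val spark = SparkSession.builder()
      .appName("HiveTest")
      .master("local[*]")
      .config("spark.hadoop.fs.defaultFS", "hdfs://namenode:8020")
      .config("hive.metastore.uris", "thrift://hive-metastore:9083")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("SELECT * FROM test_table").show()
    spark.stop()
  }
}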
Your Hive setup itself is fine; it is the HDFS read that is failing to return the file contents for your query.
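If you would rather keep running Spark on the Windows host, one common workaround (my addition, not part of your current setup) is to publish the datanode's data-transfer port in docker-compose, map the service hostnames to 127.0.0.1 in the Windows hosts file, and tell the HDFS client to reach datanodes by hostname rather than by the container-internal IP the namenode reports:

// Sketch only: assumes "namenode", "datanode", and "hive-metastore" are mapped
// to 127.0.0.1 in C:\Windows\System32\drivers\etc\hosts and that the datanode
// port 50010 is also published in docker-compose.yml.
val spark = SparkSession.builder()
  .appName("HiveTest")
  .master("local[*]")
  .config("spark.hadoop.fs.defaultFS", "hdfs://namenode:8020")
  .config("hive.metastore.uris", "thrift://hive-metastore:9083")
  // Connect to datanodes by the hostname they advertise instead of their
  // container-internal IP address.
  .config("spark.hadoop.dfs.client.use.datanode.hostname", "true")
  .enableHiveSupport()
  .getOrCreate()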
As an aside, I would suggest upgrading to the latest Hadoop, Hive, and Spark 3. You also do not need HDFS or Postgres at all to get Hive and Spark working together.
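For instance, Spark's built-in Hive support works with a local warehouse directory and the default embedded Derby metastore, so no HDFS, Postgres, or standalone Hive server is needed. A minimal sketch (the warehouse path is only an example):

import org.apache.spark.sql.SparkSession

object LocalHiveTest {
  def main(args: Array[String]): Unit = {
    // Minimal sketch: enableHiveSupport() with a local warehouse directory;
    // the metastore defaults to an embedded Derby database created in the
    // working directory, so no external services are required.
    val spark = SparkSession.builder()
      .appName("LocalHiveTest")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "file:///tmp/spark-warehouse")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("CREATE TABLE IF NOT EXISTS test_table (id INT, name STRING)")
    spark.sql("INSERT INTO test_table VALUES (1, 'Alice'), (2, 'Bob')")
    spark.sql("SELECT * FROM test_table").show()
    spark.stop()
  }
}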