Hive Spark integration problem

Problem description

I'd like to know how to integrate Hive into a Spark Scala program. Here is an example of what I've tried:

I'm running a Hadoop cluster with Hive locally using Docker on Windows 11. I used the following docker-compose.yml file (from this repository), with the additional changes highlighted in comments:

version: "3"

services:
  namenode:
    image: bde2020/hadoop-namenode:2.0.0-hadoop2.7.4-java8
    volumes:
      - namenode:/hadoop/dfs/name
    environment:
      - CLUSTER_NAME=test
    env_file:
      - ./hadoop-hive.env
    ports:
      - "50070:50070"
      - "8020:8020"  # Add this line to expose HDFS port
  datanode:
    image: bde2020/hadoop-datanode:2.0.0-hadoop2.7.4-java8
    volumes:
      - datanode:/hadoop/dfs/data
    env_file:
      - ./hadoop-hive.env
    environment:
      SERVICE_PRECONDITION: "namenode:50070"
    ports:
      - "50075:50075"
  hive-server:
    image: bde2020/hive:2.3.2-postgresql-metastore
    env_file:
      - ./hadoop-hive.env
    environment:
      HIVE_CORE_CONF_javax_jdo_option_ConnectionURL: "jdbc:postgresql://hive-metastore/metastore"
      SERVICE_PRECONDITION: "hive-metastore:9083"
    ports:
      - "10000:10000"
  hive-metastore:
    image: bde2020/hive:2.3.2-postgresql-metastore
    env_file:
      - ./hadoop-hive.env
    command: /opt/hive/bin/hive --service metastore
    environment:
      SERVICE_PRECONDITION: "namenode:50070 datanode:50075 hive-metastore-postgresql:5432"
    ports:
      - "9083:9083"
  hive-metastore-postgresql:
    image: bde2020/hive-metastore-postgresql:2.3.0
  presto-coordinator:
    image: shawnzhu/prestodb:0.181
    ports:
      - "8080:8080"

volumes:
  namenode:
  datanode:

The steps I followed to create and query a table in Hive:

Start the Docker containers:

docker-compose up -d

Check the containers:

docker ps -a

CONTAINER ID   IMAGE                                             COMMAND                  CREATED          STATUS                    PORTS                                              NAMES
7ad6cea6fab8   bde2020/hadoop-datanode:2.0.0-hadoop2.7.4-java8   "/entrypoint.sh /run…"   14 minutes ago   Up 14 minutes (healthy)   0.0.0.0:50075->50075/tcp                           hive-datanode-1
5af113c30d83   shawnzhu/prestodb:0.181                           "./bin/launcher run"     14 minutes ago   Up 14 minutes             0.0.0.0:8080->8080/tcp                             hive-presto-coordinator-1
4201d0ddb29b   bde2020/hive:2.3.2-postgresql-metastore           "entrypoint.sh /opt/…"   14 minutes ago   Up 14 minutes             10000/tcp, 0.0.0.0:9083->9083/tcp, 10002/tcp       hive-hive-metastore-1
645afa8a33bd   bde2020/hive-metastore-postgresql:2.3.0           "/docker-entrypoint.…"   14 minutes ago   Up 14 minutes             5432/tcp                                           hive-hive-metastore-postgresql-1
b1b7514736b1   bde2020/hive:2.3.2-postgresql-metastore           "entrypoint.sh /bin/…"   14 minutes ago   Up 14 minutes             0.0.0.0:10000->10000/tcp, 10002/tcp                hive-hive-server-1
6e0956988e0e   bde2020/hadoop-namenode:2.0.0-hadoop2.7.4-java8   "/entrypoint.sh /run…"   14 minutes ago   Up 14 minutes (healthy)   0.0.0.0:8020->8020/tcp, 0.0.0.0:50070->50070/tcp   hive-namenode-1

I can see that the namenode connection is working fine.

Connect to the Hive server with Beeline:

docker-compose exec hive-server bash -c "/opt/hive/bin/beeline -u jdbc:hive2://localhost:10000"

In the Beeline shell, create a table and query it:

0: jdbc:hive2://localhost:10000> CREATE TABLE test_table (id INT, name STRING);
No rows affected (0.492 seconds)
0: jdbc:hive2://localhost:10000> INSERT INTO test_table VALUES (1, 'Alice'), (2, 'Bob');
WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
No rows affected (3.968 seconds)
0: jdbc:hive2://localhost:10000> SELECT * FROM test_table;
+----------------+------------------+
| test_table.id  | test_table.name  |
+----------------+------------------+
| 1              | Alice            |
| 2              | Bob              |
+----------------+------------------+
2 rows selected (0.176 seconds)

I can successfully find the table's data in HDFS:

root@b1b7514736b1:/opt# hdfs dfs -cat /user/hive/warehouse/test_table/000000_0
1Alice
2Bob

Problem:

I cannot query the same Hive table from my Spark application. Here is my build.sbt file:

name := "SparkHiveExample"

version := "1.0"

scalaVersion := "2.11.12"

// Spark, Hive, and Hadoop dependencies
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.3.0",
  "org.apache.spark" %% "spark-sql" % "2.3.0",
  "org.apache.spark" %% "spark-hive" % "2.3.0"
)


lazy val root = (project in file("."))
  .settings(
    name := "Test_hive"
  )

Here is my Scala code:

import org.apache.spark.sql.SparkSession

object HiveTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("HiveTest")
      .master("local[*]")
      .config("spark.hadoop.fs.defaultFS", "hdfs://localhost:8020")
      .config("hive.metastore.uris", "thrift://localhost:9083")
      .enableHiveSupport()
      .getOrCreate()

    // Try to read the Hive table
    spark.sql("show databases").show()
    spark.sql("show tables").show()
    spark.sql("SELECT * FROM test_table").show()

    spark.stop()
  }
}

This is the output:

...
    +------------+
    |databaseName|
    +------------+
    |     default|
    +------------+
 ...   
    +--------+----------+-----------+
    |database| tableName|isTemporary|
    +--------+----------+-----------+
    | default|test_table|      false|
    +--------+----------+-----------+
    24/09/07 07:51:54 INFO ContextCleaner: Cleaned accumulator 1
    24/09/07 07:51:54 INFO ContextCleaner: Cleaned accumulator 0
    24/09/07 07:51:54 INFO CodeGenerator: Code generated in 16.7214 ms
    24/09/07 07:51:55 INFO CodeGenerator: Code generated in 36.9971 ms
    24/09/07 07:51:55 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 222.0 KB, free 1983.1 MB)
    24/09/07 07:51:55 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 20.9 KB, free 1983.1 MB)
    24/09/07 07:51:55 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on MY-PC:52273 (size: 20.9 KB, free: 1983.3 MB)
    24/09/07 07:51:55 INFO SparkContext: Created broadcast 0 from 
    24/09/07 07:51:58 INFO FileInputFormat: Total input paths to process : 2
    24/09/07 07:51:58 INFO SparkContext: Starting job: show at Main.scala:16
    24/09/07 07:51:58 INFO DAGScheduler: Got job 0 (show at Main.scala:16) with 1 output partitions
    24/09/07 07:51:58 INFO DAGScheduler: Final stage: ResultStage 0 (show at Main.scala:16)
    24/09/07 07:51:58 INFO DAGScheduler: Parents of final stage: List()
    24/09/07 07:51:58 INFO DAGScheduler: Missing parents: List()
    24/09/07 07:51:58 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[6] at show at Main.scala:16), which has no missing parents
    24/09/07 07:51:58 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 11.7 KB, free 1983.1 MB)
    24/09/07 07:51:58 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 5.7 KB, free 1983.0 MB)
    24/09/07 07:51:58 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on MY-PC:52273 (size: 5.7 KB, free: 1983.3 MB)
    24/09/07 07:51:58 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1039
    24/09/07 07:51:58 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[6] at show at Main.scala:16) (first 15 tasks are for partitions Vector(0))
    24/09/07 07:51:58 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
    24/09/07 07:51:58 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, ANY, 7907 bytes)
    24/09/07 07:51:58 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
    24/09/07 07:51:58 INFO HadoopRDD: Input split: hdfs://namenode:8020/user/hive/warehouse/test_table/000000_0:0+14
    24/09/07 07:51:58 INFO CodeGenerator: Code generated in 10.0738 ms
    24/09/07 07:52:19 WARN BlockReaderFactory: I/O error constructing remote block reader.
    java.net.ConnectException: Connection timed out: no further information
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716)
        at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
        at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:530)
        at org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:3090)
        at org.apache.hadoop.hdfs.BlockReaderFactory.nextTcpPeer(BlockReaderFactory.java:778)
        at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:693)
        at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:354)
        at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:617)
        at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:841)
        at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:889)
        at java.io.DataInputStream.read(DataInputStream.java:149)
        at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.fillBuffer(UncompressedSplitLineReader.java:62)
        at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
        at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
        at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.readLine(UncompressedSplitLineReader.java:94)
        at org.apache.hadoop.mapred.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:208)
        at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:246)
        at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:48)
        at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:277)
        at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:214)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
    24/09/07 07:52:19 WARN DFSClient: Failed to connect to /123.45.6.7:50010 for block, add to deadNodes and continue. java.net.ConnectException: Connection timed out: no further information
    java.net.ConnectException: Connection timed out: no further information
    ...
1 Answer

Input split: hdfs://namenode:8020

If you are not running Spark in the same Docker network (i.e. inside a container itself), then that connection will fail.

If you do run it in a container, you then need to change your configuration to use the other containers' hostnames rather than localhost; see the sketch below.
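
For illustration, here is a minimal variant of the program from the question with the hostnames swapped in. This is only a sketch: it assumes the Spark driver itself runs in a container attached to the same compose network, so that the service names namenode and hive-metastore resolve and the datanode's address is directly reachable.

import org.apache.spark.sql.SparkSession

object HiveTestInDocker {
  def main(args: Array[String]): Unit = {
    // Sketch: assumes this driver runs in a container on the same compose network,
    // so the compose service names resolve and the datanode is directly reachable.
    val spark = SparkSession.builder()
      .appName("HiveTest")
      .master("local[*]")
      .config("spark.hadoop.fs.defaultFS", "hdfs://namenode:8020")    // service name, not localhost
      .config("hive.metastore.uris", "thrift://hive-metastore:9083")  // service name, not localhost
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("SELECT * FROM test_table").show()
    spark.stop()
  }
}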

Your Hive setup itself is fine. It is HDFS that is failing to return the file contents correctly for your query.

As an aside, I'd recommend upgrading to the latest Hadoop and Hive, and to Spark 3. You also don't need HDFS or Postgres to get Hive+Spark working; a sketch of that simpler setup follows.
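
A minimal sketch of such a setup, assuming Spark 3.x with a matching Scala version (e.g. 2.12) and the corresponding spark-hive module on the classpath: with Hive support enabled and no hive-site.xml, Spark falls back to an embedded Derby metastore and a local warehouse directory, so neither HDFS nor Postgres is needed.

import org.apache.spark.sql.SparkSession

object LocalHiveTest {
  def main(args: Array[String]): Unit = {
    // Sketch only: no external cluster. With Hive support enabled and no hive-site.xml,
    // Spark creates an embedded Derby metastore (metastore_db/) in the working directory
    // and stores table data under spark.sql.warehouse.dir on the local filesystem.
    val spark = SparkSession.builder()
      .appName("LocalHiveTest")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "file:///tmp/spark-warehouse")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("CREATE TABLE IF NOT EXISTS test_table (id INT, name STRING)")
    spark.sql("INSERT INTO test_table VALUES (1, 'Alice'), (2, 'Bob')")
    spark.sql("SELECT * FROM test_table").show()

    spark.stop()
  }
}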
