如何打印JDBC数据库并行读取的分区数据

问题描述 投票:0回答:1

嗨,我想以并行读取方式打印 postgre 数据库中的分区数据,但它不起作用 我想知道可以打印吗?我用谷歌搜索了很长时间,但没有任何结果

这是我尝试过的代码:

Dataset<Row> df= spark.read().format("jdbc").option("url","jdbc:postgresql://"+ip+":"+port+"/"+db )
                      .option("driver", "org.postgresql.Driver")
                      .option("dbtable", sql)
                      .option("user", user)
                      .option("password", passwd)
                      .option("partitionColumn", "rownum")
                    .option("numPartitions", 10)
                    .option("lowerBound", 1)
                    .option("upperBound", 2000)
                      .load();

df.foreach(x->{System.out.println("row:"+x);}); --> error

这是错误日志:

java.lang.IllegalStateException: SparkContext has been shutdown
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2053)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:927)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:925)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
java apache-spark jdbc parallel-processing
1个回答
0
投票

您应该利用 Apache Spark 中的

df.foreachPartition
方法并行打印 PostgreSQL 数据库表分区中的数据。您可能尝试在 SparkContext 停止后运行 Spark 作业,这就是为什么您收到错误 java.lang.IllegalStateException:SparkContext 已关闭。” 通过使用
foreachPartition
循环遍历 DataFrame 的每个分区并打印并行数据,下面提供的修改后的代码解决了这个问题:

import org.apache.spark.api.java.function.ForeachPartitionFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> df = spark.read()
    .format("jdbc")
    .option("url", "jdbc:postgresql://" + ip + ":" + port + "/" + db)
    .option("driver", "org.postgresql.Driver")
    .option("dbtable", sql)
    .option("user", user)
    .option("password", passwd)
    .option("partitionColumn", "rownum")
    .option("numPartitions", 10)
    .option("lowerBound", 1)
    .option("upperBound", 2000)
    .load();

df.foreachPartition((ForeachPartitionFunction<Row>) iterator -> {
    while (iterator.hasNext()) {
        Row row = iterator.next();
        System.out.println("row: " + row);
    }
});

通过此代码对每个分区内的数据进行有效的并行处理,可以打印 PostgreSQL 数据而不会遇到与 SparkContext 关闭相关的问题。在运行此代码之前,请确保 Spark 正在运行并检查您的配置以确保 Spark 正确初始化。

希望对你有帮助,想了解更多请联系我

© www.soinside.com 2019 - 2024. All rights reserved.