嗨,我想以并行读取方式打印 postgre 数据库中的分区数据,但它不起作用 我想知道可以打印吗?我用谷歌搜索了很长时间,但没有任何结果
这是我尝试过的代码:
Dataset<Row> df= spark.read().format("jdbc").option("url","jdbc:postgresql://"+ip+":"+port+"/"+db )
.option("driver", "org.postgresql.Driver")
.option("dbtable", sql)
.option("user", user)
.option("password", passwd)
.option("partitionColumn", "rownum")
.option("numPartitions", 10)
.option("lowerBound", 1)
.option("upperBound", 2000)
.load();
df.foreach(x->{System.out.println("row:"+x);}); --> error
这是错误日志:
java.lang.IllegalStateException: SparkContext has been shutdown
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2053)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:927)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:925)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
您应该利用 Apache Spark 中的
df.foreachPartition
方法并行打印 PostgreSQL 数据库表分区中的数据。您可能尝试在 SparkContext 停止后运行 Spark 作业,这就是为什么您收到错误 java.lang.IllegalStateException:SparkContext 已关闭。” 通过使用 foreachPartition
循环遍历 DataFrame 的每个分区并打印并行数据,下面提供的修改后的代码解决了这个问题:
import org.apache.spark.api.java.function.ForeachPartitionFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> df = spark.read()
.format("jdbc")
.option("url", "jdbc:postgresql://" + ip + ":" + port + "/" + db)
.option("driver", "org.postgresql.Driver")
.option("dbtable", sql)
.option("user", user)
.option("password", passwd)
.option("partitionColumn", "rownum")
.option("numPartitions", 10)
.option("lowerBound", 1)
.option("upperBound", 2000)
.load();
df.foreachPartition((ForeachPartitionFunction<Row>) iterator -> {
while (iterator.hasNext()) {
Row row = iterator.next();
System.out.println("row: " + row);
}
});
通过此代码对每个分区内的数据进行有效的并行处理,可以打印 PostgreSQL 数据而不会遇到与 SparkContext 关闭相关的问题。在运行此代码之前,请确保 Spark 正在运行并检查您的配置以确保 Spark 正确初始化。
希望对你有帮助,想了解更多请联系我