我编写了代码,将数据从csv文件加载到PostgreSql表。代码之前工作正常,突然失败,并显示错误py4j.protocol.Py4JJavaError: An error occurred while calling o65.save.
: java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider org.apache.spark.sql.kafka010.KafkaSourceProvider could not be instantiated
似乎在与jdbc驱动程序连接时出现问题。任何建议都会有所帮助,
import pyspark
from pyspark.sql import SparkSession, Row
sc = pyspark.SparkContext('local[*]')
SqlContext = pyspark.SQLContext(sc)
spark = SparkSession(sc)
productsFF = sc.textFile("C:\Hadoop\Data\ProductType.csv")
productsDF = productsFF.map(lambda p: Row(product_type_intrnl_id = int(p.split(",")[0]),
product_type_code = p.split(",")[1], product_sub_type_code = p.split(",")[3])).toDF()
productsDF.createTempView("product_type_tmp")
product_type_tmp = SqlContext.sql("select * from product_type_tmp") #.show()
SqlContext.sql("show tables").show()
product_type_tmp.write.format('jdbc') \
.option("url", "jdbc:postgresql://localhost:5432/xxxx") \
.option("dbtable", "xxxx") \
.option("user", "xxxx") \
.option("password", "xxxx") \
.option("driver", "org.postgresql.Driver").mode("append").save()
下面是详细错误,
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Traceback (most recent call last):
File "C:/Users/Macaulay/PycharmProjects/Spark/SparkSqlFile2Table.py", line 18, in <module>
product_type_tmp.write.format('jdbc') \
File "C:\Hadoop\Spark\spark-3.0.0-preview2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\sql\readwriter.py", line 767, in save
File "C:\Hadoop\Spark\spark-3.0.0-preview2-bin-hadoop2.7\python\lib\py4j-0.10.8.1-src.zip\py4j\java_gateway.py", line 1285, in __call__
File "C:\Hadoop\Spark\spark-3.0.0-preview2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\sql\utils.py", line 98, in deco
File "C:\Hadoop\Spark\spark-3.0.0-preview2-bin-hadoop2.7\python\lib\py4j-0.10.8.1-src.zip\py4j\protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o65.save.
: java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider org.apache.spark.sql.kafka010.KafkaSourceProvider could not be instantiated
at java.util.ServiceLoader.fail(Unknown Source)
at java.util.ServiceLoader.access$100(Unknown Source)
at java.util.ServiceLoader$LazyIterator.nextService(Unknown Source)
at java.util.ServiceLoader$LazyIterator.next(Unknown Source)
at java.util.ServiceLoader$1.next(Unknown Source)
at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:44)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:255)
at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:249)
at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
at scala.collection.TraversableLike.filter(TraversableLike.scala:347)
at scala.collection.TraversableLike.filter$(TraversableLike.scala:347)
at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:644)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:728)
at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:832)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:252)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.NoClassDefFoundError: org/apache/spark/internal/Logging$class
at org.apache.spark.sql.kafka010.KafkaSourceProvider.<init>(KafkaSourceProvider.scala:40)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
at java.lang.reflect.Constructor.newInstance(Unknown Source)
at java.lang.Class.newInstance(Unknown Source)
... 31 more
Caused by: java.lang.ClassNotFoundException: org.apache.spark.internal.Logging$class
at java.net.URLClassLoader$1.run(Unknown Source)
at java.net.URLClassLoader$1.run(Unknown Source)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
... 37 more```
org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0-preview2