How do I connect Salesforce with PySpark?


I want to connect Salesforce with PySpark. This is what I did, but I keep getting a NoClassDefFoundError. Please suggest how to resolve it.

from pyspark.sql import SparkSession

# Path to the locally downloaded spark-salesforce connector jar
jdbc_driver = "/home/azminds/pyspark/spark-3.5.0-bin-hadoop3/jars/spark-salesforce_2.12-1.1.4.jar"

# Put the connector jar on the driver classpath
spark = SparkSession.builder.master("local[*]") \
    .appName('Db2Connection') \
    .config('spark.driver.extraClassPath', jdbc_driver) \
    .getOrCreate()

try:
    soql = "SELECT * FROM account"
    df = spark \
        .read \
        .format("com.springml.spark.salesforce") \
        .option("username", "[email protected]") \
        .option("password", "password+securityToken") \
        .option("soql", soql) \
        .option("version", "56.0") \
        .load()

    df.show()
except Exception as error:
    print("Error ==", error)

But I am getting this NoClassDefFoundError. I think my driver version does not match, or do I have to include more drivers?

Error == An error occurred while calling o37.load.
: java.lang.NoClassDefFoundError: org/apache/spark/sql/sources/v2/ReadSupport
        at java.base/java.lang.ClassLoader.defineClass1(Native Method)
        at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1022)
        at java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:174)
        at java.base/jdk.internal.loader.BuiltinClassLoader.defineClass(BuiltinClassLoader.java:800)
        at java.base/jdk.internal.loader.BuiltinClassLoader.findClassOnClassPathOrNull(BuiltinClassLoader.java:698)
        at java.base/jdk.internal.loader.BuiltinClassLoader.loadClassOrNull(BuiltinClassLoader.java:621)
        at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:579)
        at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:581)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
        at java.base/java.lang.Class.forName0(Native Method)
        at java.base/java.lang.Class.forName(Class.java:398)
        at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.nextProviderClass(ServiceLoader.java:1210)
        at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNextService(ServiceLoader.java:1221)
        at java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNext(ServiceLoader.java:1265)
        at java.base/java.util.ServiceLoader$2.hasNext(ServiceLoader.java:1300)
        at java.base/java.util.ServiceLoader$3.hasNext(ServiceLoader.java:1385)
        at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:45)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:303)
        at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:297)
        at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
        at scala.collection.TraversableLike.filter(TraversableLike.scala:395)
        at scala.collection.TraversableLike.filter$(TraversableLike.scala:395)
        at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:629)
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:208)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:172)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.sources.v2.ReadSupport
        at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
        at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
        ... 46 more

I think my driver versions do not match. I am using PySpark 3.5.0 with Scala 2.12. Can someone suggest how to solve this?
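
The version-mismatch suspicion is consistent with the trace: org.apache.spark.sql.sources.v2.ReadSupport is part of the Spark 2.x Data Source V2 API and no longer exists in Spark 3.x, so a connector built against Spark 2.x fails to load like this. A quick sketch (assuming the spark session created above) to confirm which Spark and Scala versions the session actually runs on:

# Spark version of the running session
print(spark.version)  # e.g. 3.5.0
# Scala version of the JVM Spark runs on, queried via py4j
print(spark.sparkContext._jvm.scala.util.Properties.versionString())  # e.g. "version 2.12.18"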

python pyspark salesforce databricks
1 Answer

Try using the spark.jars.packages property instead. Unlike spark.driver.extraClassPath with a single local jar, it resolves the connector and all of its transitive dependencies from Maven.

spark = SparkSession.builder.master("local[*]") \
    .appName('Db2Connection') \
    .config('spark.jars.packages', 'com.springml:spark-salesforce_2.12:1.1.4') \
    .getOrCreate()
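
For reference, a minimal end-to-end sketch combining this property with the read from the question (the credentials are placeholders; note that SOQL has no SELECT *, so the Id and Name fields here stand in for whatever fields you actually need):

from pyspark.sql import SparkSession

# Resolve the connector and its transitive dependencies from Maven
spark = SparkSession.builder.master("local[*]") \
    .appName('Db2Connection') \
    .config('spark.jars.packages', 'com.springml:spark-salesforce_2.12:1.1.4') \
    .getOrCreate()

# SOQL does not support SELECT *; list the fields explicitly
soql = "SELECT Id, Name FROM Account"

df = spark.read \
    .format("com.springml.spark.salesforce") \
    .option("username", "[email protected]") \
    .option("password", "password+securityToken") \
    .option("soql", soql) \
    .option("version", "56.0") \
    .load()

df.show()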