“java.lang.NoSuchMethodError:'scala.collection.JavaConverters$AsJava scala.collection”错误

问题描述 投票:0回答:1

我在这里陷入困境。我正在尝试实现一个非常基本的管道,它从 kafka 读取数据并在 Spark 中处理它。我面临的问题是 apache Spark 突然关闭并给出上述错误消息。我的pyspark版本是3.5.1,scala版本是2.12.18。

有问题的代码是:-

from pyspark.sql import SparkSession
from pyspark.sql.functions import *


spark = SparkSession.builder \
    .appName('my_app') \
    .config("spark.jars", "/usr/local/spark/jars/spark-sql-kafka-0-10_2.12-3.5.1.jar") \
    .getOrCreate()


df = spark.readStream \
    .format('kafka') \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "quickstart-events") \
    .option("startingOffsets", "earliest") \
    .load()


query = df.writeStream \
    .trigger(processingTime='5 seconds') \
    .outputMode("update") \
    .format("console") \
    .start()

query.awaitTermination()

我已经下载了所有必需的 jar 文件并将它们放在 Spark 中的相应目录中。我能够以客户身份读取来自 kafka 代理的消息,因此排除了我的 kafka 安装出现故障的可能性。任何帮助将不胜感激。

python scala apache-spark pyspark spark-kafka-integration
1个回答
0
投票

NoSuchMethodError 的一般原因是

如何修复 NoSuchMethodError?

Java中NoSuchMethodException和NoSuchMethodError的区别

我怀疑你正在混合

_2.12
_2.13
依赖项。

scala.collection.JavaConverters.AsJava
存在于 Scala 2.13.x 中,但不存在于 Scala 2.12.x 中。

您应该调查您的类路径。修改你的脚本

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder \
    .appName('my_app') \
    .getOrCreate()

print('Class path on start')
gateway = SparkContext._gateway
separator = gateway.jvm.java.lang.System.getProperty('path.separator')
for url in gateway.jvm.java.lang.System.getProperty('java.class.path').split(separator):
    print(url)

print()

# classLoader = gateway.jvm.java.lang.ClassLoader.getSystemClassLoader()
# classLoader = gateway.java_gateway_server.getClass().getClassLoader()
classLoader = gateway.jvm.java.lang.Thread.currentThread().getContextClassLoader()
while classLoader is not None:
    try:
        urls = classLoader.getURLs()
        print(f'Class loader {classLoader}:')
        for url in urls:
          print(url)
    except:
        print(f'Class loader {classLoader}: not URLClassLoader')
    print()
    classLoader = classLoader.getParent()

print("ok")

df = spark.readStream \
    .format('kafka') \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "quickstart-events") \
    .option("startingOffsets", "earliest") \
    .load()

query = df.writeStream \
    .trigger(processingTime='5 seconds') \
    .outputMode("update") \
    .format("console") \
    .start()

query.awaitTermination()

并运行它

./spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1 /path/to/file.py 

查看输出(您可以在此处发布)是否不仅有

_2.12
jar,而且还有
_2.13
jar。

使用

spark-submit
.config("spark.jars", ...
在代码中不起作用,正确的是命令行中的
--packages ...

为什么 Spark-submit 会忽略我在 Spark 会话配置中包含的包?

© www.soinside.com 2019 - 2024. All rights reserved.