Failed to find the data source: mongo


I am trying to build a local pipeline that consumes messages streamed from my Kafka broker and processes them in Spark before writing them to MongoDB. I have installed all the necessary JARs and included the MongoDB Spark connector, but I get an error saying the data source cannot be found. The JAR versions are correct. I am running PySpark 3.5.3, Scala 2.12, and Python 3.11.6.

from pymongo import MongoClient
from pyspark.sql import SparkSession
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def create_sparkconnection():
    spark = None
    spark = SparkSession.builder.appName("spark_streaming")\
                                .config("spark.jars",
                                        ".virtualenv/lib/python3.11/site-packages/pyspark/jars/mongo-spark-connector_2.12-10.4.0.jar,"
                                        ".virtualenv/lib/python3.11/site-packages/pyspark/jars/commons-pool2-2.12.0.jar,"
                                        ".virtualenv/lib/python3.11/site-packages/pyspark/jars/spark-sql-kafka-0-10_2.12-3.5.3.jar,"
                                        ".virtualenv/lib/python3.11/site-packages/pyspark/jars/kafka-clients-3.5.1.jar,"
                                        ".virtualenv/lib/python3.11/site-packages/pyspark/jars/spark-token-provider-kafka-0-10_2.12-3.5.3.jar")\
                                .config("spark.mongodb.input.uri", "mongodb://127.0.0.1:27017/my_database.my_collection") \
                                .config('spark.mongodb.output.uri',"mongodb://127.0.0.1:27017/my_database.my_collection")\
                                .getOrCreate()
    
    spark.sparkContext.setLogLevel("ERROR")
    logging.info("Spark connection successfully created")

    return spark

def mongo_connect():
    mongodb_uri = "mongodb://127.0.0.1:27017"
    client = MongoClient(mongodb_uri)
    db = client['my_database']
    collection = db['my_collection']

    print("Mongodb connected")
    return collection

def kafkaconnect(sparkconnection):
    spark_df = None
    spark_df = sparkconnection.readStream \
                .format('kafka') \
                .option('kafka.bootstrap.servers','127.0.0.1:9092') \
                .option('subscribe','users') \
                .option('startingOffsets', 'earliest') \
                .load()
    
    print (spark_df)
    return spark_df

def jsonstream(kafkastream):
    json_df = kafkastream.selectExpr("CAST(value AS STRING) as json_value")

    print(json_df)
    return json_df




if __name__ == "__main__":
    sparkconnected = create_sparkconnection()
    
    if sparkconnected is not None:
        df = kafkaconnect(sparkconnected)
        finaldf = jsonstream(df)
        mongosession = mongo_connect()

        if mongosession is not None:
            streaming = finaldf.writeStream.outputMode("append") \
                                .format("mongo") \
                                .option('checkpointLocation','/tmp/checkpoint') \
                                .option('spark.mongodb.output.uri',"mongodb://127.0.0.1:27017/my_database.my_collection") \
                                .start()
            
            streaming.awaitTermination()

Error log:

py4j.protocol.Py4JJavaError: An error occurred while calling o51.start.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: mongo. Please find packages at `https://spark.apache.org/third-party-projects.html`.
        at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:725)
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
        at org.apache.spark.sql.streaming.DataStreamWriter.startInternal(DataStreamWriter.scala:370)
        at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:251)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: mongo.DefaultSource
        at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
        at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:633)
        at scala.util.Try$.apply(Try.scala:213)
        at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:633)
        at scala.util.Failure.orElse(Try.scala:224)
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:633)
        ... 14 more

I also tried using spark.jars.packages to pull the dependencies directly from Maven instead of downloading them, but I got the same error message.
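Roughly along these lines (a sketch using the same Maven coordinates as the JAR versions listed above):

from pyspark.sql import SparkSession

# Sketch of the alternative attempt: resolving the connectors from Maven via
# spark.jars.packages instead of pointing spark.jars at local JAR files.
spark = SparkSession.builder.appName("spark_streaming") \
    .config("spark.jars.packages",
            "org.mongodb.spark:mongo-spark-connector_2.12:10.4.0,"
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3") \
    .getOrCreate()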

mongodb apache-spark pyspark apache-spark-sql
1 Answer

Have you tried the following configuration, passing the connector with --packages when submitting the job? The coordinate needs to match your Scala 2.12 build and the 10.x connector:

spark-submit \
  --packages org.mongodb.spark:mongo-spark-connector_2.12:10.4.0 \
  your_spark_script.py
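Note also that the 10.x connector registers its data source under the short name "mongodb"; the short name "mongo" belonged to the pre-10.x connector, which is what the [DATA_SOURCE_NOT_FOUND] error points at. Connection details can be set via spark.mongodb.read.connection.uri / spark.mongodb.write.connection.uri on the session or passed as write options. A minimal sketch of the streaming write under that assumption:

# Sketch assuming mongo-spark-connector 10.x: the source short name is
# "mongodb" and the connection details are supplied as write options.
# `finaldf` is the streaming DataFrame produced by jsonstream() above.
streaming = finaldf.writeStream \
    .format("mongodb") \
    .outputMode("append") \
    .option("checkpointLocation", "/tmp/checkpoint") \
    .option("spark.mongodb.connection.uri", "mongodb://127.0.0.1:27017") \
    .option("spark.mongodb.database", "my_database") \
    .option("spark.mongodb.collection", "my_collection") \
    .start()

streaming.awaitTermination()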
