ClassNotFoundException: could mismatched Scala versions be the problem?


When I try to run this .py:

import logging

from cassandra.cluster import Cluster
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType


def create_keyspace(session):
    session.execute("""
        CREATE KEYSPACE IF NOT EXISTS spark_streams
        WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
    """)

    print("Keyspace created successfully!")


def create_table(session):
    session.execute("""
    CREATE TABLE IF NOT EXISTS spark_streams.created_users (
        id UUID PRIMARY KEY,
        first_name TEXT,
        last_name TEXT,
        gender TEXT,
        address TEXT,
        post_code TEXT,
        email TEXT,
        username TEXT,
        registered_date TEXT,
        phone TEXT,
        picture TEXT);
    """)

    print("Table created successfully!")


def insert_data(session, **kwargs):
    print("inserting data...")

    user_id = kwargs.get('id')
    first_name = kwargs.get('first_name')
    last_name = kwargs.get('last_name')
    gender = kwargs.get('gender')
    address = kwargs.get('address')
    postcode = kwargs.get('post_code')
    email = kwargs.get('email')
    username = kwargs.get('username')
    dob = kwargs.get('dob')
    registered_date = kwargs.get('registered_date')
    phone = kwargs.get('phone')
    picture = kwargs.get('picture')

    try:
        session.execute("""
            INSERT INTO spark_streams.created_users(id, first_name, last_name, gender, address, 
                post_code, email, username, dob, registered_date, phone, picture)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """, (user_id, first_name, last_name, gender, address,
              postcode, email, username, dob, registered_date, phone, picture))
        logging.info(f"Data inserted for {first_name} {last_name}")

    except Exception as e:
        logging.error(f'could not insert data due to {e}')


def create_spark_connection():
    s_conn = None

    try:
        s_conn = SparkSession.builder \
            .appName('SparkDataStreaming') \
            .config('spark.jars.packages', "com.datastax.spark:spark-cassandra-connector_2.13:3.4.1,"
                                           "org.apache.spark:spark-sql-kafka-0-10_2.13:3.4.1") \
            .config('spark.cassandra.connection.host', 'localhost') \
            .getOrCreate()

        s_conn.sparkContext.setLogLevel("ERROR")
        logging.info("Spark connection created successfully!")
    except Exception as e:
        logging.error(f"Couldn't create the spark session due to exception {e}")

    return s_conn


def connect_to_kafka(spark_conn):
    spark_df = None
    try:
        spark_df = spark_conn.readStream \
            .format('kafka') \
            .option('kafka.bootstrap.servers', 'localhost:9092') \
            .option('subscribe', 'users_created') \
            .option('startingOffsets', 'earliest') \
            .load()
        logging.info("kafka dataframe created successfully")
    except Exception as e:
        logging.warning(f"kafka dataframe could not be created because: {e}")

    return spark_df


def create_cassandra_connection():
    try:
        # connecting to the cassandra cluster
        cluster = Cluster(['localhost'])

        cas_session = cluster.connect()

        return cas_session
    except Exception as e:
        logging.error(f"Could not create cassandra connection due to {e}")
        return None


def create_selection_df_from_kafka(spark_df):
    schema = StructType([
        StructField("id", StringType(), False),
        StructField("first_name", StringType(), False),
        StructField("last_name", StringType(), False),
        StructField("gender", StringType(), False),
        StructField("address", StringType(), False),
        StructField("post_code", StringType(), False),
        StructField("email", StringType(), False),
        StructField("username", StringType(), False),
        StructField("registered_date", StringType(), False),
        StructField("phone", StringType(), False),
        StructField("picture", StringType(), False)
    ])

    sel = spark_df.selectExpr("CAST(value AS STRING)") \
        .select(from_json(col('value'), schema).alias('data')).select("data.*")
    print(sel)

    return sel


if __name__ == "__main__":
    # create spark connection
    spark_conn = create_spark_connection()

    if spark_conn is not None:
        # connect to kafka with spark connection
        spark_df = connect_to_kafka(spark_conn)
        selection_df = create_selection_df_from_kafka(spark_df)
        session = create_cassandra_connection()

        if session is not None:
            create_keyspace(session)
            create_table(session)

            logging.info("Streaming is being started...")

            streaming_query = (selection_df.writeStream.format("org.apache.spark.sql.cassandra")
                               .option('checkpointLocation', '/tmp/checkpoint')
                               .option('keyspace', 'spark_streams')
                               .option('table', 'created_users')
                               .start())

            streaming_query.awaitTermination()

I get this error:

(.venv) fran@fran-VirtualBox:~/PycharmProjects/data-engineering$ python spark_stream.py 
24/05/12 18:54:01 WARN Utils: Your hostname, fran-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
24/05/12 18:54:01 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/home/fran/PycharmProjects/data-engineering/.venv/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/fran/.ivy2/cache
The jars for the packages stored in: /home/fran/.ivy2/jars
com.datastax.spark#spark-cassandra-connector_2.13 added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.13 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-8a294b11-ad47-470c-8f84-295222264cee;1.0
        confs: [default]
        found com.datastax.spark#spark-cassandra-connector_2.13;3.4.1 in central
        found com.datastax.spark#spark-cassandra-connector-driver_2.13;3.4.1 in central
        found org.scala-lang.modules#scala-collection-compat_2.13;2.11.0 in central
        found org.scala-lang.modules#scala-parallel-collections_2.13;1.0.4 in central
        found com.datastax.oss#java-driver-core-shaded;4.13.0 in central
        found com.datastax.oss#native-protocol;1.5.0 in central
        found com.datastax.oss#java-driver-shaded-guava;25.1-jre-graal-sub-1 in central
        found com.typesafe#config;1.4.1 in central
        found org.slf4j#slf4j-api;1.7.26 in central
        found io.dropwizard.metrics#metrics-core;4.1.18 in central
        found org.hdrhistogram#HdrHistogram;2.1.12 in central
        found org.reactivestreams#reactive-streams;1.0.3 in central
        found com.github.stephenc.jcip#jcip-annotations;1.0-1 in central
        found com.github.spotbugs#spotbugs-annotations;3.1.12 in central
        found com.google.code.findbugs#jsr305;3.0.2 in central
        found com.datastax.oss#java-driver-mapper-runtime;4.13.0 in central
        found com.datastax.oss#java-driver-query-builder;4.13.0 in central
        found org.apache.commons#commons-lang3;3.10 in central
        found com.thoughtworks.paranamer#paranamer;2.8 in central
        found org.scala-lang#scala-reflect;2.13.11 in central
        found org.apache.spark#spark-sql-kafka-0-10_2.13;3.4.1 in central
        found org.apache.spark#spark-token-provider-kafka-0-10_2.13;3.4.1 in central
        found org.apache.kafka#kafka-clients;3.3.2 in central
        found org.lz4#lz4-java;1.8.0 in central
        found org.xerial.snappy#snappy-java;1.1.10.1 in central
        found org.slf4j#slf4j-api;2.0.6 in central
        found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
        found org.apache.hadoop#hadoop-client-api;3.3.4 in central
        found commons-logging#commons-logging;1.1.3 in central
        found org.apache.commons#commons-pool2;2.11.1 in central
:: resolution report :: resolve 2530ms :: artifacts dl 40ms
        :: modules in use:
        com.datastax.oss#java-driver-core-shaded;4.13.0 from central in [default]
        com.datastax.oss#java-driver-mapper-runtime;4.13.0 from central in [default]
        com.datastax.oss#java-driver-query-builder;4.13.0 from central in [default]
        com.datastax.oss#java-driver-shaded-guava;25.1-jre-graal-sub-1 from central in [default]
        com.datastax.oss#native-protocol;1.5.0 from central in [default]
        com.datastax.spark#spark-cassandra-connector-driver_2.13;3.4.1 from central in [default]
        com.datastax.spark#spark-cassandra-connector_2.13;3.4.1 from central in [default]
        com.github.spotbugs#spotbugs-annotations;3.1.12 from central in [default]
        com.github.stephenc.jcip#jcip-annotations;1.0-1 from central in [default]
        com.google.code.findbugs#jsr305;3.0.2 from central in [default]
        com.thoughtworks.paranamer#paranamer;2.8 from central in [default]
        com.typesafe#config;1.4.1 from central in [default]
        commons-logging#commons-logging;1.1.3 from central in [default]
        io.dropwizard.metrics#metrics-core;4.1.18 from central in [default]
        org.apache.commons#commons-lang3;3.10 from central in [default]
        org.apache.commons#commons-pool2;2.11.1 from central in [default]
        org.apache.hadoop#hadoop-client-api;3.3.4 from central in [default]
        org.apache.hadoop#hadoop-client-runtime;3.3.4 from central in [default]
        org.apache.kafka#kafka-clients;3.3.2 from central in [default]
        org.apache.spark#spark-sql-kafka-0-10_2.13;3.4.1 from central in [default]
        org.apache.spark#spark-token-provider-kafka-0-10_2.13;3.4.1 from central in [default]
        org.hdrhistogram#HdrHistogram;2.1.12 from central in [default]
        org.lz4#lz4-java;1.8.0 from central in [default]
        org.reactivestreams#reactive-streams;1.0.3 from central in [default]
        org.scala-lang#scala-reflect;2.13.11 from central in [default]
        org.scala-lang.modules#scala-collection-compat_2.13;2.11.0 from central in [default]
        org.scala-lang.modules#scala-parallel-collections_2.13;1.0.4 from central in [default]
        org.slf4j#slf4j-api;2.0.6 from central in [default]
        org.xerial.snappy#snappy-java;1.1.10.1 from central in [default]
        :: evicted modules:
        org.slf4j#slf4j-api;1.7.26 by [org.slf4j#slf4j-api;2.0.6] in [default]
        com.google.code.findbugs#jsr305;3.0.0 by [com.google.code.findbugs#jsr305;3.0.2] in [default]
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   31  |   0   |   0   |   2   ||   29  |   0   |
        ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-8a294b11-ad47-470c-8f84-295222264cee
        confs: [default]
        0 artifacts copied, 29 already retrieved (0kB/54ms)
24/05/12 18:54:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
WARNING:root:kafka dataframe could not be created because: An error occurred while calling o36.load.
: java.lang.NoClassDefFoundError: scala/$less$colon$less
        at org.apache.spark.sql.kafka010.KafkaSourceProvider.org$apache$spark$sql$kafka010$KafkaSourceProvider$$validateStreamOptions(KafkaSourceProvider.scala:338)
        at org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSchema(KafkaSourceProvider.scala:71)
        at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:233)
        at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:118)
        at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:118)
        at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:35)
        at org.apache.spark.sql.streaming.DataStreamReader.loadInternal(DataStreamReader.scala:168)
        at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:144)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: scala.$less$colon$less
        at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
        ... 20 more

Traceback (most recent call last):
  File "/home/fran/PycharmProjects/data-engineering/spark_stream.py", line 143, in <module>
    selection_df = create_selection_df_from_kafka(spark_df)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/fran/PycharmProjects/data-engineering/spark_stream.py", line 129, in create_selection_df_from_kafka
    sel = spark_df.selectExpr("CAST(value AS STRING)") \
          ^^^^^^^^^^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'selectExpr'

I think this ClassNotFoundException for scala.$less$colon$less is a problem with the Scala version that Spark uses:

Spark version 3.4.1
Using Scala version 2.12.17, OpenJDK 64-Bit Server VM, 11.0.22

This may differ from the Scala version used by Kafka (note that I am using the kafka-python package with Python 3.11).

Could this be the problem? And if so, how do I align the two Scala versions?

scala apache-spark apache-kafka
1 Answer

Based on the output "Using Scala version 2.12.17", you need to use Spark dependencies built for Scala 2.12 rather than 2.13, e.g. the Cassandra and Kafka connectors.

Note I'm using the kafka-python package with Python 3.11

That is a native Python package, so it does not use Scala.
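If in doubt about which Scala version your PySpark install was built against, the names of the bundled jars encode it. A quick check (a sketch; it assumes PySpark was installed with pip, so a jars directory ships inside the package, as your Ivy log suggests):

import glob, os
import pyspark

# The Scala suffix in the bundled jar names shows which Scala build this PySpark is,
# e.g. spark-sql_2.12-3.4.1.jar -> built for Scala 2.12
jars_dir = os.path.join(os.path.dirname(pyspark.__file__), "jars")
print(glob.glob(os.path.join(jars_dir, "spark-sql_*.jar")))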
