I ran into this error:
py4j.protocol.Py4JJavaError: An error occurred while calling o59.start.
: java.lang.NoClassDefFoundError: com/datastax/spark/connector/util/Logging
at java.base/java.lang.ClassLoader.defineClass1(Native Method)
at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1022)
at java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:174)
at java.base/jdk.internal.loader.BuiltinClassLoader.defineClass(BuiltinClassLoader.java:800)
at java.base/jdk.internal.loader.BuiltinClassLoader.findClassOnClassPathOrNull(BuiltinClassLoader.java:698)
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClassOrNull(BuiltinClassLoader.java:621)
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:579)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
at org.apache.spark.sql.cassandra.DefaultSource.getTable(DefaultSource.scala:55)
at org.apache.spark.sql.cassandra.DefaultSource.inferSchema(DefaultSource.scala:67)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.getTableFromProvider(DataSourceV2Utils.scala:90)
at org.apache.spark.sql.streaming.DataStreamWriter.startInternal(DataStreamWriter.scala:396)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:249)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: com.datastax.spark.connector.util.Logging
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
... 26 more
when running this Python script:
import logging

from cassandra.cluster import Cluster
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType


def create_keyspace(session):
    session.execute("""
        CREATE KEYSPACE IF NOT EXISTS spark_streams
        WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
    """)
    print("Keyspace created successfully!")


def create_table(session):
    session.execute("""
        CREATE TABLE IF NOT EXISTS spark_streams.created_users (
            id UUID PRIMARY KEY,
            first_name TEXT,
            last_name TEXT,
            gender TEXT,
            address TEXT,
            post_code TEXT,
            email TEXT,
            username TEXT,
            registered_date TEXT,
            phone TEXT,
            picture TEXT);
    """)
    print("Table created successfully!")


def insert_data(session, **kwargs):
    print("inserting data...")
    user_id = kwargs.get('id')
    first_name = kwargs.get('first_name')
    last_name = kwargs.get('last_name')
    gender = kwargs.get('gender')
    address = kwargs.get('address')
    postcode = kwargs.get('post_code')
    email = kwargs.get('email')
    username = kwargs.get('username')
    dob = kwargs.get('dob')
    registered_date = kwargs.get('registered_date')
    phone = kwargs.get('phone')
    picture = kwargs.get('picture')

    try:
        session.execute("""
            INSERT INTO spark_streams.created_users(id, first_name, last_name, gender, address,
                post_code, email, username, dob, registered_date, phone, picture)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """, (user_id, first_name, last_name, gender, address,
              postcode, email, username, dob, registered_date, phone, picture))
        logging.info(f"Data inserted for {first_name} {last_name}")
    except Exception as e:
        logging.error(f'could not insert data due to {e}')


def create_spark_connection():
    s_conn = None
    try:
        s_conn = SparkSession.builder \
            .appName('SparkDataStreaming') \
            .config('spark.jars.packages', "com.datastax.spark:spark-cassandra-connector_2.12:3.4.1,"
                                           "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1,"
                                           "org.apache.kafka/kafka-clients:2.3.1") \
            .config('spark.cassandra.connection.host', 'localhost') \
            .getOrCreate()
        s_conn.sparkContext.setLogLevel("ERROR")
        logging.info("Spark connection created successfully!")
    except Exception as e:
        logging.error(f"Couldn't create the spark session due to exception {e}")
    return s_conn


def connect_to_kafka(spark_conn):
    spark_df = None
    try:
        spark_df = spark_conn.readStream \
            .format('kafka') \
            .option('kafka.bootstrap.servers', 'localhost:9092') \
            .option('subscribe', 'users_created') \
            .option('startingOffsets', 'earliest') \
            .load()
        logging.info("kafka dataframe created successfully")
    except Exception as e:
        logging.warning(f"kafka dataframe could not be created because: {e}")
    return spark_df


def create_cassandra_connection():
    try:
        # connecting to the cassandra cluster
        cluster = Cluster(['localhost'])
        cas_session = cluster.connect()
        return cas_session
    except Exception as e:
        logging.error(f"Could not create cassandra connection due to {e}")
        return None


def create_selection_df_from_kafka(spark_df):
    schema = StructType([
        StructField("id", StringType(), False),
        StructField("first_name", StringType(), False),
        StructField("last_name", StringType(), False),
        StructField("gender", StringType(), False),
        StructField("address", StringType(), False),
        StructField("post_code", StringType(), False),
        StructField("email", StringType(), False),
        StructField("username", StringType(), False),
        StructField("registered_date", StringType(), False),
        StructField("phone", StringType(), False),
        StructField("picture", StringType(), False)
    ])

    sel = spark_df.selectExpr("CAST(value AS STRING)") \
        .select(from_json(col('value'), schema).alias('data')).select("data.*")
    print(sel)
    return sel


if __name__ == "__main__":
    # create spark connection
    spark_conn = create_spark_connection()

    if spark_conn is not None:
        # connect to kafka with spark connection
        spark_df = connect_to_kafka(spark_conn)
        selection_df = create_selection_df_from_kafka(spark_df)
        session = create_cassandra_connection()

        if session is not None:
            create_keyspace(session)
            create_table(session)

            logging.info("Streaming is being started...")

            streaming_query = (selection_df.writeStream.format("org.apache.spark.sql.cassandra")
                               .option('checkpointLocation', '/tmp/checkpoint')
                               .option('keyspace', 'spark_streams')
                               .option('table', 'created_users')
                               .start())

            streaming_query.awaitTermination()
This is strange, because I already copied spark-cassandra-connector_2.12-3.4.1.jar into .venv/lib/python3.11/site-packages/pyspark/jars. Maybe I didn't install it the right way, so some dependency is missing, or something along those lines.
Am I missing any dependencies? Should I install this JAR in some other way? Thanks.
I agree with your assessment that your build is missing dependencies.
Rather than building against the bare connector JAR, I suggest using the --packages option to specify the Maven coordinates of the Spark Cassandra Connector, so that all of its transitive dependencies are included in your application.
For example, try launching the PySpark shell with the following command:
$ bin/pyspark \
  --master <spark_master_url> \
  --conf spark.cassandra.connection.host=<cassandra_host_ip> \
  --packages com.datastax.spark:spark-cassandra-connector_2.12:3.4.1 \
  --conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions
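The same approach applies when running your script non-interactively with spark-submit. A minimal sketch, assuming your script is saved as spark_stream.py (the filename is my assumption) and including the Kafka source package your builder config already references:

$ bin/spark-submit \
  --master <spark_master_url> \
  --conf spark.cassandra.connection.host=<cassandra_host_ip> \
  --packages com.datastax.spark:spark-cassandra-connector_2.12:3.4.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1 \
  spark_stream.py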
Note that I'm assuming you are connecting from a Spark 3.4 cluster.
For additional information, see the PySpark page of the Spark Cassandra Connector documentation. Cheers!
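One more thing worth checking: in your builder config, the coordinate org.apache.kafka/kafka-clients:2.3.1 uses a slash where Maven coordinates require colons (group:artifact:version), which can make the whole spark.jars.packages resolution fail. If you prefer keeping the dependency list in code rather than on the command line, here is a minimal sketch of the corrected builder; I'm assuming the explicit kafka-clients entry can simply be dropped, since spark-sql-kafka-0-10 normally pulls it in transitively:

from pyspark.sql import SparkSession

# Resolve the Cassandra connector and the Kafka source from Maven,
# including their transitive dependencies (coordinates are colon-separated).
spark = SparkSession.builder \
    .appName('SparkDataStreaming') \
    .config('spark.jars.packages',
            "com.datastax.spark:spark-cassandra-connector_2.12:3.4.1,"
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1") \
    .config('spark.cassandra.connection.host', 'localhost') \
    .getOrCreate()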