When I try to run this .py file:
import logging
from cassandra.cluster import Cluster
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType
def create_keyspace(session):
    session.execute("""
        CREATE KEYSPACE IF NOT EXISTS spark_streams
        WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
    """)
    print("Keyspace created successfully!")


def create_table(session):
    session.execute("""
        CREATE TABLE IF NOT EXISTS spark_streams.created_users (
            id UUID PRIMARY KEY,
            first_name TEXT,
            last_name TEXT,
            gender TEXT,
            address TEXT,
            post_code TEXT,
            email TEXT,
            username TEXT,
            registered_date TEXT,
            phone TEXT,
            picture TEXT);
    """)
    print("Table created successfully!")
def insert_data(session, **kwargs):
    print("inserting data...")
    user_id = kwargs.get('id')
    first_name = kwargs.get('first_name')
    last_name = kwargs.get('last_name')
    gender = kwargs.get('gender')
    address = kwargs.get('address')
    postcode = kwargs.get('post_code')
    email = kwargs.get('email')
    username = kwargs.get('username')
    registered_date = kwargs.get('registered_date')
    phone = kwargs.get('phone')
    picture = kwargs.get('picture')

    try:
        # note: created_users has no dob column, so dob is not part of the insert
        session.execute("""
            INSERT INTO spark_streams.created_users(id, first_name, last_name, gender, address,
                post_code, email, username, registered_date, phone, picture)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """, (user_id, first_name, last_name, gender, address,
              postcode, email, username, registered_date, phone, picture))
        logging.info(f"Data inserted for {first_name} {last_name}")
    except Exception as e:
        logging.error(f'could not insert data due to {e}')
def create_spark_connection():
    s_conn = None

    try:
        s_conn = SparkSession.builder \
            .appName('SparkDataStreaming') \
            .config('spark.jars.packages', "com.datastax.spark:spark-cassandra-connector_2.13:3.4.1,"
                                           "org.apache.spark:spark-sql-kafka-0-10_2.13:3.4.1") \
            .config('spark.cassandra.connection.host', 'localhost') \
            .getOrCreate()

        s_conn.sparkContext.setLogLevel("ERROR")
        logging.info("Spark connection created successfully!")
    except Exception as e:
        logging.error(f"Couldn't create the spark session due to exception {e}")

    return s_conn
def connect_to_kafka(spark_conn):
    spark_df = None
    try:
        spark_df = spark_conn.readStream \
            .format('kafka') \
            .option('kafka.bootstrap.servers', 'localhost:9092') \
            .option('subscribe', 'users_created') \
            .option('startingOffsets', 'earliest') \
            .load()
        logging.info("kafka dataframe created successfully")
    except Exception as e:
        logging.warning(f"kafka dataframe could not be created because: {e}")

    return spark_df
def create_cassandra_connection():
    try:
        # connecting to the cassandra cluster
        cluster = Cluster(['localhost'])
        cas_session = cluster.connect()
        return cas_session
    except Exception as e:
        logging.error(f"Could not create cassandra connection due to {e}")
        return None
def create_selection_df_from_kafka(spark_df):
    schema = StructType([
        StructField("id", StringType(), False),
        StructField("first_name", StringType(), False),
        StructField("last_name", StringType(), False),
        StructField("gender", StringType(), False),
        StructField("address", StringType(), False),
        StructField("post_code", StringType(), False),
        StructField("email", StringType(), False),
        StructField("username", StringType(), False),
        StructField("registered_date", StringType(), False),
        StructField("phone", StringType(), False),
        StructField("picture", StringType(), False)
    ])

    sel = spark_df.selectExpr("CAST(value AS STRING)") \
        .select(from_json(col('value'), schema).alias('data')).select("data.*")
    print(sel)

    return sel
if __name__ == "__main__":
    # create spark connection
    spark_conn = create_spark_connection()

    if spark_conn is not None:
        # connect to kafka with spark connection
        spark_df = connect_to_kafka(spark_conn)
        selection_df = create_selection_df_from_kafka(spark_df)
        session = create_cassandra_connection()

        if session is not None:
            create_keyspace(session)
            create_table(session)

            logging.info("Streaming is being started...")

            streaming_query = (selection_df.writeStream.format("org.apache.spark.sql.cassandra")
                               .option('checkpointLocation', '/tmp/checkpoint')
                               .option('keyspace', 'spark_streams')
                               .option('table', 'created_users')
                               .start())

            streaming_query.awaitTermination()
I run into this problem:
(.venv) fran@fran-VirtualBox:~/PycharmProjects/data-engineering$ python spark_stream.py
24/05/12 18:54:01 WARN Utils: Your hostname, fran-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
24/05/12 18:54:01 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/home/fran/PycharmProjects/data-engineering/.venv/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/fran/.ivy2/cache
The jars for the packages stored in: /home/fran/.ivy2/jars
com.datastax.spark#spark-cassandra-connector_2.13 added as a dependency
org.apache.spark#spark-sql-kafka-0-10_2.13 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-8a294b11-ad47-470c-8f84-295222264cee;1.0
confs: [default]
found com.datastax.spark#spark-cassandra-connector_2.13;3.4.1 in central
found com.datastax.spark#spark-cassandra-connector-driver_2.13;3.4.1 in central
found org.scala-lang.modules#scala-collection-compat_2.13;2.11.0 in central
found org.scala-lang.modules#scala-parallel-collections_2.13;1.0.4 in central
found com.datastax.oss#java-driver-core-shaded;4.13.0 in central
found com.datastax.oss#native-protocol;1.5.0 in central
found com.datastax.oss#java-driver-shaded-guava;25.1-jre-graal-sub-1 in central
found com.typesafe#config;1.4.1 in central
found org.slf4j#slf4j-api;1.7.26 in central
found io.dropwizard.metrics#metrics-core;4.1.18 in central
found org.hdrhistogram#HdrHistogram;2.1.12 in central
found org.reactivestreams#reactive-streams;1.0.3 in central
found com.github.stephenc.jcip#jcip-annotations;1.0-1 in central
found com.github.spotbugs#spotbugs-annotations;3.1.12 in central
found com.google.code.findbugs#jsr305;3.0.2 in central
found com.datastax.oss#java-driver-mapper-runtime;4.13.0 in central
found com.datastax.oss#java-driver-query-builder;4.13.0 in central
found org.apache.commons#commons-lang3;3.10 in central
found com.thoughtworks.paranamer#paranamer;2.8 in central
found org.scala-lang#scala-reflect;2.13.11 in central
found org.apache.spark#spark-sql-kafka-0-10_2.13;3.4.1 in central
found org.apache.spark#spark-token-provider-kafka-0-10_2.13;3.4.1 in central
found org.apache.kafka#kafka-clients;3.3.2 in central
found org.lz4#lz4-java;1.8.0 in central
found org.xerial.snappy#snappy-java;1.1.10.1 in central
found org.slf4j#slf4j-api;2.0.6 in central
found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
found org.apache.hadoop#hadoop-client-api;3.3.4 in central
found commons-logging#commons-logging;1.1.3 in central
found org.apache.commons#commons-pool2;2.11.1 in central
:: resolution report :: resolve 2530ms :: artifacts dl 40ms
:: modules in use:
com.datastax.oss#java-driver-core-shaded;4.13.0 from central in [default]
com.datastax.oss#java-driver-mapper-runtime;4.13.0 from central in [default]
com.datastax.oss#java-driver-query-builder;4.13.0 from central in [default]
com.datastax.oss#java-driver-shaded-guava;25.1-jre-graal-sub-1 from central in [default]
com.datastax.oss#native-protocol;1.5.0 from central in [default]
com.datastax.spark#spark-cassandra-connector-driver_2.13;3.4.1 from central in [default]
com.datastax.spark#spark-cassandra-connector_2.13;3.4.1 from central in [default]
com.github.spotbugs#spotbugs-annotations;3.1.12 from central in [default]
com.github.stephenc.jcip#jcip-annotations;1.0-1 from central in [default]
com.google.code.findbugs#jsr305;3.0.2 from central in [default]
com.thoughtworks.paranamer#paranamer;2.8 from central in [default]
com.typesafe#config;1.4.1 from central in [default]
commons-logging#commons-logging;1.1.3 from central in [default]
io.dropwizard.metrics#metrics-core;4.1.18 from central in [default]
org.apache.commons#commons-lang3;3.10 from central in [default]
org.apache.commons#commons-pool2;2.11.1 from central in [default]
org.apache.hadoop#hadoop-client-api;3.3.4 from central in [default]
org.apache.hadoop#hadoop-client-runtime;3.3.4 from central in [default]
org.apache.kafka#kafka-clients;3.3.2 from central in [default]
org.apache.spark#spark-sql-kafka-0-10_2.13;3.4.1 from central in [default]
org.apache.spark#spark-token-provider-kafka-0-10_2.13;3.4.1 from central in [default]
org.hdrhistogram#HdrHistogram;2.1.12 from central in [default]
org.lz4#lz4-java;1.8.0 from central in [default]
org.reactivestreams#reactive-streams;1.0.3 from central in [default]
org.scala-lang#scala-reflect;2.13.11 from central in [default]
org.scala-lang.modules#scala-collection-compat_2.13;2.11.0 from central in [default]
org.scala-lang.modules#scala-parallel-collections_2.13;1.0.4 from central in [default]
org.slf4j#slf4j-api;2.0.6 from central in [default]
org.xerial.snappy#snappy-java;1.1.10.1 from central in [default]
:: evicted modules:
org.slf4j#slf4j-api;1.7.26 by [org.slf4j#slf4j-api;2.0.6] in [default]
com.google.code.findbugs#jsr305;3.0.0 by [com.google.code.findbugs#jsr305;3.0.2] in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 31 | 0 | 0 | 2 || 29 | 0 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-8a294b11-ad47-470c-8f84-295222264cee
confs: [default]
0 artifacts copied, 29 already retrieved (0kB/54ms)
24/05/12 18:54:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
WARNING:root:kafka dataframe could not be created because: An error occurred while calling o36.load.
: java.lang.NoClassDefFoundError: scala/$less$colon$less
at org.apache.spark.sql.kafka010.KafkaSourceProvider.org$apache$spark$sql$kafka010$KafkaSourceProvider$$validateStreamOptions(KafkaSourceProvider.scala:338)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSchema(KafkaSourceProvider.scala:71)
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:233)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:118)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:118)
at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:35)
at org.apache.spark.sql.streaming.DataStreamReader.loadInternal(DataStreamReader.scala:168)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:144)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: scala.$less$colon$less
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
... 20 more
Traceback (most recent call last):
File "/home/fran/PycharmProjects/data-engineering/spark_stream.py", line 143, in <module>
selection_df = create_selection_df_from_kafka(spark_df)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/fran/PycharmProjects/data-engineering/spark_stream.py", line 129, in create_selection_df_from_kafka
sel = spark_df.selectExpr("CAST(value AS STRING)") \
^^^^^^^^^^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'selectExpr'
I think this **ClassNotFoundException for scala.$less$colon$less** is an issue with the Scala version Spark is using:
Spark version 3.4.1
Using Scala version 2.12.17, OpenJDK 64-Bit Server VM, 11.0.22
which may differ from the Scala version Kafka uses (note that I'm using the kafka-python package on Python 3.11). Could this be the problem? How do I align the two Scala versions?
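For reference, one way to confirm which Scala version the bundled PySpark actually runs on; this goes through py4j's internal `_jvm` gateway, so treat it as a diagnostic sketch rather than a public API:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('scala-version-check').getOrCreate()
# scala.util.Properties.versionString() reports the runtime Scala version,
# e.g. "version 2.12.17"; _jvm is py4j's handle into the driver JVM.
print(spark.sparkContext._jvm.scala.util.Properties.versionString())
spark.stop()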
Per the output `Using Scala version 2.12.17`, you need Spark dependencies built for Scala 2.12 rather than 2.13, e.g. the Cassandra and Kafka connectors. That is exactly what `NoClassDefFoundError: scala/$less$colon$less` indicates: `$less$colon$less` is the JVM name of the `<:<` class, which is top-level in the Scala 2.13 standard library (in 2.12 it lives inside `Predef`), so the `_2.13` connector jars cannot run on your 2.12 runtime. The trailing `AttributeError` is only a knock-on effect: `connect_to_kafka` swallows the exception and returns `None`.
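A minimal sketch of the corrected builder, assuming the PyPI pyspark 3.4.1 install seen in your logs (which bundles Scala 2.12); only the artifact suffixes change, from `_2.13` to `_2.12`:

# Same builder as in the question, but with connector artifacts built for Scala 2.12
# to match the Scala 2.12.17 runtime bundled with this PySpark 3.4.1 install.
s_conn = SparkSession.builder \
    .appName('SparkDataStreaming') \
    .config('spark.jars.packages', "com.datastax.spark:spark-cassandra-connector_2.12:3.4.1,"
                                   "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1") \
    .config('spark.cassandra.connection.host', 'localhost') \
    .getOrCreate()

On the next run the Ivy resolution should pull the `_2.12` artifacts instead, and the `readStream ... load()` call should stop throwing `NoClassDefFoundError`.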
Note that I'm using the kafka-python package on Python 3.11.

kafka-python is a pure-Python package, so it doesn't use Scala; only the JVM artifacts pulled in via `spark.jars.packages` need to match Spark's Scala version.
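For context, kafka-python sits entirely on the producing side and never enters the JVM. A minimal producer sketch for the `users_created` topic (the payload fields here are illustrative, assumed to mirror the `created_users` schema):

import json
import uuid

from kafka import KafkaProducer

# kafka-python speaks the Kafka wire protocol directly in Python,
# so it is independent of whatever Scala version Spark runs on.
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
)
producer.send('users_created', {'id': str(uuid.uuid4()),
                                'first_name': 'Jane',
                                'last_name': 'Doe'})
producer.flush()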