ERROR SparkContext: Failed to add spark-streaming-kafka-0-10_2.13-3.5.2.jar

Problem description (votes: 0, answers: 1)
ERROR SparkContext: Failed to add home/areaapache/software/spark-3.5.2-bin-hadoop3/jars/spark-streaming-kafka-0-10_2.13-3.5.2.jar to Spark environment
import logging
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, explode, to_date, first, last, max, min, avg, sum, lit
from pyspark.sql.types import StructType, StructField, ArrayType, StringType, DoubleType, LongType, IntegerType
import time
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


kafka_topic_name = "finnhub_data"
kafka_bootstrap_servers = "localhost:9092"

mysql_host_name = "localhost"
mysql_port_no = "3306"
mysql_database_name = "finnhub_processed"
mysql_driver_class = "com.mysql.cj.jdbc.Driver"  # Updated driver class
mysql_table_name = "processed_trade_data"
mysql_user_name = "tuanle"
mysql_password = "123456"
mysql_jdbc_url = f"jdbc:mysql://{mysql_host_name}:{mysql_port_no}/{mysql_database_name}"

cassandra_host_name = "localhost"
cassandra_port_no = "9042"
cassandra_keyspace_name = "finnhub_data"
cassandra_table_name = "raw_trade_data"

spark_jar = "file:///home/tuanle/areaapache/software/spark-3.5.2-bin-hadoop3/jars"
def save_to_cassandra(df, epoch_id):
    logger.info(f"Saving batch {epoch_id} to Cassandra")
    
    df_to_save = df.select(
        col("symbol"),
        (col("trade_time") / 1000).cast("timestamp").alias("trade_time"),
        col("price"),
        col("volume"),
        col("conditions"),
        col("company")
    )
    
    df_to_save.write \
        .format("org.apache.spark.sql.cassandra") \
        .mode("append") \
        .options(table=cassandra_table_name, keyspace=cassandra_keyspace_name) \
        .save()
    
    logger.info(f"Batch {epoch_id} saved to Cassandra successfully")

def process_and_save_to_mysql(spark):
    logger.info("Starting to process data from Cassandra and save to MySQL")

    df_cassandra = spark.read \
        .format("org.apache.spark.sql.cassandra") \
        .options(table=cassandra_table_name, keyspace=cassandra_keyspace_name) \
        .load()

    df_processed = df_cassandra \
        .withColumn("trade_date", to_date(col("trade_time"))) \
        .groupBy("symbol", "trade_date") \
        .agg(
            first("price").alias("open_price"),
            last("price").alias("close_price"),
            max("price").alias("high_price"),
            min("price").alias("low_price"),
            avg("price").alias("avg_price"),
            sum("volume").alias("total_volume")
        ) \
        .withColumn("processed_time", lit(time.strftime("%Y-%m-%d %H:%M:%S")))

    df_processed.write \
        .jdbc(url=mysql_jdbc_url,
            table=mysql_table_name,
            mode="append",
            properties={
                "user": mysql_user_name,
                "password": mysql_password,
                "driver": mysql_driver_class
            })

    logger.info("Data processed and saved to MySQL successfully")

if __name__ == "__main__":
    logger.info("Data Processing Application Started ...")

    # Initialize the Spark session and define the required configuration
    spark = SparkSession.builder \
        .appName("PySpark Structured Streaming with Kafka, Cassandra, and MySQL") \
        .config("spark.streaming.stopGracefullyOnShutdown", True) \
        .config("spark.jars", f"{spark_jar}/jsr305-3.0.0.jar,{spark_jar}/spark-cassandra-connector_2.13-3.5.1.jar,{spark_jar}/spark-sql-kafka-0-10_2.13-3.5.2.jar,{spark_jar}/spark-streaming-kafka-0-10_2.13-3.5.2.jar,file:///usr/share/java/mysql-connector-java.jar,{spark_jar}/kafka-clients-3.8.0.jar") \
        .config("spark.sql.shuffle.partitions", 4) \
        .config("spark.cassandra.connection.host", cassandra_host_name) \
        .config("spark.cassandra.connection.port", cassandra_port_no) \
        .config("spark.sql.mysql.host", mysql_host_name) \
        .config("spark.sql.mysql.port", mysql_port_no) \
        .config("spark.cassandra.connection.keep_alive_ms", "60000") \
        .master("local[4]") \
        .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    logger.info("Spark Session initialized successfully")

    finnhub_df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
        .option("subscribe", kafka_topic_name) \
        .option("startingOffsets", "latest") \
        .load()

    logger.info("Printing Schema of finnhub_df:")
    finnhub_df.printSchema()

    finnhub_schema = StructType([
        StructField("data", ArrayType(StructType([
            StructField("c", ArrayType(StringType())),
            StructField("p", DoubleType()),
            StructField("s", StringType()),
            StructField("t", LongType()),
            StructField("v", IntegerType()),
            StructField("company", StringType())
        ]))),
        StructField("type", StringType())
    ])

    finnhub_df1 = finnhub_df.selectExpr("CAST(value AS STRING)", "timestamp")
    finnhub_df2 = finnhub_df1.select(from_json(col("value"), finnhub_schema).alias("finnhub"), "timestamp")
    finnhub_df3 = finnhub_df2.select("finnhub.data", "timestamp")
    finnhub_df4 = finnhub_df3.select(explode("data").alias("trade_data"), "timestamp")
    finnhub_df5 = finnhub_df4.select(
        col("trade_data.s").alias("symbol"),
        col("trade_data.t").alias("trade_time"),
        col("trade_data.p").alias("price"),
        col("trade_data.v").alias("volume"),
        col("trade_data.c").alias("conditions"),
        col("trade_data.company").alias("company"),
        col("timestamp")
    )

    query_cassandra = finnhub_df5 \
        .writeStream \
        .trigger(processingTime='15 seconds') \
        .outputMode("append") \
        .foreachBatch(save_to_cassandra) \
        .start()

    try:
        while True:
            time.sleep(300)  # Wait for 5 minutes
            process_and_save_to_mysql(spark)
    except KeyboardInterrupt:
        logger.info("Application interrupted. Stopping streams...")
        query_cassandra.stop()
        spark.stop()
        logger.info("Application stopped successfully")
    except Exception as e:
        logger.error(f"An error occurred: {str(e)}")
        query_cassandra.stop()
        spark.stop()
        logger.info("Application stopped due to an error")
File "/home/tuanle/apachearea/software/spark-3.5.2-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o47.load. : java.lang.NoClassDefFoundError: scala/$less$colon$less         at org.apache.spark.sql.kafka010.KafkaSourceProvider.org$apache$spark$sql$kafka010$KafkaSourceProvider$$validateStreamOptions(KafkaSourceProvider.scala:338)         at org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSchema(KafkaSourceProvider.scala:71)         at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:233)         at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:118)         at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:118)         at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:36)         at org.apache.spark.sql.streaming.DataStreamReader.loadInternal(DataStreamReader.scala:169)         at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:145)         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)         at java.lang.reflect.Method.invoke(Method.java:498)         at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)         at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)         at py4j.Gateway.invoke(Gateway.java:282)         at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)         at py4j.commands.CallCommand.execute(CallCommand.java:79)         at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)         at py4j.ClientServerConnection.run(ClientServerConnection.java:106)         at java.lang.Thread.run(Thread.java:750) Caused by: java.lang.ClassNotFoundException: scala.$less$colon$less         at java.net.URLClassLoader.findClass(URLClassLoader.java:387)         at java.lang.ClassLoader.loadClass(ClassLoader.java:418)         at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)         at java.lang.ClassLoader.loadClass(ClassLoader.java:351)         ... 20 more 2024-09-26 22:42:24,971 - INFO - Closing down clientserver connection`

I don't understand why: I have already downloaded all the required jar files matching my version and saved them under the jars folder, but Spark still cannot find them.

python apache-spark pyspark spark-structured-streaming spark-kafka-integration
1 Answer (0 votes)

This is a Scala version mismatch.

For example, you may have a Spark build for Scala 2.12 while your Kafka packages are compiled for 2.13; the two are incompatible.
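
If you are not sure which Scala build your Spark distribution is, spark-submit --version prints it; you can also ask the JVM from PySpark itself. A minimal sketch, assuming a plain local install:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()

# Query the Scala runtime version through py4j; prints e.g. "version 2.12.18",
# in which case every *_2.13 jar in the question is incompatible.
print(spark.sparkContext._jvm.scala.util.Properties.versionString())

spark.stop()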

This is the common cause of this message appearing in the stack trace:

java.lang.NoClassDefFoundError: scala/$less$colon$less
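
The fix is to keep every connector on the same Scala suffix as your Spark build; the pre-built spark-3.5.2-bin-hadoop3 download is typically the Scala 2.12 build (a separate -scala2.13 build exists). Note also that Structured Streaming only needs spark-sql-kafka-0-10 (spark-streaming-kafka-0-10 is the old DStream connector), and that it pulls in transitive jars such as kafka-clients and commons-pool2, so letting Spark resolve Maven coordinates is less error-prone than hand-copying jars. A sketch, assuming your Spark reports Scala 2.12; swap the suffix to _2.13 if it reports 2.13:

# Sketch: resolve matching connectors from Maven instead of hand-listing jars.
# The _2.12 suffix is an assumption -- it must match the Scala version your
# Spark build reports (see the check above).
spark = SparkSession.builder \
    .appName("PySpark Structured Streaming with Kafka, Cassandra, and MySQL") \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2,"
            "com.datastax.spark:spark-cassandra-connector_2.12:3.5.1") \
    .config("spark.jars", "file:///usr/share/java/mysql-connector-java.jar") \
    .master("local[4]") \
    .getOrCreate()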
