ERROR SparkContext: Failed to add file (java.io.FileNotFoundException: JAR for Spark not found)

Problem description

Please help me fix the above error based on the code I am using.

The code in proccesing_data.py processes data with Spark Structured Streaming:

import logging
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, explode, to_date, first, last, max, min, avg, sum, lit
from pyspark.sql.types import StructType, StructField, ArrayType, StringType, DoubleType, LongType, IntegerType
import time


# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


# Connection parameters
kafka_topic_name = "finnhub_data"
kafka_bootstrap_servers = "localhost:9092"


mysql_host_name = "localhost"
mysql_port_no = "3306"
mysql_database_name = "finnhub_processed"
mysql_driver_class = "com.mysql.cj.jdbc.Driver"  # Updated driver class
mysql_table_name = "processed_trade_data"
mysql_user_name = "tuanle"
mysql_password = "123456"
mysql_jdbc_url = f"jdbc:mysql://{mysql_host_name}:{mysql_port_no}/{mysql_database_name}"

cassandra_host_name = "localhost"
cassandra_port_no = "9042"
cassandra_keyspace_name = "finnhub_data"
cassandra_table_name = "raw_trade_data"


def save_to_cassandra(df, epoch_id):
   logger.info(f"Saving batch {epoch_id} to Cassandra")
  
   df_to_save = df.select(
       col("symbol"),
       (col("trade_time") / 1000).cast("timestamp").alias("trade_time"),
       col("price"),
       col("volume"),
       col("conditions"),
       col("company")
   )
  
   df_to_save.write \
       .format("org.apache.spark.sql.cassandra") \
       .mode("append") \
       .options(table=cassandra_table_name, keyspace=cassandra_keyspace_name) \
       .save()
  
   logger.info(f"Batch {epoch_id} saved to Cassandra successfully")


def process_and_save_to_mysql(spark):
   logger.info("Starting to process data from Cassandra and save to MySQL")


   df_cassandra = spark.read \
       .format("org.apache.spark.sql.cassandra") \
       .options(table=cassandra_table_name, keyspace=cassandra_keyspace_name) \
       .load()


   df_processed = df_cassandra \
       .withColumn("trade_date", to_date(col("trade_time"))) \
       .groupBy("symbol", "trade_date") \
       .agg(
           first("price").alias("open_price"),
           last("price").alias("close_price"),
           max("price").alias("high_price"),
           min("price").alias("low_price"),
           avg("price").alias("avg_price"),
           sum("volume").alias("total_volume")
       ) \
       .withColumn("processed_time", lit(time.strftime("%Y-%m-%d %H:%M:%S")))


   df_processed.write \
       .jdbc(url=mysql_jdbc_url,
             table=mysql_table_name,
             mode="append",
             properties={
                 "user": mysql_user_name,
                 "password": mysql_password,
                 "driver": mysql_driver_class
             })


   logger.info("Data processed and saved to MySQL successfully")


if __name__ == "__main__":
   logger.info("Data Processing Application Started ...")


   # Initialize the Spark session and define the required configuration
   spark = SparkSession.builder \
       .appName("PySpark Structured Streaming with Kafka, Cassandra, and MySQL") \
       .config("spark.streaming.stopGracefullyOnShutdown", True) \
       .config("spark.jars", f"file:///home/tuanle/areaapache/software/spark-3.5.2-bin-hadoop3/jars/jsr305-3.0.0.jar,file:///home/tuanle/areaapache/software/spark-3.5.2-bin-hadoop3/jars/spark-cassandra-connector_2.12-3.5.1.jar,file:///home/tuanle/areaapache/software/spark-3.5.2-bin-hadoop3/jars/spark-sql-kafka-0-10_2.12-3.5.2.jar,file:///usr/share/java/mysql-connector-java.jar,file:///home/tuanle/areaapache/software/spark-3.5.2-bin-hadoop3/jars/kafka-clients-3.8.0.jar") \
       .config("spark.sql.shuffle.partitions", 4) \
       .config("spark.cassandra.connection.host", cassandra_host_name) \
       .config("spark.cassandra.connection.port", cassandra_port_no) \
       .config("spark.sql.mysql.host", mysql_host_name) \
       .config("spark.sql.mysql.port", mysql_port_no) \
       .config("spark.cassandra.connection.keep_alive_ms", "60000") \
       .master("local[4]") \
       .getOrCreate()


   spark.sparkContext.setLogLevel("ERROR")


   logger.info("Spark Session initialized successfully")


   finnhub_df = spark \
       .readStream \
       .format("kafka") \
       .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
       .option("subscribe", kafka_topic_name) \
       .option("startingOffsets", "latest") \
       .load()


   logger.info("Printing Schema of finnhub_df:")
   finnhub_df.printSchema()


   finnhub_schema = StructType([
       StructField("data", ArrayType(StructType([
           StructField("c", ArrayType(StringType())),
           StructField("p", DoubleType()),
           StructField("s", StringType()),
           StructField("t", LongType()),
           StructField("v", IntegerType()),
           StructField("company", StringType())
       ]))),
       StructField("type", StringType())
   ])


   finnhub_df1 = finnhub_df.selectExpr("CAST(value AS STRING)", "timestamp")
   finnhub_df2 = finnhub_df1.select(from_json(col("value"), finnhub_schema).alias("finnhub"), "timestamp")
   finnhub_df3 = finnhub_df2.select("finnhub.data", "timestamp")
   finnhub_df4 = finnhub_df3.select(explode("data").alias("trade_data"), "timestamp")
   finnhub_df5 = finnhub_df4.select(
       col("trade_data.s").alias("symbol"),
       col("trade_data.t").alias("trade_time"),
       col("trade_data.p").alias("price"),
       col("trade_data.v").alias("volume"),
       col("trade_data.c").alias("conditions"),
       col("trade_data.company").alias("company"),
       col("timestamp")
   )


   query_cassandra = finnhub_df5 \
       .writeStream \
       .trigger(processingTime='15 seconds') \
       .outputMode("append") \
       .foreachBatch(save_to_cassandra) \
       .start()


   try:
       while True:
           time.sleep(300)  # Wait for 5 minutes
           process_and_save_to_mysql(spark)
   except KeyboardInterrupt:
       logger.info("Application interrupted. Stopping streams...")
       query_cassandra.stop()
       spark.stop()
       logger.info("Application stopped successfully")
   except Exception as e:
       logger.error(f"An error occurred: {str(e)}")
       query_cassandra.stop()
       spark.stop()
       logger.info("Application stopped due to an error")

After running the Spark streaming job, I encountered errors:

Here is a brief summary of the errors found in the log:

  1. Missing JAR files: multiple JAR files could not be found in the specified Spark directory, including:

    • jsr305-3.0.0.jar

    • spark-cassandra-connector_2.13-3.5.1.jar

    • spark-sql-kafka-0-10_2.13-3.5.2.jar

    • spark-streaming-kafka-0-10_2.13-3.5.2.jar

    • kafka-clients-3.8.0.jar

  2. Kafka-related error: java.lang.NoClassDefFoundError: org/apache/spark/kafka010/KafkaConfigUpdater. This error indicates that the Kafka-related dependencies are missing or were not loaded correctly.

  3. Scala-related error: java.lang.NoClassDefFoundError: scala/$less$colon$less. This error points to a problem with the Scala dependencies, most likely a version incompatibility: $less$colon$less is the JVM-mangled name of Scala's <:< type, and this error typically appears when _2.13 artifacts are loaded on a Spark build compiled for Scala 2.12 (see the sketch after the log excerpt below).

  4. Cassandra-related error: an error occurred while trying to load data from Cassandra: java.lang.NoClassDefFoundError: scala/$less$colon$less

2024-10-01 10:11:54,931 - INFO - Application stopped due to an error

2024-10-01 10:11:54,931 - INFO - Closing down client-server connection
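
For reference, here is a minimal sketch (not from the original post) for confirming which Scala version the running Spark build was compiled against; _jvm is py4j's internal gateway handle, so treat this purely as a debugging aid:

from pyspark.sql import SparkSession

# Minimal sketch: print the Scala version this Spark build was compiled against.
# Any _2.12 / _2.13 suffix on extra packages and JARs must match this version.
spark = SparkSession.builder.master("local[1]").appName("scala-version-check").getOrCreate()

print("Spark version:", spark.version)
# scala.util.Properties.versionString() returns e.g. "version 2.12.18"
print("Scala version:", spark.sparkContext._jvm.scala.util.Properties.versionString())

spark.stop()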

I tried downloading all the JARs: jsr305-3.0.0.jar, spark-cassandra-connector_2.12-3.5.1.jar, spark-sql-kafka-0-10_2.12-3.5.2.jar, kafka-clients-3.8.0.jar.

Then I moved the JAR files into Spark's jars folder and updated the CLASSPATH.

I ran spark-shell --version and the result is: Spark version 3.5.2, using Scala version 2.12.18, OpenJDK 64-Bit Server VM, 1.8.0_422.

My installed setup: Cassandra 4.1.6, MySQL, Hadoop-3.4.0, mysql-connector-j-9.0.0, kafka_2.12-3.8.0, sbt-1.10.2, spark-3.5.2-bin-hadoop3, scala-2.13.14.

But when I run Spark I still get the errors. How can I fix this? Please help me in detail. Thank you all.

If it would help, what can I reinstall? Please give me suggestions so that I can avoid deleting everything and downloading it all again. Please help me.

python apache-spark-sql spark-streaming apache-kafka-connect spark-cassandra-connector
1 Answer

First, as friendly, constructive feedback: your post is too broad and really asks several questions at once, so it will likely be voted to be closed.

Second, I will answer only the part related to the Spark Cassandra Connector. Supplying the connector JAR on its own in your application generally does not work, because it means you are likely missing all of its transitive dependencies.

Instead, we recommend that you provide the Maven coordinates of the Cassandra connector via --packages (or the spark.jars.packages configuration property), so that the connector and all of its dependencies are placed on the classpath of the Spark driver and executors. For example, launch the Spark shell with:

$ spark-shell \
    --conf spark.cassandra.connection.host=127.0.0.1 \
    --packages com.datastax.spark:spark-cassandra-connector_2.12:3.5.1 \
    --conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions
The same applies to your application:

    spark = SparkSession.builder \
        ...
        .config("spark.jars.packages", "com.datastax.spark:spark-cassandra-connector_2.12:3.5.1") \
        ...
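
Putting this together for your application, here is a minimal sketch, assuming Spark 3.5.2 built against Scala 2.12 (as your spark-shell --version output shows); the Kafka and MySQL coordinates are suggestions to replace the local spark.jars paths, not something taken from your original code:

from pyspark.sql import SparkSession

# Minimal sketch: let Spark resolve the connectors and ALL of their transitive
# dependencies from Maven Central instead of pointing at local JAR files.
# The _2.12 suffix must match the Scala version of the Spark build.
packages = ",".join([
    "com.datastax.spark:spark-cassandra-connector_2.12:3.5.1",
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2",  # assumed: matches Spark 3.5.2
    "com.mysql:mysql-connector-j:9.0.0",                  # assumed: the JDBC driver mentioned in the post
])

spark = (
    SparkSession.builder
    .appName("PySpark Structured Streaming with Kafka, Cassandra, and MySQL")
    .config("spark.jars.packages", packages)
    .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
    .config("spark.cassandra.connection.host", "localhost")
    .config("spark.cassandra.connection.port", "9042")
    .master("local[4]")
    .getOrCreate()
)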

As an aside, if I may offer some friendly advice: you are much more likely to get help if (1) you split your question into several separate posts, each focused on a single problem, and (2) provide a minimal code example for the component in question (not one big block of code that you would like others to troubleshoot for you).

For guidance, see How do I ask a good question?. Cheers!
