pyspark.errors.exceptions.base.PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number

Question

I downloaded Spark (https://spark.apache.org/downloads.html) and installed pyspark via pip. I have tried almost every solution suggested online, but the problem persists. Can someone help me resolve this? I've added all the details below:

Spark version = spark-3.5.3-bin-hadoop3.tgz

PySpark/Scala version: (screenshot)

.zshrc: (screenshot)

echo $JAVA_HOME: (screenshot)

pyspark is reachable from the terminal: (screenshot)

spark_setup.py: (screenshots)
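
To rule out an environment problem, the following quick check (a minimal sketch, not specific to my setup) confirms what the Python process that launches Spark actually sees; [JAVA_GATEWAY_EXITED] typically means that process cannot find or start a working java binary:

import os
import shutil
import subprocess

# Sanity check: JAVA_HOME and java as seen by the Python process that
# will launch the Spark JVM gateway.
java_home = os.environ.get("JAVA_HOME")
print(f"JAVA_HOME = {java_home}")      # should point at a JDK, not be None

java_bin = shutil.which("java")
print(f"java on PATH = {java_bin}")    # PySpark needs this to start the gateway

if java_bin:
    subprocess.run([java_bin, "-version"], check=False)  # prints the JDK version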

Spark session initialization:

from pyspark.sql import SparkSession

def init_spark_session():
    # Local jar files belong in spark.jars; spark.jars.packages expects Maven
    # coordinates (group:artifact:version), not file paths. Note the .jar
    # extensions, which the original paths were missing.
    jars = ",".join([
        "file:///Users/perinban/spark/jars/spark-sql-kafka-0-10_2.13-3.5.3.jar",
        "file:///Users/perinban/spark/jars/spark-avro_2.13-3.5.3.jar",
    ])
    return SparkSession.builder \
        .appName("sensorForge") \
        .master("local[*]") \
        .config("spark.jars", jars) \
        .getOrCreate()
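
For reference, the Maven-coordinate form of the same configuration (a sketch; the coordinates assume the Spark 3.5.3 / Scala 2.13 artifacts matching the download above) lets Spark resolve the connectors itself instead of pointing at local jars:

def init_spark_session():
    # Sketch: resolve the Kafka and Avro connectors from Maven Central.
    # Coordinates assume Spark 3.5.3 built against Scala 2.13.
    packages = ",".join([
        "org.apache.spark:spark-sql-kafka-0-10_2.13:3.5.3",
        "org.apache.spark:spark-avro_2.13:3.5.3",
    ])
    return SparkSession.builder \
        .appName("sensorForge") \
        .master("local[*]") \
        .config("spark.jars.packages", packages) \
        .getOrCreate()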

jar files: (screenshot)

Error when trying to run it via Airflow: (screenshot)

JAVA_HOME is also set correctly inside Airflow: (screenshot)

apache-spark pyspark

1 Answer

I found the issue: defining the job as a python_callable makes Airflow run it as a plain Python application rather than a spark-submit application, so PySpark has to spawn its own JVM gateway from inside the worker process, and that is the launch that fails with [JAVA_GATEWAY_EXITED].

# The original task definition: process_files runs directly inside the
# Airflow worker's Python process, not through spark-submit.
fileMove_task = PythonOperator(
    task_id='fileoutbound',
    python_callable=process_files,
    dag=dag
)
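
A likely cleaner route (untested here; a sketch assuming the apache-airflow-providers-apache-spark package is installed and a 'spark_default' connection is configured) would be Airflow's SparkSubmitOperator, which hands the script to spark-submit for you:

from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

# Sketch: submit the script through Airflow's Spark provider instead of
# calling it as a python_callable. The application path is hypothetical.
spark_task = SparkSubmitOperator(
    task_id='fileoutbound',
    application='/Users/perinban/dags/scripts/process_files.py',  # hypothetical path
    conn_id='spark_default',
    dag=dag,
)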

For now, I've implemented a temporary workaround: the Spark code lives in a separate file, and I trigger it with spark-submit through an external subprocess call, like this:

import os
import subprocess

def trigger_spark_job(path, filename):
    try:
        # base_path is defined elsewhere in the module
        combined_filename = os.path.join(base_path, path, f'{filename}.py')

        # Command to run the Spark job via spark-submit
        subprocess.run([
            "spark-submit",
            combined_filename
        ], check=True)
        print("Spark job triggered successfully.")
    except subprocess.CalledProcessError as e:
        print(f"Error triggering Spark job: {e}")

If anyone has a permanent solution, I'm open to suggestions and happy to try them out.
