I have downloaded Spark (https://spark.apache.org/downloads.html) and installed pyspark via the pip command. I have also tried almost every solution suggested online, but the problem persists and remains unresolved. Could someone help me fix this? I have added all the details below:
Spark version = spark-3.5.3-bin-hadoop3.tgz
Spark session initialization:
from pyspark.sql import SparkSession

def init_spark_session():
    # spark.jars takes a comma-separated list of jar paths and puts them on
    # both the driver and executor classpaths (spark.jars.packages expects
    # Maven coordinates, not file:// paths).
    return SparkSession.builder \
        .appName("sensorForge") \
        .master("local[*]") \
        .config("spark.jars",
                "file:///Users/perinban/spark/jars/spark-sql-kafka-0-10_2.13-3.5.3.jar,"
                "file:///Users/perinban/spark/jars/spark-avro_2.13-3.5.3.jar") \
        .getOrCreate()
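Alternatively, as I understand it, spark.jars.packages takes Maven coordinates (groupId:artifactId:version) and resolves the connectors at startup; a minimal sketch of that form (the helper name is just for illustration, the coordinates are my assumption of the matching artifacts, and since the default hadoop3 tarball is built against Scala 2.12, the _2.12 variants may be the right ones):

from pyspark.sql import SparkSession

# Sketch: resolve the Kafka and Avro connectors from Maven Central instead
# of pointing at local jar files. Adjust the Scala suffix (_2.12 vs _2.13)
# to match the installed Spark build.
def init_spark_session_from_packages():
    return SparkSession.builder \
        .appName("sensorForge") \
        .master("local[*]") \
        .config("spark.jars.packages",
                "org.apache.spark:spark-sql-kafka-0-10_2.13:3.5.3,"
                "org.apache.spark:spark-avro_2.13:3.5.3") \
        .getOrCreate()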
I have identified the issue: defining the task as a python_callable makes Airflow run it as a plain Python application on the worker rather than as a spark-submit application.
fileMove_task = PythonOperator(
    task_id='fileoutbound',
    python_callable=process_files,
    dag=dag
)
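Among the suggestions I came across online is the SparkSubmitOperator from the apache-airflow-providers-apache-spark provider, which submits the script through spark-submit instead of running it inside the worker's Python process. A minimal sketch of how I understand it would be wired up (the application path and conn_id below are placeholders):

from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

spark_task = SparkSubmitOperator(
    task_id='fileoutbound',
    application='/path/to/process_files.py',  # placeholder: the job script
    conn_id='spark_default',  # placeholder: Airflow connection for the Spark master
    packages='org.apache.spark:spark-sql-kafka-0-10_2.13:3.5.3,'
             'org.apache.spark:spark-avro_2.13:3.5.3',
    dag=dag
)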
For now, I have implemented a temporary workaround: the job lives in a separate file that I launch through spark-submit via an external subprocess call, as shown below:
import os
import subprocess

def trigger_spark_job(path, filename):
    try:
        # base_path is defined elsewhere in the DAG file
        combined_filename = os.path.join(base_path, path, f'{filename}.py')
        # Command to run the Spark job via spark-submit in a child process;
        # check=True raises CalledProcessError on a non-zero exit code
        subprocess.run([
            "spark-submit",
            combined_filename
        ], check=True)
        print("Spark job triggered successfully.")
    except subprocess.CalledProcessError as e:
        print(f"Error triggering Spark job: {e}")
If anyone has a permanent solution, I am open to suggestions and would be happy to try them.