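I am getting an error when trying to run a spark-submit job from inside an Airflow container and would appreciate your help. I built two images, one for Airflow and one for Spark, with a docker-compose file for each application. I then run docker-compose and configure the Spark connection in the Airflow UI by setting the host to the master URI in the connection settings, making sure the conn-id there matches the one used in the DAG.
From the logs, it seems the DAG cannot find the JAR file responsible for connecting to GCS (gcs-connector-hadoop3-2.2.5.jar). I cannot tell whether this is a path problem (relative/absolute), a permissions problem, or something else entirely.
Here is the Spark job: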
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

if __name__ == '__main__':
    # Register the GCS connector JAR and the service-account credentials.
    conf = SparkConf() \
        .setMaster('local') \
        .setAppName('test') \
        .set("spark.jars", "/spark-lib/gcs-connector-hadoop3-2.2.5.jar") \
        .set("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
        .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", './.google/credentials/google_credentials.json')

    sc = SparkContext(conf=conf.set("spark.files.overwrite", "true"))

    # Map the gs:// scheme to the connector's file system classes.
    hadoop_conf = sc._jsc.hadoopConfiguration()
    hadoop_conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
    hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
    hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", './.google/credentials/google_credentials.json')
    hadoop_conf.set("fs.gs.auth.service.account.enable", "true")

    spark = SparkSession.builder \
        .config(conf=sc.getConf()) \
        .getOrCreate()

    # Read every Parquet file under the bucket prefix and print the schema.
    path = "gs://tfl-cycling/pq/"
    df_test = spark.read.option("recursiveFileLookup", "true").parquet(path)
    df_test.printSchema()
DAG:
from datetime import timedelta
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
import pendulum
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkFiles
from pyspark.sql import types
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import functions as F
import pandas as pd
import datetime

default_args = {
    'owner': 'airflow',
    'retry_delay': timedelta(minutes=5)
}

spark_dag = DAG(
    dag_id="test_1",
    default_args=default_args,
    schedule=None,
    dagrun_timeout=timedelta(minutes=60),
    description='use case of sparkoperator in airflow',
    start_date=pendulum.today('UTC').add(days=-1)
)

Etl = SparkSubmitOperator(
    application="/opt/airflow/dags/example.py",
    conn_id='spark_default',  # 'spark_local'
    task_id='spark_submit_task_load',
    dag=spark_dag
)

Etl
Error message:
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - 23/04/18 08:50:05 INFO Utils: Successfully started service 'SparkUI' on port 4040.
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - 23/04/18 08:50:05 ERROR SparkContext: Failed to add ./../gcs-connector-hadoop3-2.2.5.jar to Spark environment
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - java.io.FileNotFoundException: Jar /opt/gcs-connector-hadoop3-2.2.5.jar not found
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - at org.apache.spark.SparkContext.addLocalJarFile$1(SparkContext.scala:1968)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - at org.apache.spark.SparkContext.addJar(SparkContext.scala:2023)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - at org.apache.spark.SparkContext.$anonfun$new$12(SparkContext.scala:507)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - at org.apache.spark.SparkContext.$anonfun$new$12$adapted(SparkContext.scala:507)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - at org.apache.spark.SparkContext.<init>(SparkContext.scala:507)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - at py4j.Gateway.invoke(Gateway.java:238)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - at java.base/java.lang.Thread.run(Thread.java:829)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - 23/04/18 08:50:05 INFO Executor: Starting executor ID driver on host 8923728d9f5c
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - 23/04/18 08:50:05 INFO Executor: Starting executor with user classpath (userClassPathFirst = false): ''
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - 23/04/18 08:50:05 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 35925.
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - 23/04/18 08:50:05 INFO NettyBlockTransferService: Server created on 8923728d9f5c:35925
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - 23/04/18 08:50:05 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - 23/04/18 08:50:05 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 8923728d9f5c, 35925, None)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - 23/04/18 08:50:05 INFO BlockManagerMasterEndpoint: Registering block manager 8923728d9f5c:35925 with 434.4 MiB RAM, BlockManagerId(driver, 8923728d9f5c, 35925, None)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - 23/04/18 08:50:05 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 8923728d9f5c, 35925, None)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - 23/04/18 08:50:05 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 8923728d9f5c, 35925, None)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - /home/***/.local/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/context.py:317: FutureWarning: Python 3.7 support is deprecated in Spark 3.4.
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - warnings.warn("Python 3.7 support is deprecated in Spark 3.4.", FutureWarning)
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - 23/04/18 08:50:05 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir.
[2023-04-18, 08:50:05 UTC] {spark_submit.py:490} INFO - 23/04/18 08:50:05 INFO SharedState: Warehouse path is 'file:/opt/***/spark-warehouse'.
[2023-04-18, 08:50:06 UTC] {spark_submit.py:490} INFO - 23/04/18 08:50:06 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: gs://tfl-cycling/pq/.
[2023-04-18, 08:50:06 UTC] {spark_submit.py:490} INFO - java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem not found
I tried defining the spark.jars value as both an absolute path and a relative path.
I tried granting additional permissions to read the file.
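The log above reports the JAR first as ./../gcs-connector-hadoop3-2.2.5.jar and then as /opt/gcs-connector-hadoop3-2.2.5.jar, which is consistent with a relative path being resolved against the working directory of the process that runs spark-submit. Below is a minimal sketch of that resolution. The working directory /opt/airflow is an assumption (inferred from the warehouse path in the log, not verified), and resolve_against_cwd is a hypothetical helper, not part of Spark or Airflow.

```python
import posixpath


def resolve_against_cwd(cwd: str, jar_path: str) -> str:
    """Return the absolute POSIX path that jar_path points to when the process runs in cwd."""
    if posixpath.isabs(jar_path):
        # Absolute paths do not depend on the working directory.
        return posixpath.normpath(jar_path)
    # Relative paths are joined to the working directory, then "." and ".." are collapsed.
    return posixpath.normpath(posixpath.join(cwd, jar_path))


# Assumed working directory; inside the container the real value is os.getcwd().
assumed_cwd = "/opt/airflow"

print(resolve_against_cwd(assumed_cwd, "./../gcs-connector-hadoop3-2.2.5.jar"))
# -> /opt/gcs-connector-hadoop3-2.2.5.jar
print(resolve_against_cwd(assumed_cwd, "/spark-lib/gcs-connector-hadoop3-2.2.5.jar"))
# -> /spark-lib/gcs-connector-hadoop3-2.2.5.jar
```

Either way, the resolved path has to exist in the file system of the container that executes the task, which is not necessarily the Spark container.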
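I also tried using "spark.jars.packages", "com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.5" instead of the "local" file, but I think my implementation there needs more research into how to set this configuration correctly from the script.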
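One possible alternative, as far as I understand it, is to pass the Maven coordinate through the operator rather than from inside the script, because when the script is launched by spark-submit the JVM is already running by the time SparkConf is built. SparkSubmitOperator accepts a packages argument for this. The sketch below only builds the keyword arguments so it can run without Airflow installed; gcs_submit_kwargs is a hypothetical helper, and the approach assumes the container can reach a Maven repository to download the connector.

```python
# Maven coordinate taken from the spark.jars.packages attempt described above.
GCS_CONNECTOR = "com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.5"


def gcs_submit_kwargs(application: str, conn_id: str = "spark_default") -> dict:
    """Build keyword arguments for SparkSubmitOperator (hypothetical helper)."""
    return {
        "task_id": "spark_submit_task_load",
        "application": application,
        "conn_id": conn_id,
        # Passed to spark-submit as --packages, so the JAR is resolved at launch time.
        "packages": GCS_CONNECTOR,
    }


kwargs = gcs_submit_kwargs("/opt/airflow/dags/example.py")
print(kwargs["packages"])

# In the DAG file, with the Spark provider installed, this would be used as:
# Etl = SparkSubmitOperator(dag=spark_dag, **kwargs)
```

With this variant the spark.jars line in the job script would no longer be needed, although I have not tested that combination in this setup.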
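Finally, I tried submitting the job directly to the Spark container, and the job shown above ran successfully (it is more of an example than a real job).
What I expect from this job is to read the Parquet files stored on GCS and run some analysis and transformations with Spark, but as a first step I just want to connect to GCS and read the files there.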
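Solved: although I had copied the JAR file into the image, I had not mounted the volume correctly in the relevant service (airflow-scheduler in this case). Once I added the mount to that service in the docker-compose file, the file was recognized. Side note: earlier, while debugging, I used docker cp to copy the file into the container manually. The command worked, but Airflow still did not seem able to access the file, so a manually copied file or directory apparently needs a mount set up as well.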
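Since the root cause was that the JAR was not visible inside the container running the task, a check at the top of the job script would have surfaced it earlier. This is only a sketch: require_readable_file is a hypothetical helper, and the path is the one from the spark.jars setting in the job above.

```python
import os


def require_readable_file(path: str) -> str:
    """Return the absolute path if the file exists and is readable, otherwise raise."""
    absolute = os.path.abspath(path)
    if not os.path.isfile(absolute):
        raise FileNotFoundError(
            f"{absolute} is not visible in this container (cwd={os.getcwd()}); "
            "check the volume mounts of the service that runs spark-submit"
        )
    if not os.access(absolute, os.R_OK):
        raise PermissionError(f"{absolute} exists but is not readable by the current user")
    return absolute


# Path from the job's spark.jars setting; report the result instead of crashing here.
try:
    print("JAR found at", require_readable_file("/spark-lib/gcs-connector-hadoop3-2.2.5.jar"))
except (FileNotFoundError, PermissionError) as exc:
    print("JAR check failed:", exc)
```

Calling the function without the try/except before building SparkConf makes the task fail with a message that names the container's working directory, instead of the later ClassNotFoundException.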