I'm experimenting with Delta Lake data in AWS Glue. I'm creating an AWS Glue job with Glue version 4.0.
Under the job details I added two job parameters:
--conf value: spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
--datalake-formats value: delta
Code:
import os
import sys
from datetime import datetime
from pathlib import Path
import boto3
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql import functions as F
from pyspark.storagelevel import StorageLevel
from pyspark.sql.types import *
from delta import *
from delta.tables import *
# Define contexts and set the log level
sc = SparkContext.getOrCreate()
sc.setLogLevel("ERROR")
glueContext = GlueContext(sc)
spark = glueContext.spark_session
logger = glueContext.get_logger()
# Spark configurations
spark.conf.set("spark.sql.orc.filterPushdown", "true")
spark.conf.set("spark.sql.parquet.filterPushdown", "true")
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
spark.conf.set("spark.sql.crossJoin.enabled", "true")
#spark.conf.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
glue_client = boto3.client('glue')
EXECUTION_DATE = datetime.today().strftime('%Y-%m-%d')
db_name = "test_db"
final_table = "delta_table"
# DUMMY DATA THAT WILL BE REPLACED BY A DYNAMIC SOURCE
data = [(1,'Lily'),(2,'Manuel'),(3,'Emily')]
schema = StructType([StructField("id",IntegerType(),False),StructField("name",StringType(),True)])
source_df = spark.createDataFrame(data,schema)
source_df = source_df.withColumn('country', F.lit('UK'))
try:
    response = glue_client.get_table(
        DatabaseName=db_name,
        Name=final_table)
    created_table = True
except glue_client.exceptions.EntityNotFoundException:
    created_table = False
## MAIN GOAL HERE IS TO CREATE A FLAG FOR THE UPDATED ROWS: NC - NOT CHANGED; I - INSERTED; U - UPDATED
source_df = source_df.withColumn('flag', F.lit('NC')).withColumn('modified_at',F.lit(EXECUTION_DATE))
if created_table:
    # not time to develop the merge logic yet
    print('test')
else:
    # FIRST EXECUTION: ALL ROWS ARE INSERTED
    final_df = source_df.withColumn('flag', F.lit('I'))
    db_tab = f"`{db_name}`.{final_table}"
    additional_options = {
        "path": "s3://bucket_test_dl_0065/delta_table/"
    }
    final_df.repartition(1).write \
        .format("delta") \
        .options(**additional_options) \
        .mode("append") \
        .partitionBy("country") \
        .saveAsTable(db_tab)
job.commit()
I followed the AWS documentation page -> https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-format-delta-lake.html
The error is always the same:
File "/tmp/delta-lake-exp.py", line, in <module>
.saveAsTable(db_tab)
File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line, in saveAsTable
self._jwrite.saveAsTable(name)
File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line, in __call__
return_value = get_return_value(
File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line, in deco
raise converted from None
pyspark.sql.utils.IllegalArgumentException: Can not create a Path from an empty string
I have already tried creating the table manually before running the AWS Glue job, but the error stays the same.
Has anyone run into the same problem?
Thanks
I solved a similar problem with the same error by saving the data directly to S3 with save instead of saveAsTable, and then creating and running a crawler over all the subfolders of that S3 prefix (both the folders and the objects should have been generated in S3 by then) to populate my Data Catalog and query the table in Athena.
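The workaround above can be sketched roughly as follows. This is a hedged sketch, not the exact code from my job: the S3 path reuses the one from the question, while the crawler name (`delta_table_crawler`) is a placeholder for a crawler you would have to create beforehand, pointed at the same S3 prefix.

```python
# Sketch of the workaround: write straight to S3 with save(), then
# trigger a crawler so the Data Catalog picks up the table for Athena.
# The crawler name below is an assumption (a crawler created beforehand).
DELTA_PATH = "s3://bucket_test_dl_0065/delta_table/"
CRAWLER_NAME = "delta_table_crawler"  # placeholder, not from the original job

def write_delta_and_crawl(final_df, delta_path=DELTA_PATH, crawler_name=CRAWLER_NAME):
    """Write the DataFrame directly to S3 (avoiding the saveAsTable
    path error), then start the crawler to update the Data Catalog."""
    import boto3  # imported here so the sketch stays self-contained

    (final_df.repartition(1).write
        .format("delta")
        .mode("append")
        .partitionBy("country")
        .save(delta_path))  # .save() takes the path explicitly

    glue_client = boto3.client("glue")
    glue_client.start_crawler(Name=crawler_name)
```

Once the crawler finishes, the table shows up in the Data Catalog and is queryable from Athena without going through `saveAsTable` at all.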