PySpark - Trying to use Delta Lake with saveAsTable in AWS Glue

Problem description

I am experimenting with Delta Lake data in AWS Glue. I am creating an AWS Glue job using Glue version 4.0.

In the job details I added two job parameters:

--conf value: spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog

--datalake-formats value: delta
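
For reference, here is a minimal sketch (not the original setup) of how these same parameters could be supplied as DefaultArguments when creating the job with boto3; the role ARN, script location, and worker settings below are placeholders:

import boto3

glue = boto3.client('glue')

# Sketch only: role ARN, script location and worker settings are placeholders
glue.create_job(
    Name='delta-lake-exp',
    Role='arn:aws:iam::111122223333:role/MyGlueJobRole',
    GlueVersion='4.0',
    WorkerType='G.1X',
    NumberOfWorkers=2,
    Command={
        'Name': 'glueetl',
        'ScriptLocation': 's3://my-scripts-bucket/delta-lake-exp.py',
        'PythonVersion': '3',
    },
    DefaultArguments={
        # Both Spark confs are chained into a single --conf value, as in the job details above
        '--conf': 'spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension '
                  '--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog',
        '--datalake-formats': 'delta',
    },
)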

Code:

import os
import sys
from datetime import datetime
from pathlib import Path
import boto3

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql import functions as F
from pyspark.storagelevel import StorageLevel
from pyspark.sql.types import *

from delta import *
from delta.tables import *

# Define contexts and set the log level
glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session
logger = glueContext.get_logger()
sc = SparkContext.getOrCreate()
sc.setLogLevel("ERROR")

# Spark configurations
spark.conf.set("spark.sql.orc.filterPushdown", "true")
spark.conf.set("spark.sql.parquet.filterPushdown", "true")
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
spark.conf.set("spark.sql.crossJoin.enabled", "true")
#spark.conf.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

args = getResolvedOptions(sys.argv, ['JOB_NAME'])  # resolve the Glue job arguments
job = Job(glueContext)
job.init('delta-lake-exp', args)
glue_client = boto3.client('glue')

EXECUTION_DATE = datetime.today().strftime('%Y-%m-%d')
db_name = "test_db"
final_table = "delta_table"


# DUMMY DATA THAT WILL BE REPLACED BY A DYNAMIC SOURCE
data = [(1,'Lily'),(2,'Manuel'),(3,'Emily')]

schema = StructType([StructField("id",IntegerType(),False),StructField("name",StringType(),True)])

source_df = spark.createDataFrame(data,schema)

source_df = source_df.withColumn('country', F.lit('UK'))

try:
    response = glue_client.get_table(
        DatabaseName=db_name,
        Name=final_table)
    created_table = True
except glue_client.exceptions.EntityNotFoundException:
    created_table = False

## MAIN GOAL HERE IS TO CREATE A FLAG FOR ROW CHANGES: NC - NOT CHANGED; I - INSERTED; U - UPDATED
source_df = source_df.withColumn('flag', F.lit('NC')).withColumn('modified_at',F.lit(EXECUTION_DATE))

if created_table:
    # Merge logic is not developed yet; for now pass the rows through unchanged
    final_df = source_df
else:
    # FIRST EXECUTION: ALL ROWS ARE INSERTED
    final_df = source_df.withColumn('flag', F.lit('I'))

db_tab = f"`{db_name}`.{final_table}"

additional_options = {
    "path": "s3://bucket_test_dl_0065/delta_table/"
}

final_df.repartition(1).write \
    .format("delta") \
    .options(**additional_options) \
    .mode("append") \
    .partitionBy("country") \
    .saveAsTable(db_tab)

job.commit()

I followed the AWS documentation page -> https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-format-delta-lake.html

The error is always the same:

File "/tmp/delta-lake-exp.py", line, in <module>
    .saveAsTable(db_tab)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line, in saveAsTable
    self._jwrite.saveAsTable(name)
  File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line, in __call__
    return_value = get_return_value(
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line, in deco
    raise converted from None
pyspark.sql.utils.IllegalArgumentException: Can not create a Path from an empty string

I have also tried creating the table manually before running the AWS Glue job, but the error stays the same.

Has anyone run into the same problem?

Thanks

pyspark aws-glue delta-lake
1 Answer

I solved a similar problem with the same error by using save to write the data directly to S3, and then creating and running a crawler over all the S3 subfolders (both the folders and the objects should have been generated in S3) to populate my Data Catalog so the table could be queried in Athena.
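
A minimal sketch of that workaround, reusing final_df and glue_client from the question's code; the S3 path is the one from the question, and the crawler name is a placeholder for a crawler already configured against that prefix:

# Write the Delta files directly to S3 instead of calling saveAsTable
final_df.repartition(1).write \
    .format("delta") \
    .mode("append") \
    .partitionBy("country") \
    .save("s3://bucket_test_dl_0065/delta_table/")

# Run a crawler over that prefix so the table is registered in the Data Catalog
# and can be queried from Athena (crawler name is a placeholder)
glue_client.start_crawler(Name='delta_table_crawler')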
