I'm using AWS Glue Studio Visual ETL to process CSV files stored in an S3 bucket. The files are registered in the Glue Data Catalog and are read as a single DynamicFrame during the ETL job. I want to:
- Retrieve the file names for the source files being processed.
- Perform data validation (e.g., schema checks or custom rules) on the data.
- Localize errors to the specific file that caused the validation failure.
- Move the file with errors to a failed folder in S3 for further inspection, while continuing to process valid files.
However, because the input data is combined into a single DynamicFrame, I cannot directly associate validation errors with a specific file. Questions:
- How can I extract the file names for each row or batch of data in the DynamicFrame?
- Is there a way to run validation on a per-file basis in Glue Visual ETL?
- How can I isolate a file with errors and move it to a failed folder programmatically in Glue?
Any guidance or examples on how to achieve this would be greatly appreciated!
Here is the custom transform node I currently use to tag each row with its source file:

def add_file_name(glueContext, dfc) -> DynamicFrameCollection:
    from awsglue.dynamicframe import DynamicFrame, DynamicFrameCollection
    from pyspark.sql.functions import input_file_name

    transformed_frames = {}
    # Iterate over the key-value pairs in the DynamicFrameCollection
    for key in dfc.keys():
        dynamic_frame = dfc.select(key)
        spark_df = dynamic_frame.toDF()
        # input_file_name() records the full s3:// path of the source file
        # on every row, so errors can later be traced back to a file.
        spark_df = spark_df.withColumn("file_name", input_file_name())
        transformed_dynamic_frame = DynamicFrame.fromDF(spark_df, glueContext, f"{key}_transformed")
        transformed_frames[key] = transformed_dynamic_frame
    return DynamicFrameCollection(transformed_frames, glueContext)
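Once each row carries its `file_name`, the per-file validation and quarantine steps (questions 2 and 3) can be sketched roughly as below. This is a minimal sketch, not Glue-tested code: the null-check rule (`required_cols`) and the `failed/` prefix are assumptions you would replace with your own logic; the boto3 calls (`copy_object` + `delete_object`, since S3 has no rename) are the standard S3 API.

```python
from urllib.parse import urlparse


def split_s3_uri(path):
    """Split 's3://bucket/prefix/file.csv' into (bucket, key)."""
    parsed = urlparse(path)
    return parsed.netloc, parsed.path.lstrip("/")


def move_to_failed(path, failed_prefix="failed/"):
    """Copy an S3 object under the failed/ prefix, then delete the original."""
    import boto3  # deferred so the pure helpers above stay stdlib-only

    bucket, key = split_s3_uri(path)
    s3 = boto3.client("s3")
    s3.copy_object(
        Bucket=bucket,
        CopySource={"Bucket": bucket, "Key": key},
        Key=failed_prefix + key,
    )
    s3.delete_object(Bucket=bucket, Key=key)


def validate_and_quarantine(spark_df, required_cols):
    """Per-file validation: group rows by the file_name column added by
    input_file_name(), flag files where any required column is null,
    quarantine those files, and return only rows from clean files.
    required_cols is a hypothetical rule; swap in your own checks."""
    from pyspark.sql.functions import col

    predicate = " OR ".join(f"{c} IS NULL" for c in required_cols)
    valid_paths = []
    for row in spark_df.select("file_name").distinct().collect():
        path = row["file_name"]
        file_df = spark_df.filter(col("file_name") == path)
        if file_df.filter(predicate).count() > 0:
            move_to_failed(path)  # quarantine the file that failed validation
        else:
            valid_paths.append(path)
    # Only rows from valid files continue through the job.
    return spark_df.filter(col("file_name").isin(valid_paths))
```

In a Glue Studio job you would call `validate_and_quarantine` inside another custom transform node, after the `add_file_name` step, and convert the returned DataFrame back to a DynamicFrame with `DynamicFrame.fromDF`.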