How can I retrieve the file names of CSV files read through a Data Catalog table in AWS Glue Visual ETL and handle validation errors per file?


I am processing CSV files stored in an S3 bucket with AWS Glue Studio (visual ETL). The files are registered in the Glue Data Catalog and are read as a single DynamicFrame during the ETL job. I want to:

- Retrieve the file names for the source files being processed.
- Perform data validation (e.g., schema checks or custom rules) on the data.
- Localize errors to the specific file that caused the validation failure.
- Move the file with errors to a failed folder in S3 for further inspection, while continuing to process valid files.

However, because the input data is combined into a single DynamicFrame, I cannot directly associate validation errors with a specific file. Questions:

- How can I extract the file names for each row or batch of data in the DynamicFrame?
- Is there a way to run validation on a per-file basis in Glue Visual ETL?
- How can I isolate a file with errors and move it to a failed folder programmatically in Glue?

Any guidance or examples on how to achieve this would be greatly appreciated!

Tags: validation, amazon-s3, etl, aws-glue
1 Answer
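The key is Spark's input_file_name() function, which returns the path of the source file each row was read from. In Glue Visual ETL you can apply it inside a Custom Transform node, which receives a DynamicFrameCollection and must return one:
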
def add_file_name(glueContext, dfc) -> DynamicFrameCollection:
    from awsglue.dynamicframe import DynamicFrame, DynamicFrameCollection
    from pyspark.sql.functions import input_file_name

    transformed_frames = {}
    # Iterate over the key-value pairs in the DynamicFrameCollection
    for key in dfc.keys():
        dynamic_frame = dfc.select(key)
        spark_df = dynamic_frame.toDF()
        # input_file_name() returns the full S3 URI of the file each row was read from
        spark_df = spark_df.withColumn("file_name", input_file_name())
        transformed_dynamic_frame = DynamicFrame.fromDF(spark_df, glueContext, f"{key}_transformed")
        transformed_frames[key] = transformed_dynamic_frame
    return DynamicFrameCollection(transformed_frames, glueContext)
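
With the file_name column in place you can run your checks and group failures by file. A minimal sketch, assuming df is the Spark DataFrame produced by the transform above and a hypothetical rule that an amount column must be non-null:

from pyspark.sql.functions import col

# Hypothetical validation rule: "amount" must not be null.
invalid_rows = df.filter(col("amount").isNull())

# Distinct S3 URIs of the files that produced at least one invalid row.
failed_files = [r["file_name"] for r in invalid_rows.select("file_name").distinct().collect()]

# Keep only rows that came from files with no validation errors.
valid_df = df.filter(~col("file_name").isin(failed_files)) if failed_files else df

valid_df can then be turned back into a DynamicFrame with DynamicFrame.fromDF() and written to your target as usual.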
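
To move the offending files to a failed folder, you can parse the bucket and key out of each s3:// URI and copy-then-delete the objects with boto3 (S3 has no rename operation). Bucket and prefix names below are placeholders:

import boto3
from urllib.parse import urlparse

s3 = boto3.client("s3")

def move_to_failed(file_uri, failed_prefix="failed/"):
    # file_uri looks like s3://my-bucket/input/data.csv (placeholder names)
    parsed = urlparse(file_uri)
    bucket, key = parsed.netloc, parsed.path.lstrip("/")
    dest_key = failed_prefix + key.split("/")[-1]
    # Copy the object to the failed/ prefix, then delete the original.
    s3.copy_object(Bucket=bucket, CopySource={"Bucket": bucket, "Key": key}, Key=dest_key)
    s3.delete_object(Bucket=bucket, Key=key)

for file_uri in failed_files:
    move_to_failed(file_uri)

Note that the Glue job's IAM role needs s3:GetObject, s3:PutObject and s3:DeleteObject permissions on the relevant paths for this to work.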