我正在使用AWS Glue处理一些S3 TSV到S3 Parquet。由于非UTF-8传入文件,我被迫使用DataFrames而不是DynamicFrames来处理我的数据(这是一个已知的问题,没有任何工作,DynamicFrames完全失败,任何非UTF8字符)。这似乎也意味着我不能在Glue中使用Job Bookmarks来跟踪我已处理的S3 TSV文件。
我的代码看起来像这样:
# pylint: skip-file
# flake8: noqa
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.types import *
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import split
from awsglue.dynamicframe import DynamicFrame
# @params: [JOB_NAME, s3target]
args = getResolvedOptions(sys.argv, ['JOB_NAME', 's3target', 's3source'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# Define massive list of fields in the schema
fields = [
StructField("accept_language", StringType(), True),
StructField("browser", LongType(), True),
.... huge list ...
StructField("yearly_visitor", ShortType(), True),
StructField("zip", StringType(), True)
]
schema = StructType(fields)
# Read in data using Spark DataFrame and not an AWS Glue DynamicFrame to avoid issues with non-UTF8 characters
df0 = spark.read.format("com.databricks.spark.csv").option("quote", "\"").option("delimiter", u'\u0009').option("charset", 'utf-8').schema(schema).load(args['s3source'] + "/*.tsv.gz")
# Remove all rows that are entirely nulls
df1 = df0.dropna(how = 'all')
# Generate a partitioning column
df2 = df1.withColumn('date', df1.date_time.cast('date'))
# Write out in parquet format, partitioned by date, to the S3 location specified in the arguments
ds2 = df2.write.format("parquet").partitionBy("date").mode("append").save(args['s3target'])
job.commit()
我的问题是 - 每次运行时都没有作业书签,它会一遍又一遍地处理相同的s3文件。如何将源s3存储桶中的已处理文件移动到子文件夹或其他内容,或者避免双重处理文件?
我不确定这里的诀窍是什么,Spark是一个并行系统,甚至不知道文件是什么。我想我可以使用Python Shell作业类型创建第二个Glue作业并在之后立即删除传入的文件,但即使这样我也不确定要删除哪些文件等。
谢谢,
克里斯
要从输入源前缀标记已处理的文件,您必须使用boto3
(或直接awscli)来移动文件或删除它们。
要确定要处理的文件,您可以通过两种不同的方式继续:
args['s3source'] + "/*.tsv.gz"
解析你的文件glob s3client.list_objects()
。您可以提供一系列已解析的文件,而不是一个glob到spark.read.load
。import boto3
client = boto3.client('s3')
# get all the available files
# Note: if you expect a lot of files, you need to iterate on the pages of results
response = client.list_objects_v2(Bucket=your_bucket_name,Prefix=your_path_prefix)
files=['s3://'+your_bucket_name+obj['Key'] for obj in response['Contents'] if obj.endswith('tsv.gz')]
... initialize your job as before ...
df0 = df0 = spark.read.format("com.databricks.spark.csv").option("quote", "\"").option("delimiter", u'\u0009').option("charset", 'utf-8').schema(schema).load(files)
... do your work as before ...
... process your files with pyspark as before...
# retrieve the tracked files from the initial DataFrame
# you need to access the java RDD instances to get to the partitions information
# The file URIs will be in the following format: u's3://mybucket/mypath/myfile.tsv.gz'
files = []
for p in df0.rdd._jrdd.partitions():
files.append([f.filePath() for f in p.files().array()])
获得文件列表后,删除,重命名或将它们添加到元数据存储区以在下一个作业中过滤它们非常简单。
例如,要删除它们:
# initialize a S3 client if not already done
from urlparse import urlparse # python 2
import boto3
client = boto3.client('s3')
# do what you want with the uris, for example delete them
for uri in files:
parsed = urlparse(uri)
client.delete_object(Bucket=parsed.netloc, Key=parsed.path)
如果您不关心再次处理相同的源文件(从时间限制)并且您的用例是在目标中没有重复数据,您可以考虑在编写数据帧时将保存模式更新为“覆盖”
https://spark.apache.org/docs/2.1.1/api/java/org/apache/spark/sql/DataFrameWriter.html
我用于通过AWS胶开发的ETL过程之一的解决方案是首先列出并使用boto3 API将s3中的文件移动到“WORK”文件夹。此过程不应该花费任何时间,因为您只更改s3对象名称而不是任何物理移动。
完成上述步骤后,您可以使用“WORK”文件夹作为SPARK数据框的输入,同时新文件可以继续推送到您的其他s3文件夹。
我不确定您的用例,但我们使用当前系统日期时间来创建“WORK”文件夹,以便我们可以调查或重新运行任何文件,如果我们发现过程或数天后我们加载的数据有任何问题。
最终工作代码:
# pylint: skip-file
# flake8: noqa
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.types import *
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import split
import boto3
from urlparse import urlparse
# Read arguments
args = getResolvedOptions(sys.argv, ['JOB_NAME', 's3target', 's3source'])
# Initialise boto3
client = boto3.client('s3')
# Get all the available files
response = client.list_objects_v2(Bucket = "xxx")
files = [ "s3://xxx/" + obj['Key'] for obj in response['Contents'] if obj['Key'].endswith('.tsv.gz') ]
# Initialise the glue job
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# Define massive list of fields in the schema
fields = [
StructField("accept_language", StringType(), True),
StructField("browser", LongType(), True),
.... huge list ...
StructField("yearly_visitor", ShortType(), True),
StructField("zip", StringType(), True)
]
schema = StructType(fields)
# Read in data using Spark DataFrame and not an AWS Glue DynamicFrame to avoid issues with non-UTF8 characters
df0 = spark.read.format("com.databricks.spark.csv").option("quote", "\"").option("delimiter", u'\u0009').option("charset", 'utf-8').schema(schema).load(files)
# Remove all rows that are entirely nulls
df1 = df0.dropna(how = 'all')
# Generate a partitioning column
df2 = df1.withColumn('date', df1.date_time.cast('date'))
# Write out in parquet format, partitioned by date, to the S3 location specified in the arguments
ds2 = df2.write.format("parquet").partitionBy("date").mode("append").save(args['s3target'])
# retrieve the tracked files from the initial DataFrame
# you need to access the java RDD instances to get to the partitions information
# The file URIs will be in the following format: u's3://mybucket/mypath/myfile.tsv.gz'
files = []
for p in df0.rdd._jrdd.partitions():
files.extend([f.filePath() for f in p.files().array()])
# Move files to the processed folder
for uri in files:
parsed = urlparse(uri)
client.copy_object(CopySource = {'Bucket': parsed.netloc, 'Key': parsed.path.lstrip('/')}, Bucket = parsed.netloc, Key = 'processed' + parsed.path)
client.delete_object(Bucket = parsed.netloc, Key = parsed.path.lstrip('/'))
job.commit()