I have several zip files named f1.zip, f2.zip, ... f7.zip, each containing roughly 200k XML files. I use the code below with multiprocessing to unzip them in parallel, but even though each individual file is very small, the read/write overhead makes the extraction very slow. I would like to know whether there is a way to use AWS EMR to speed up the unzipping.
import os, boto3, zipfile
import sys
import multiprocessing
from io import BytesIO
from boto3.s3.transfer import TransferConfig

sys.path.insert(0, './src')

def list_s3_obj(bucket, prefix, extension):
    # List every object under the prefix whose key ends with the given extension
    s3 = boto3.resource('s3')
    raw_bucket = s3.Bucket(bucket)
    file_list = []
    for file in raw_bucket.objects.filter(Prefix=prefix):
        if file.key.endswith(extension):
            file_list.append((bucket, file.key))
    return file_list

def unzip_s3_obj(bucket, zip_key, unzip_key):
    # Download one zip into memory and upload each member back to S3 under unzip_key
    print('Unzip | ' + zip_key)
    MB = 1024 ** 2
    config = TransferConfig(
        use_threads=True,
        multipart_threshold=1024 * MB,
        max_concurrency=4,
        io_chunksize=300 * MB,
    )

    s3 = boto3.resource('s3')
    buffer = BytesIO()
    zip_obj = s3.Object(bucket_name=bucket, key=zip_key)
    zip_obj.download_fileobj(Fileobj=buffer, Config=config)
    z = zipfile.ZipFile(buffer)

    for filename in z.namelist():
        print('Extracting | ' + filename)
        file_info = z.getinfo(filename)
        s3.meta.client.upload_fileobj(
            z.open(filename),
            Bucket=bucket,
            Key=unzip_key + filename,
            Config=config
        )

current_key = 'Folder_spot'
bucket = 's3-bucket'
raw_key = f'{current_key}/zip/'
unzip_f1_key = f'{current_key}/unzip/f1/'
unzip_f2_key = f'{current_key}/unzip/f2/'
unzip_f3_key = f'{current_key}/unzip/f3/'
unzip_f4_key = f'{current_key}/unzip/f4/'
unzip_f5_key = f'{current_key}/unzip/f5/'
unzip_f6_key = f'{current_key}/unzip/f6/'
unzip_f7_key = f'{current_key}/unzip/f7/'

def unzip_s3():
    file_list = list_s3_obj(bucket, raw_key, '.zip')
    # Route each zip to its own output prefix based on its file name
    subprocess_map = [(*e, unzip_f1_key) for e in file_list if 'f1' in e[1].lower()]
    subprocess_map.extend([(*e, unzip_f2_key) for e in file_list if 'f2' in e[1].lower()])
    subprocess_map.extend([(*e, unzip_f3_key) for e in file_list if 'f3' in e[1].lower()])
    subprocess_map.extend([(*e, unzip_f4_key) for e in file_list if 'f4' in e[1].lower()])
    subprocess_map.extend([(*e, unzip_f5_key) for e in file_list if 'f5' in e[1].lower()])
    subprocess_map.extend([(*e, unzip_f6_key) for e in file_list if 'f6' in e[1].lower()])
    subprocess_map.extend([(*e, unzip_f7_key) for e in file_list if 'f7' in e[1].lower()])

    p = multiprocessing.Pool(7)
    p.starmap(unzip_s3_obj, subprocess_map)

unzip_s3()
Additionally, I later use the Python library xmltodict to convert the XML files to JSON and concatenate them into files of 20k records each, reducing the count from 200k files to about 10. That speeds up the conversion and any later processing, but reading the 200k XML files is again very slow, and I cannot find a code example or solution that does this with EMR.
Here is a solution in Scala. I had to do this at work previously, so I am extracting the relevant parts here.
A few important things to keep in mind:
If it is possible in your workflow, try to tar.gz the files instead of zipping them, because that is the only format I have tried.
Second, repartition the RDD (numPartitionsProvided) to a suitably large number so that all of your executors are utilized.
ZipFileReader.scala
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream
import org.apache.spark.input.PortableDataStream
import org.apache.spark.sql.SparkSession

import java.nio.charset.StandardCharsets
import scala.util.Try

object ZipFileReader {

  def decode(bytes: Array[Byte]) =
    new String(bytes, StandardCharsets.UTF_8)

  def extractFilesCSV(ps: PortableDataStream, n: Int = 1024) = Try {
    val tar = new TarArchiveInputStream(new GzipCompressorInputStream(ps.open))
    Stream.continually(Option(tar.getNextTarEntry))
      // Read until the next entry is null
      .takeWhile(_.isDefined)
      // flatten
      .flatMap(x => x)
      // Drop directories
      .filter(!_.isDirectory)
      .filter(ele => ele.getName.split("/")(1).endsWith(".csv"))
      .map(e => {
        println("This could be name", e.getName)
        (e.getName.split("/")(1),
          Stream.continually {
            // Read n bytes
            val buffer = Array.fill[Byte](n)(-1)
            val i = tar.read(buffer, 0, n)
            (i, buffer.take(i))
          }
            // Take as long as we have read something
            .takeWhile(_._1 > 0)
            .map(_._2)
            .flatten
            .toArray)
      })
      .toArray
  }

  def extractFilesJSON(ps: PortableDataStream, n: Int = 1024) = Try {
    val tar = new TarArchiveInputStream(new GzipCompressorInputStream(ps.open))
    Stream.continually(Option(tar.getNextTarEntry))
      // Read until the next entry is null
      .takeWhile(_.isDefined)
      // flatten
      .flatMap(x => x)
      // Drop directories
      .filter(!_.isDirectory)
      .filter(ele => ele.getName.split("/")(1).endsWith(".json"))
      .map(e => {
        println("This could be name", e.getName)
        (e.getName.split("/")(1),
          Stream.continually {
            // Read n bytes
            val buffer = Array.fill[Byte](n)(-1)
            val i = tar.read(buffer, 0, n)
            (i, buffer.take(i))
          }
            // Take as long as we have read something
            .takeWhile(_._1 > 0)
            .map(_._2)
            .flatten
            .toArray)
      })
      .toArray
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    val datafilename = "/zip-loc/myfirst.tar.gz"
    val output_path = "/zip-loc/output1/"

    val readInRawCSVFiles = spark.sparkContext.binaryFiles(datafilename).flatMapValues(x =>
      extractFilesCSV(x).toOption).map(ele => ele._2.map(inner_ele => (inner_ele._1, decode(inner_ele._2))))
    val rawDataCSVParallelized = spark.sparkContext.parallelize(readInRawCSVFiles.first())

    // Make this a large number so that all your executors are utilized
    val numPartitionsProvided = 3
    println("Writing out the files to output_path")
    rawDataCSVParallelized.repartition(numPartitions = numPartitionsProvided).map(ele => ele._2).saveAsTextFile(output_path)

    // The following code is only for sanity checking that everything is going smoothly.
    // It can be removed from the production file.
    rawDataCSVParallelized.foreach(ele => println("fileName within .tar.gz", ele._1))
    val countRawDataCSVParallelized = rawDataCSVParallelized.count()
    println(s"Count of the csv input files = $countRawDataCSVParallelized")

    val dataCollected = rawDataCSVParallelized.collect()
    val username1_csv = dataCollected.filter(ele => ele._1.equalsIgnoreCase("username1.csv")).head
    println("username1_csv")
    println(username1_csv)
    val username2_csv = dataCollected.filter(ele => ele._1.equalsIgnoreCase("username2.csv")).head
    println("username2_csv")
    println(username2_csv)
  }
}
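Since your archives hold XML rather than CSV or JSON, the same pattern can be adapted by filtering on the .xml extension. Below is a minimal, untested sketch of such a variant; extractFilesXML and readInRawXMLFiles are hypothetical names, the helper is meant to live inside the ZipFileReader object above (so it reuses its imports), and the s3:// path in the comment is only a placeholder for your bucket layout.

// Hypothetical variant of extractFilesCSV that keeps .xml entries; drop it into
// the ZipFileReader object above so it picks up the same imports.
// Assumes entries are named "some_folder/file.xml", as in the CSV example.
def extractFilesXML(ps: PortableDataStream, n: Int = 1024) = Try {
  val tar = new TarArchiveInputStream(new GzipCompressorInputStream(ps.open))
  Stream.continually(Option(tar.getNextTarEntry))
    // Read until the next entry is null
    .takeWhile(_.isDefined)
    .flatMap(x => x)
    // Drop directories, keep only .xml files
    .filter(!_.isDirectory)
    .filter(ele => ele.getName.split("/")(1).endsWith(".xml"))
    .map(e => (e.getName.split("/")(1),
      Stream.continually {
        // Read n bytes at a time until this entry is exhausted
        val buffer = Array.fill[Byte](n)(-1)
        val i = tar.read(buffer, 0, n)
        (i, buffer.take(i))
      }
        .takeWhile(_._1 > 0)
        .map(_._2)
        .flatten
        .toArray))
    .toArray
}

// In main, on EMR you could point binaryFiles at S3 directly (placeholder path):
// val datafilename = "s3://s3-bucket/Folder_spot/zip/f1.tar.gz"
// val readInRawXMLFiles = spark.sparkContext.binaryFiles(datafilename).flatMapValues(x =>
//   extractFilesXML(x).toOption).map(ele => ele._2.map(inner_ele => (inner_ele._1, decode(inner_ele._2))))

If you then repartition to roughly 10 partitions before saveAsTextFile, you end up with about 10 output part files, which is close to the batching into 10 files that you describe.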
I created a tar.gz file using the following command.
tar -czvf myfirst.tar.gz username_files
The username_files folder contains the following files as an example:
username1.csv
username2.csv
username3.csv
username4.csv
username5.csv
username6.csv
Upload myfirst.tar.gz to your local HDFS to try it out and check that everything works as expected.
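For a quick local test, the upload can be as simple as the following command (the /zip-loc/ target is just the example path used in main above):
hdfs dfs -put myfirst.tar.gz /zip-loc/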