Spark 生成了多个小 parquet 文件。如何在生产者和消费者 Spark 作业上有效处理少量 parquet 文件。
恕我直言,最直接的方法是在编写镶木地板文件之前使用重新分区/合并(除非数据倾斜并且您想要创建相同大小的输出,否则首选合并),这样您就不会从一开始就创建小文件。
df
.map(<some transformation>)
.filter(<some filter>)
///...
.coalesce(<number of partitions>)
.write
.parquet(<path>)
分区数可以根据数据帧中的总行数除以某个因子来计算,通过反复试验将为您提供合适的大小。
在大多数大数据框架中,最好的做法是选择少数较大的文件而不是许多小文件(我通常使用的文件大小是 100-500MB)
如果您已经有小文件中的数据,并且据我所知您想要合并它,您将必须使用 Spark 重新分区到更少的分区来读取它,然后再次写入。
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReaderWrapper;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import parquet.avro.AvroReadSupport;
import parquet.hadoop.ParquetInputFormat;
import java.io.IOException;
public class CombineParquetInputFormat<T> extends CombineFileInputFormat<Void, T> {
@Override
public RecordReader<Void, T> createRecordReader(InputSplit split, TaskAttemptContext
context) throws IOException {
CombineFileSplit combineSplit = (CombineFileSplit) split;
return new CombineFileRecordReader(combineSplit, context, CombineParquetrecordReader.class);
}
private static class CombineParquetrecordReader<T> extends CombineFileRecordReaderWrapper<Void, T> {
public CombineParquetrecordReader(CombineFileSplit split, TaskAttemptContext context, Integer idx) throws
IOException, InterruptedException {
super(new ParquetInputFormat<T>(AvroReadSupport.class), split, context, idx);
}
}
}
在消费者端,请使用CombinedParquetInputFile:这将强制从单个任务中读取多个小文件。
生产者方面: 用户合并(numFiles)以获得足够数量的文件作为输出。
如何在spark中使用customInputFileFormat并形成RDD和Dataframes:
JavaRDD<Row> javaRDD = sc.newAPIHadoopFile(hdfsInputPath, CombineParquetInputFormat.class, Void.class, "AvroPojo.class", sc.hadoopConfiguration())
.values()
.map(p -> {
Row row = RowFactory.create(avroPojoToObjectArray((p));
return row;
});
sc.hadoopConfiguration().setBoolean(FileInputFormat.INPUT_DIR_RECURSIVE,true);
//set max split size else only 1 task wil be spawned
sc.hadoopConfiguration().setLong("mapreduce.input.fileinputformat.split.maxsize", (long) (128 * 1024 * 1024));
StructType outputSchema = (StructType) SchemaConverters.toSqlType(Profile.getClassSchema()).dataType();
final DataFrame requiredDataFrame = sqlContext.createDataFrame(javaRDD, outputSchema);