单个小文件的Parquet分区策略及读取优化

问题描述 投票:0回答:1

我有一个 parquet 文件,数据大小从 5 到 100 Mb 不等。

当我尝试在日期列上创建分区时,会创建多个文件,这会降低读取性能,因为有许多小文件。

因此我使用重新分区(1)只创建单个文件。

现在使用 Spark sql 在此文件上使用的查询是日期范围查询,例如 x 和 y 之间的日期。

public DataFilter applyValuationDateRangeFilter() {
    AlcyoneDate startDate = calculationContext.getDateSequence().getStartDate();
    AlcyoneDate endDate = calculationContext.getDateSequence().getEndDate();
    filterQueries.append(" AND ");
    filterQueries.append("ValuationDate BETWEEN '").append(startDate).append("' AND     '").append(endDate).append("'");
    return this;
}

我尝试使用 repartition(1) 在 ValuationDate 列上创建分区,但没有获得太多好处。

如何提高阅读性能?

我在文件加载时应用过滤器:

public  Dataset<Row> getDatasetForInputFileWithFiltering(String parquetFilePath, String filterQuery) {
    return getSparkSession().read().format("parquet")
            .option("inferSchema", "true")
            .option("header", "true")
            .load(parquetFilePath)
            .filter(filterQuery);
}

我还需要迭代所有行以填充一些数据:

Iterator<Row> rowIterator = rows.toLocalIterator();
        while (rowIterator.hasNext()) {
            Row row = rowIterator.next();

我无法使用 rows.collectAsList() 因为它给了我 OOM。 也无法使用 rows.foreach 和 rows.foreachPartition 因为我的下游代码未处理多线程。

java apache-spark apache-spark-sql parquet
1个回答
0
投票

Spark 通常建议将文件保留在 128MB 左右,其中一行组(也是 128MB)。

在您的特定情况下,正如您所说的分区会适得其反,导致文件太小。因此,独特的镶木地板文件是可行的方法,使用coalesce(1)。

现在,对于时间戳过滤器的用例,您可能会尝试将行组大小(spark 有一个配置)减少到 5MB 之类的大小,并在时间戳列上进行排序。理论上,这会导致读者根据存储的最小/最大统计数据跳过行组,因此读取 5MB 而不是 100MB。

拥有更多行组有一些缺点,例如:

  • 更多文件系统调用(Spark 将为每个行组至少创建一个任务)
  • 压缩效率较低(要压缩的数据较少通常会导致压缩效率较低)

这也意味着您最终会得到比拥有一个 onkyvrow 组更多的并行性。

顺便说一句,如果你真的需要速度,spark 不是最好的选择,它是为容错 ETL 设计的,有更合适的基于镶木地板的引擎,但这超出了范围。

© www.soinside.com 2019 - 2024. All rights reserved.