我有一个 parquet 文件,数据大小从 5 到 100 Mb 不等。
当我尝试在日期列上创建分区时,会创建多个文件,这会降低读取性能,因为有许多小文件。
因此我使用重新分区(1)只创建单个文件。
现在使用 Spark sql 在此文件上使用的查询是日期范围查询,例如 x 和 y 之间的日期。
public DataFilter applyValuationDateRangeFilter() {
AlcyoneDate startDate = calculationContext.getDateSequence().getStartDate();
AlcyoneDate endDate = calculationContext.getDateSequence().getEndDate();
filterQueries.append(" AND ");
filterQueries.append("ValuationDate BETWEEN '").append(startDate).append("' AND '").append(endDate).append("'");
return this;
}
我尝试使用 repartition(1) 在 ValuationDate 列上创建分区,但没有获得太多好处。
如何提高阅读性能?
我在文件加载时应用过滤器:
public Dataset<Row> getDatasetForInputFileWithFiltering(String parquetFilePath, String filterQuery) {
return getSparkSession().read().format("parquet")
.option("inferSchema", "true")
.option("header", "true")
.load(parquetFilePath)
.filter(filterQuery);
}
我还需要迭代所有行以填充一些数据:
Iterator<Row> rowIterator = rows.toLocalIterator();
while (rowIterator.hasNext()) {
Row row = rowIterator.next();
我无法使用 rows.collectAsList() 因为它给了我 OOM。 也无法使用 rows.foreach 和 rows.foreachPartition 因为我的下游代码未处理多线程。
Spark 通常建议将文件保留在 128MB 左右,其中一行组(也是 128MB)。
在您的特定情况下,正如您所说的分区会适得其反,导致文件太小。因此,独特的镶木地板文件是可行的方法,使用coalesce(1)。
现在,对于时间戳过滤器的用例,您可能会尝试将行组大小(spark 有一个配置)减少到 5MB 之类的大小,并在时间戳列上进行排序。理论上,这会导致读者根据存储的最小/最大统计数据跳过行组,因此读取 5MB 而不是 100MB。
拥有更多行组有一些缺点,例如:
这也意味着您最终会得到比拥有一个 onkyvrow 组更多的并行性。
顺便说一句,如果你真的需要速度,spark 不是最好的选择,它是为容错 ETL 设计的,有更合适的基于镶木地板的引擎,但这超出了范围。