如何跨批次读取/恢复检查点数据帧

问题描述 投票:0回答:1

我需要在使用 pyspark 进行批处理期间“检查点”下一批所需的某些信息。

对于这个用例,DataFrame.checkpoint 似乎适合。虽然我发现很多地方都解释了如何创建检查点,但我没有找到任何如何恢复或读取检查点的信息。

为了对此进行测试,我创建了一个包含两 (2) 个测试的简单测试类。第一个读取 CSV 并创建总和。第二个应该继续总结一下:

import pytest
from pyspark.sql import functions as f

class TestCheckpoint:

    @pytest.fixture(autouse=True)
    def init_test(self, spark_unit_test_fixture, data_dir, tmp_path):
        self.spark = spark_unit_test_fixture
        self.dir = data_dir("")
        self.checkpoint_dir = tmp_path

    def test_first(self):
        df = (self.spark.read.format("csv")
              .option("pathGlobFilter", "numbers.csv")
              .load(self.dir))

        sum = df.agg(f.sum("_c1").alias("sum"))
        sum.checkpoint()
        assert 1 == 1

    def test_second(self):
        df = (self.spark.read.format("csv")
              .option("pathGlobFilter", "numbers2.csv")
              .load(self.dir))

        sum = # how to get back the sum?

在第一个测试中创建检查点工作正常(将 tmp_path 设置为检查点目录),我看到一个使用文件创建的文件夹。

但是我该如何阅读它?

如何处理多个检查点?例如,一个检查点用于总和,另一个检查点用于平均值?

是否有更好的方法来跨批次存储状态?

为了完整起见,CSV 如下所示:

1719228973,1
1719228974,2

这只是让它运行的一个最小示例 - 我的真实场景更复杂。

python pyspark
1个回答
0
投票

虽然从理论上讲,检查点在 Spark 作业之间保留,并且可以通过直接读取文件从其他 Spark 作业进行访问,而无需重新计算整个沿袭,但它们并没有使直接从其他 Spark 作业的存储文件中读取检查点变得不那么容易。如果您感兴趣,here 是一个答案,其中显示了检查点发生时文件名的外观。

因此,就您而言,我建议您自己存储到磁盘,并在其他工作需要时读取文件。您可以使用存储机制(例如镶木地板,其效率取决于您的数据和处理性质)。像这样的东西:

import pytest
from pyspark.sql import functions as f

class TestCheckpoint:

    @pytest.fixture(autouse=True)
    def init_test(self, spark_unit_test_fixture, data_dir, tmp_path):
        self.spark = spark_unit_test_fixture
        self.dir = data_dir("")
        self.checkpoint_dir = tmp_path

    def test_first(self):
        df = (self.spark.read.format("csv")
              .option("pathGlobFilter", "numbers.csv")
              .load(self.dir))

        sum_df = df.agg(f.sum("_c1").alias("sum"))
        sum_df.write.mode("overwrite").parquet(str(self.checkpoint_dir / "sum"))
        assert 1 == 1

    def test_second(self):
        previous_sum = self.spark.read.parquet(str(self.checkpoint_dir / "sum"))
        previous_sum_value = previous_sum.collect()[0]["sum"]

        df = (self.spark.read.format("csv")
              .option("pathGlobFilter", "numbers2.csv")
              .load(self.dir))

        new_sum = df.agg(f.sum("_c1").alias("sum"))
        total_sum = previous_sum_value + new_sum.collect()[0]["sum"]

        assert 1 == 1

也就是说,如果您需要在同一个 Spark 作业中访问检查点数据,您可以像这样保留对数据帧的引用

sum = df.agg(f.sum("_c1").alias("sum"))
sum = sum.checkpoint() # hold on to this reference to access the checkpointed data

或者,您还可以使用

df.persist(StorageLevel.DISK_ONLY)
,它还允许您存储到磁盘,同时保留数据沿袭。但是,一旦作业结束,数据就会被清除。

© www.soinside.com 2019 - 2024. All rights reserved.