我需要在使用 pyspark 进行批处理期间“检查点”下一批所需的某些信息。
对于这个用例,DataFrame.checkpoint 似乎适合。虽然我发现很多地方都解释了如何创建检查点,但我没有找到任何如何恢复或读取检查点的信息。
为了对此进行测试,我创建了一个包含两 (2) 个测试的简单测试类。第一个读取 CSV 并创建总和。第二个应该继续总结一下:
import pytest
from pyspark.sql import functions as f
class TestCheckpoint:
@pytest.fixture(autouse=True)
def init_test(self, spark_unit_test_fixture, data_dir, tmp_path):
self.spark = spark_unit_test_fixture
self.dir = data_dir("")
self.checkpoint_dir = tmp_path
def test_first(self):
df = (self.spark.read.format("csv")
.option("pathGlobFilter", "numbers.csv")
.load(self.dir))
sum = df.agg(f.sum("_c1").alias("sum"))
sum.checkpoint()
assert 1 == 1
def test_second(self):
df = (self.spark.read.format("csv")
.option("pathGlobFilter", "numbers2.csv")
.load(self.dir))
sum = # how to get back the sum?
在第一个测试中创建检查点工作正常(将 tmp_path 设置为检查点目录),我看到一个使用文件创建的文件夹。
但是我该如何阅读它?
如何处理多个检查点?例如,一个检查点用于总和,另一个检查点用于平均值?
是否有更好的方法来跨批次存储状态?
为了完整起见,CSV 如下所示:
1719228973,1
1719228974,2
这只是让它运行的一个最小示例 - 我的真实场景更复杂。
虽然从理论上讲,检查点在 Spark 作业之间保留,并且可以通过直接读取文件从其他 Spark 作业进行访问,而无需重新计算整个沿袭,但它们并没有使直接从其他 Spark 作业的存储文件中读取检查点变得不那么容易。如果您感兴趣,here 是一个答案,其中显示了检查点发生时文件名的外观。
因此,就您而言,我建议您自己存储到磁盘,并在其他工作需要时读取文件。您可以使用存储机制(例如镶木地板,其效率取决于您的数据和处理性质)。像这样的东西:
import pytest
from pyspark.sql import functions as f
class TestCheckpoint:
@pytest.fixture(autouse=True)
def init_test(self, spark_unit_test_fixture, data_dir, tmp_path):
self.spark = spark_unit_test_fixture
self.dir = data_dir("")
self.checkpoint_dir = tmp_path
def test_first(self):
df = (self.spark.read.format("csv")
.option("pathGlobFilter", "numbers.csv")
.load(self.dir))
sum_df = df.agg(f.sum("_c1").alias("sum"))
sum_df.write.mode("overwrite").parquet(str(self.checkpoint_dir / "sum"))
assert 1 == 1
def test_second(self):
previous_sum = self.spark.read.parquet(str(self.checkpoint_dir / "sum"))
previous_sum_value = previous_sum.collect()[0]["sum"]
df = (self.spark.read.format("csv")
.option("pathGlobFilter", "numbers2.csv")
.load(self.dir))
new_sum = df.agg(f.sum("_c1").alias("sum"))
total_sum = previous_sum_value + new_sum.collect()[0]["sum"]
assert 1 == 1
也就是说,如果您需要在同一个 Spark 作业中访问检查点数据,您可以像这样保留对数据帧的引用
sum = df.agg(f.sum("_c1").alias("sum"))
sum = sum.checkpoint() # hold on to this reference to access the checkpointed data
或者,您还可以使用
df.persist(StorageLevel.DISK_ONLY)
,它还允许您存储到磁盘,同时保留数据沿袭。但是,一旦作业结束,数据就会被清除。