我正在使用 Pyspark 实现 Lakehouse(Apache Iceberg),但遇到了一些问题。所以我来自 SQL 背景,所以最初尝试以与通常相同的方式实现这个解决方案,这在数据库中完成时很好,但在 Spark 中效率很低。
例如,假设我有 4 个冰山表,它们连接起来以获得最终的“转换后”数据。我们假设每个表都依赖于前一个表。
示例 A->B->C->D
因此,如果 D 上发生更改,C 上可能会有多条记录受到影响,这些记录也会一路向上传播。
最初我实现的方式是对于每个表,我都会连接回核心表(A)。因此,如果对 C 进行了更改,我会将 C 上的修改/新记录连接回 B,然后返回 A,以查看哪些 A 记录受到影响。这将对所有其他表重复。然后我将获得所有这些连接(A、B、C、D)的最终转换视图,并根据我们之前收集的 A 记录进行过滤。
这样做的问题是效率非常低,因为基本上我多次访问同一个完整的表。根据我的研究,您似乎可以缓存该表,但您不想在大数据集上执行此操作,因为它可能会溢出到磁盘存储中。
关于我应该如何在最佳实践方面解决这个问题有什么建议吗?
非常通用的主题和开放式问题。
需要考虑的事项:
步骤/作业/流的数量:我理所当然地认为运行 3 个不同的作业来移动 D->C、C->B 和 B->A 的更改是一个好主意。需要考虑的其他选项是您已经尝试过的选项,即执行 D->C->B->A 的单个作业。如果您处理少量数据,这将起作用。
因此您将拥有 3 个可以独立运行的作业。
# D->C
sD = spark.readStream.format('iceberg').load('D')
sC = sD.<transform>
sC.writeStream.format("iceberg").trigger(..).options(..).toTable("C")
# C->B
sC = spark.readStream.format('iceberg').load('C')
sB = sC.<transform>
sB.writeStream.format("iceberg").trigger(..).options(..).toTable("B")
# B->A
...
更改数据捕获:首先您需要弄清楚如何识别源表的“增量更改”。 Stock Iceberg 结构化流 将为您提供一个包含与源表相同的列的 DataFrame。 IE。你无法区分插入、更新和删除。鉴于 Iceberg 仅支持插入,这不是问题,除非您需要支持对源表的更新/删除。如果您这样做,那么您将需要了解如何在 Iceberg 之上实现 CDC,例如这是一个。 Delta 表支持 CDC 开箱即用。但这完全是一种新格式。
插入是独立的还是与源表中的其他/现有行相关?例如。如果你的
employee, job_id, status, event_time
和 employee, avg_job_duration
然后,当您在源表中获取某个员工的作业完成事件时,您需要从源表中读取与该员工对应的所有行,以便能够更新目标表。在这种情况下,您确实无法避免将源表的增量 DataFrame 与源表连接起来。但是您需要在设计源表时考虑到此类联接(正确分区)。