增量湖屋更新

问题描述 投票:0回答:1

我正在使用 Pyspark 实现 Lakehouse(Apache Iceberg),但遇到了一些问题。所以我来自 SQL 背景,所以最初尝试以与通常相同的方式实现这个解决方案,这在数据库中完成时很好,但在 Spark 中效率很低。

例如,假设我有 4 个冰山表,它们连接起来以获得最终的“转换后”数据。我们假设每个表都依赖于前一个表。

示例 A->B->C->D

因此,如果 D 上发生更改,C 上可能会有多条记录受到影响,这些记录也会一路向上传播。

最初我实现的方式是对于每个表,我都会连接回核心表(A)。因此,如果对 C 进行了更改,我会将 C 上的修改/新记录连接回 B,然后返回 A,以查看哪些 A 记录受到影响。这将对所有其他表重复。然后我将获得所有这些连接(A、B、C、D)的最终转换视图,并根据我们之前收集的 A 记录进行过滤。

这样做的问题是效率非常低,因为基本上我多次访问同一个完整的表。根据我的研究,您似乎可以缓存该表,但您不想在大数据集上执行此操作,因为它可能会溢出到磁盘存储中。

关于我应该如何在最佳实践方面解决这个问题有什么建议吗?

pyspark amazon-emr apache-iceberg
1个回答
0
投票

非常通用的主题和开放式问题。

需要考虑的事项:

  1. 步骤/作业/流的数量:我理所当然地认为运行 3 个不同的作业来移动 D->C、C->B 和 B->A 的更改是一个好主意。需要考虑的其他选项是您已经尝试过的选项,即执行 D->C->B->A 的单个作业。如果您处理少量数据,这将起作用。

    因此您将拥有 3 个可以独立运行的作业。

    # D->C
    sD = spark.readStream.format('iceberg').load('D')
    sC = sD.<transform>
    sC.writeStream.format("iceberg").trigger(..).options(..).toTable("C")
    # C->B
    sC = spark.readStream.format('iceberg').load('C')
    sB = sC.<transform>
    sB.writeStream.format("iceberg").trigger(..).options(..).toTable("B")
    # B->A
    ...
    
  2. 更改数据捕获:首先您需要弄清楚如何识别源表的“增量更改”。 Stock Iceberg 结构化流 将为您提供一个包含与源表相同的列的 DataFrame。 IE。你无法区分插入、更新和删除。鉴于 Iceberg 仅支持插入,这不是问题,除非您需要支持对源表的更新/删除。如果您这样做,那么您将需要了解如何在 Iceberg 之上实现 CDC,例如这是一个Delta 表支持 CDC 开箱即用。但这完全是一种新格式。

  3. 插入是独立的还是与源表中的其他/现有行相关?例如。如果你的

    • 源表包含
      employee, job_id, status, event_time
    • 目标表包含
      employee, avg_job_duration

    然后,当您在源表中获取某个员工的作业完成事件时,您需要从源表中读取与该员工对应的所有行,以便能够更新目标表。在这种情况下,您确实无法避免将源表的增量 DataFrame 与源表连接起来。但是您需要在设计源表时考虑到此类联接(正确分区)。

© www.soinside.com 2019 - 2024. All rights reserved.