Databricks 合并到 - 添加插入另一个表的条件

问题描述 投票:0回答:1

我有一个名为

Results
的 Delta 表,我有一个需要按顺序在
Results
上执行的操作列表。

给定的操作列表:

['op_1', 'op_2', 'op_3']

如果

op_1

 列在某个 IP 范围内,则
value
会修改
network
列。

如果

op_2

列在一定范围内,
value
会再次删除行列。

op_3
grouping
列的基础上修改
zone

作为此过程的一部分,我需要修改另一个名为

operation_log
的团队表,该表以某种方式格式化某些列,用于跟踪对哪些行执行了哪些操作。他们拥有这张桌子,我无法更改这部分。

使用

MERGE INTO
语句在增量表上执行操作,然后在
operation_log

中跟踪操作

目前我有该操作,然后该操作的日志作为两个单独的查询正在执行。我面临的问题是,当删除发生在结果上时,无法再从表或任何创建的视图访问该行。所以我必须先执行log sql。如果日志执行正常但实际删除sql失败,这就是一个问题。如果没有发生任何操作,我会记录一个操作。

我尝试将所有内容合并到一个要执行的sql语句中。删除语句示例:

MERGE INTO results
USING  _vectors
    ON results.vector_id = _vectors.vector_id
WHEN MATCHED THEN
    DELETE
WHEN MATCHED THEN
    INSERT INTO operation_log (
        vector_id,
        ip_range,
        value,
        grouping,
        zone,
        action
        )
    VALUES (
        _vectors.vector_id,
        _vectors.ip_range,
        _vectors.value,
        _vectors.grouping,
        _vectors.zone,
        "DELETE"
    )

不幸的是,唯一的硬性要求就是这种挑剔

operation_log

根据 databricks delta 文档,我可以有多个匹配条件,但这是否可以在一个查询中对单独的表进行更新和插入?

pyspark apache-spark-sql databricks delta-lake
1个回答
0
投票

您可以在 Results 表上

启用更改数据源
,然后使用它来更新日志表。

更改数据源将包含所有受影响行的行级别信息(插入、更新、删除)。

要启用更改数据源:

ALTER TABLE Results SET TBLPROPERTIES (delta.enableChangeDataFeed = true)

然后您可以批量或流式读取。例如:

-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('Results', 0, 10)

-- timestamp as string formatted timestamps
SELECT * FROM table_changes('Results', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

根据需要解析数据并更新您的更改日志表

© www.soinside.com 2019 - 2024. All rights reserved.