我有一个名为
Results
的 Delta 表,我有一个需要按顺序在 Results
上执行的操作列表。
给定的操作列表:
['op_1', 'op_2', 'op_3']
如果 op_1
列在某个 IP 范围内,则
value
会修改 network
列。
如果op_2
列在一定范围内,
value
会再次删除行列。
op_3
在 grouping
列的基础上修改 zone
。
作为此过程的一部分,我需要修改另一个名为
operation_log
的团队表,该表以某种方式格式化某些列,用于跟踪对哪些行执行了哪些操作。他们拥有这张桌子,我无法更改这部分。
使用
MERGE INTO
语句在增量表上执行操作,然后在 operation_log
中跟踪操作
目前我有该操作,然后该操作的日志作为两个单独的查询正在执行。我面临的问题是,当删除发生在结果上时,无法再从表或任何创建的视图访问该行。所以我必须先执行log sql。如果日志执行正常但实际删除sql失败,这就是一个问题。如果没有发生任何操作,我会记录一个操作。
我尝试将所有内容合并到一个要执行的sql语句中。删除语句示例:
MERGE INTO results
USING _vectors
ON results.vector_id = _vectors.vector_id
WHEN MATCHED THEN
DELETE
WHEN MATCHED THEN
INSERT INTO operation_log (
vector_id,
ip_range,
value,
grouping,
zone,
action
)
VALUES (
_vectors.vector_id,
_vectors.ip_range,
_vectors.value,
_vectors.grouping,
_vectors.zone,
"DELETE"
)
不幸的是,唯一的硬性要求就是这种挑剔
operation_log
。
根据 databricks delta 文档,我可以有多个匹配条件,但这是否可以在一个查询中对单独的表进行更新和插入?
您可以在 Results
表上
启用更改数据源,然后使用它来更新日志表。
更改数据源将包含所有受影响行的行级别信息(插入、更新、删除)。
要启用更改数据源:
ALTER TABLE Results SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
然后您可以批量或流式读取。例如:
-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('Results', 0, 10)
-- timestamp as string formatted timestamps
SELECT * FROM table_changes('Results', '2021-04-21 05:45:46', '2021-05-21 12:00:00')
根据需要解析数据并更新您的更改日志表