从 Airflow DAG 任务调用的 Redshift [Insert into.. select..] 遇到一个奇怪的问题。我们有一个包含 5 个连续任务的 DAG 定义
任务 1:截断 temp_table
SQL:`截断临时表; `
任务 2: 使用
source_table1
将 1 天的数据从 view_1
加载到 temp_table
SQL:`INSERT INTO temp_table SELECT * FROM view_1`
任务 3:将
temp_table
数据加载到 fact_table_1
SQL; ` INSERT INTO fact_table_1 SELECT * FROM temp_table `
任务 4:使用
view_2
(由 source_1
和 source_2
构建)将 1 天数据加载到 target_table_1
SQL:`INSERT INTO target_table_1 SELECT * FROM view_2`
任务 5:使用
view_3
(由 source_2
和 source_3
构建)将 1 天的数据加载到 target_table_2
SQL:`INSERT INTO target_table_2 SELECT * FROM view_3`
问题:
DAG 和所有任务在 Airflow 中都标记为成功。
Redshift 查询历史记录显示查询 ID 成功。
但是,即使
target_table_1
返回数据,view_2
在任务 4 之后仍为空。
从 SQL 客户端运行相同的
INSERT INTO ... SELECT ...
查询可成功将记录插入到 target_table_1
。
采取的故障排除步骤:
验证
target_table_1
的架构(列和数据类型)是否与 view_2
的输出匹配。
确保
view_2
定义中的过滤器在 DAG 运行时不会限制数据。
请求帮助:
有人在 Airflow 中使用 Redshift 插入时遇到过类似的问题吗?任何有关进一步故障排除步骤的建议将不胜感激。我们特别有兴趣了解为什么插入可以从 SQL 客户端运行,但不能在 Airflow DAG 中运行。
我怀疑问题在于这些运行的顺序性质。 Redshift 使用 MVCC(多版本并发控制 - https://en.wikipedia.org/wiki/Multiversion_concurrency_control),其中事务即使在同时运行时也可以看到不同的数据。
根据定义,您的基准会话是单个事务或至少严格执行提交顺序。 这是关键 - 相对于下一个任务的事务开始,这些任务何时提交到数据库?
去查看查询日志,看看运行这些任务的事务是否在时间上重叠。 其他事务将在启动时看到表的版本,但在这些事务提交其结果之前,来自其他会话的数据更新不可用。 如果时间非常接近,您将需要查看采购任务的更新在数据库中完全提交的时间与消费事务开始的时间相比。
Airflow 灵活而强大,但这可能会导致事情的执行方式与您预期的不完全一样。