Airflow DAG - 如何首先检查BQ(必要时删除)然后运行数据流作业?

问题描述 投票:3回答:1

我正在使用云编写器来协调ETL,以便将到达GCS的文件转到BigQuery。我有一个云函数,当文件到达时触发dag并且云函数将文件名/位置传递给DAG。在我的DAG中,我有两个任务:

1)使用DataflowPythonOperator运行数据流作业,该作业从GCS中的文本读取数据并对其进行转换并将其输入BQ,以及2)根据作业是失败还是成功将文件移动到失败/成功桶。每个文件都有一个文件ID,它是bigquery表中的一列。有时文件将被编辑一次或两次(它不是经常出现的流式传输),我希望能够首先删除该文件的现有记录。

我调查了其他气流操作员,但是在运行数据流作业之前,我希望在DAG中有2个任务:

  1. 根据文件名获取文件ID(现在我有一个bigquery表映射文件名 - >文件ID但我也可以引入一个json作为地图我想如果这更容易)
  2. 如果文件ID已存在于bigquery表(从数据流作业输出转换数据的表)中,请将其删除,然后运行数据流作业,以便获得最新信息。我知道一个选项是只添加一个时间戳并且只使用最新的记录,但因为每个文件可能有100万条记录而且我不像每天删除100个文件(可能是1-2个顶部)这似乎可能是混乱和混乱。

在数据流作业之后,理想情况下,在将文件移动到成功/失败文件夹之前,我想附加一些“记录”表,说这个游戏是在此时输入的。这将是我查看发生的所有插入的方法。我试图寻找不同的方法来做到这一点,我是云作曲家的新手,所以我没有清楚地知道在经过10多个小时的研究后这将如何工作,否则我会发布代码进行输入。

谢谢,我非常感谢大家的帮助并道歉,如果这不是你想要的那么清楚,关于气流的文档是非常强大的,但鉴于云作曲家和bigquery相对较新,很难学会如何彻底地做一些GCP的具体任务。

python google-cloud-platform google-bigquery airflow google-cloud-composer
1个回答
2
投票

听起来有点复杂。很高兴,每个GCP服务都有运营商。另一件事是何时触发DAG执行。你有没有意识到这一点?每当新文件进入该GCS存储桶时,您都希望触发Google Cloud功能。

  1. 触发您的DAG

要触发DAG,您需要使用依赖于Object FinalizeMetadata Update触发器的Google Cloud功能来调用它。

  1. 将数据加载到BigQuery

如果您的文件已经在GCS中,并且采用JSON或CSV格式,那么使用Dataflow作业就太过分了。您可以使用GoogleCloudStorageToBigQueryOperator将文件加载到BQ。

  1. 跟踪文件ID

计算文件ID的最好的事情可能是使用Airflow的Bash或Python运算符。你能直接从文件名中导出它吗?

如果是这样,那么您可以在GoogleCloudStorageObjectSensor上游使用Python运算符来检查文件是否在成功目录中。

如果是,则可以使用BigQueryOperator在BQ上运行删除查询。

之后,您运行GoogleCloudStorageToBigQueryOperator。

  1. 移动文件

如果您要将文件从GCS移动到GCS位置,那么GoogleCloudStorageToGoogleCloudStorageOperator应该可以完成您需要的技巧。如果您的BQ加载操作员失败,则移至失败的文件位置,如果成功,则移至成功的作业位置。

  1. 记录任务日志

也许您需要跟踪插入的所有内容都是将任务信息记录到GCS。看看how to log task information to GCS

这有帮助吗?

© www.soinside.com 2019 - 2024. All rights reserved.