How to write API request results to a BigQuery table or a GCS file with Airflow


I am a newcomer trying to build a simple pipeline with Airflow that fetches data from an API and stores it in a BigQuery table. I have successfully decoded the JSON the API returns and printed it. I am stuck on how to store that data in a file on GCS (or somewhere else) so that I can load it into BigQuery with GCSToBigQueryOperator. I have not found any operator that lets me create a file, populate it, and store it in a bucket.

Also, is this the right approach for such a pipeline?

Thank you

from airflow import DAG

from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.email import EmailOperator
from datetime import datetime, timedelta
from urllib.request import urlopen
from urllib.request import Request
import json

default_args = {
    "owner": "airflow",
    "email_on_failure": False,
    "email_on_retry": False,
    "email": "[email protected]",
    "retries": 1,
    "retry_delay": timedelta(minutes=5)
}

headers = {
    'Content-Type': 'application/json',
    'api-auth-accountid': '00000-0rererekl09rtlkjescgy0-0-ete90-et',
    'api-auth-applicationkey': 'rre9gr45jk34594gft-3it30it'
}

def download_sales():
    # Fetch the most recent sales (max 50, created since 2021-01-01).
    request1 = Request(
        'https://inventory.dvvvvvvtems.com/ExternalApi/v2/saleList?Limit=50&CreatedSince=2021-1-1',
        headers=headers)
    sale_list = urlopen(request1).read()
    dec_sale_list = json.loads(sale_list)
    for sale in dec_sale_list['SaleList']:
        print("sale ID: " + sale['SaleID'] + " Customer: " + sale['Customer'] + " Order Date: " + sale['OrderDate'])

        # Fetch the order lines for each sale.
        s_id = sale['SaleID']
        request2 = Request(
            f'https://inventory.dfakesysjhghtems.com/ExternalApi/t3/sale/order?SaleID={s_id}',
            headers=headers)
        order_list = urlopen(request2).read()
        dec_item_list = json.loads(order_list)
        for sku in dec_item_list['Lines']:
            print("SKU: " + sku['SKU'] + " Product: " + sku['Name'] + " Quantity: " + str(sku['Quantity']) + " Price: " + str(sku['Price']) + " Total: " + str(sku['Total']))

with DAG("sales_data_pipeline", start_date=datetime(2021, 1, 1),
         schedule_interval="@daily", default_args=default_args,
         catchup=False) as dag:

    downloading_sales = PythonOperator(
        task_id="downloading_sales",
        python_callable=download_sales
    )

Tags: google-bigquery, cloud, airflow, pipeline
1 Answer

You can use Airflow's LocalFilesystemToGCSOperator to store the API response temporarily in a local file and transfer it to GCS. The next step is fairly simple: load the file from GCS with GCSToBigQueryOperator.
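To illustrate the missing "create and populate a file" step, here is a minimal sketch (the file path and the record fields are placeholders, not from the original code): write each decoded record as one JSON object per line, since newline-delimited JSON is a source format that BigQuery load jobs accept.

```python
import json

def sales_to_ndjson(sales, path):
    """Write a list of sale dicts as newline-delimited JSON,
    one object per line, suitable for loading into BigQuery
    with source_format="NEWLINE_DELIMITED_JSON"."""
    with open(path, "w") as f:
        for sale in sales:
            f.write(json.dumps(sale) + "\n")

# Illustrative records only; real ones would come from the API response.
sales_to_ndjson(
    [{"SaleID": "1", "Customer": "A"}, {"SaleID": "2", "Customer": "B"}],
    "/tmp/sales.json",
)
```

The resulting file can then be uploaded with LocalFilesystemToGCSOperator (from `airflow.providers.google.cloud.transfers.local_to_gcs`) and loaded with GCSToBigQueryOperator, chained after the download task.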
