如何在 Apache Airflow 中找到本地 SQLite 数据库文件?

问题描述 投票:0回答:1

我是 Apache Airflow 的新手,并尝试使用 SQLite 数据库后端。

当我在 DAG 中运行 SQL 查询时,我收到成功代码 0,并且日志显示查询正在成功执行。问题是,当我检查安装时创建的默认数据库 sqlite:////home/ademusire/airflow/airflow.db 时,我从 DAG 创建的表不存在。由于我没有创建任何其他数据库,所以我不知道它可能在哪里。

请问如何找到正在执行查询的数据库文件?

我的 DAG.py 中的代码写在下面:

from datetime import datetime, timedelta 

from airflow import DAG 

from airflow.providers.sqlite.operators.sqlite import SqliteOperator

default_args = {
    "owner": "ademusire",
    "retries": 0,
    "retry_delay": timedelta(minutes=2)
}

with DAG(
    dag_id="dag_with_sqlite_operator_v06",
    default_args=default_args,
    start_date=datetime(2023, 12, 23),
    schedule_interval="@daily"
) as dag:

    task1 = SqliteOperator(
        task_id="create_table_sqlite",
        sql=r"""
            CREATE TABLE IF NOT EXISTS tripdata_monthly_statistics(
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            month TEXT,
            sat_mean_trip_count NUMERIC,
            sat_mean_fare_per_trip NUMERIC,
            sat_mean_duration_per_trip NUMERIC,
            sun_mean_trip_count NUMERIC,
            sun_mean_fare_per_trip NUMERIC,
            sun_mean_duration_per_trip NUMERIC
            );
        """,
    )
    
    task2 = SqliteOperator(
        task_id="insert_into_table",
        sql=r"""
            INSERT INTO tripdata_monthly_statistics(id, month, 
            sat_mean_trip_count, sat_mean_fare_per_trip, sat_mean_duration_per_trip,
            sun_mean_trip_count, sun_mean_fare_per_trip, sun_mean_duration_per_trip)
            VALUES(1, '2023-11', 7, 8, 9, 10, 11, 12);
        """,
    ) 

    task3 = SqliteOperator(
        task_id="select_from_table",
        sql=r"""SELECT * FROM tripdata_monthly_statistics;""",
    )

    task4 = SqliteOperator(
        task_id="show_tables",
        sql=r"""
            SELECT 
                name
            FROM 
                sqlite_schema
            WHERE 
                type ='table' AND 
                name NOT LIKE 'sqlite_%';
        """,
    )

    task1 >> task2 >> task3 >> task4

当我运行

airflow config get-value database sql_alchemy_conn
时,它输出 sqlite:////home/ademusire/airflow/airflow.db 作为我连接到的数据库,但我创建的新表不存在。我不知道如何创建一个新的 SQLite 数据库,其中将执行我在 DAG 中运行的任务的查询,并且我找不到存储新表的当前数据库。谢谢你。

sqlite airflow
1个回答
0
投票

Airflow 元数据数据库和 SqliteOperator 连接的数据库是两个不同的数据库。您可能不想在 Airflow 的数据库上运行任何操作,除非安排一些在 Airflow 内部运行的维护 DAG。

运算符采用名为

sqlite_conn_id
的参数来定义数据库。如果您没有明确提供它,则它采用默认值
sqlite_default
。其详细信息可以在 UI(管理 -> 连接)或通过 API/CLI 找到:

$ airflow connections get sqlite_default
id | conn_id        | conn_type | host                   
===+================+===========+========================
49 | sqlite_default | sqlite    | /tmp/sqlite_default.db 

现在,如果您打开此数据库,您可能会找到您的表。

最好的方法是不要依赖默认连接,而是显式定义您自己的连接(->docs)并在运算符中使用它。

© www.soinside.com 2019 - 2024. All rights reserved.