我是 Apache Airflow 的新手,并尝试使用 SQLite 数据库后端。
当我在 DAG 中运行 SQL 查询时,我收到成功代码 0,并且日志显示查询正在成功执行。问题是,当我检查安装时创建的默认数据库 sqlite:////home/ademusire/airflow/airflow.db 时,我从 DAG 创建的表不存在。由于我没有创建任何其他数据库,所以我不知道它可能在哪里。
请问如何找到正在执行查询的数据库文件?
我的 DAG.py 中的代码写在下面:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.sqlite.operators.sqlite import SqliteOperator
default_args = {
"owner": "ademusire",
"retries": 0,
"retry_delay": timedelta(minutes=2)
}
with DAG(
dag_id="dag_with_sqlite_operator_v06",
default_args=default_args,
start_date=datetime(2023, 12, 23),
schedule_interval="@daily"
) as dag:
task1 = SqliteOperator(
task_id="create_table_sqlite",
sql=r"""
CREATE TABLE IF NOT EXISTS tripdata_monthly_statistics(
id INTEGER PRIMARY KEY AUTOINCREMENT,
month TEXT,
sat_mean_trip_count NUMERIC,
sat_mean_fare_per_trip NUMERIC,
sat_mean_duration_per_trip NUMERIC,
sun_mean_trip_count NUMERIC,
sun_mean_fare_per_trip NUMERIC,
sun_mean_duration_per_trip NUMERIC
);
""",
)
task2 = SqliteOperator(
task_id="insert_into_table",
sql=r"""
INSERT INTO tripdata_monthly_statistics(id, month,
sat_mean_trip_count, sat_mean_fare_per_trip, sat_mean_duration_per_trip,
sun_mean_trip_count, sun_mean_fare_per_trip, sun_mean_duration_per_trip)
VALUES(1, '2023-11', 7, 8, 9, 10, 11, 12);
""",
)
task3 = SqliteOperator(
task_id="select_from_table",
sql=r"""SELECT * FROM tripdata_monthly_statistics;""",
)
task4 = SqliteOperator(
task_id="show_tables",
sql=r"""
SELECT
name
FROM
sqlite_schema
WHERE
type ='table' AND
name NOT LIKE 'sqlite_%';
""",
)
task1 >> task2 >> task3 >> task4
当我运行
airflow config get-value database sql_alchemy_conn
时,它输出 sqlite:////home/ademusire/airflow/airflow.db 作为我连接到的数据库,但我创建的新表不存在。我不知道如何创建一个新的 SQLite 数据库,其中将执行我在 DAG 中运行的任务的查询,并且我找不到存储新表的当前数据库。谢谢你。
Airflow 元数据数据库和 SqliteOperator 连接的数据库是两个不同的数据库。您可能不想在 Airflow 的数据库上运行任何操作,除非安排一些在 Airflow 内部运行的维护 DAG。
运算符采用名为
sqlite_conn_id
的参数来定义数据库。如果您没有明确提供它,则它采用默认值 sqlite_default
。其详细信息可以在 UI(管理 -> 连接)或通过 API/CLI 找到:
$ airflow connections get sqlite_default
id | conn_id | conn_type | host
===+================+===========+========================
49 | sqlite_default | sqlite | /tmp/sqlite_default.db
现在,如果您打开此数据库,您可能会找到您的表。
最好的方法是不要依赖默认连接,而是显式定义您自己的连接(->docs)并在运算符中使用它。