我是 GCP 上的 apache-airflow 新手,我正在尝试在 Dataproc 无服务器内的 DAG 上使用 BigQueryToPostgresOperator 将表从 Bigquery 发送到 Cloud SQL,特别是发送到 postgresql 上的数据库,这来自 GCP 上的不同项目,但我收到错误ProgrammingError:无效的dsn:无效的连接选项“database_type”。
这是我拥有的 DAG 代码
from datetime import datetime
from airflow.models.dag import DAG
from airflow.providers.google.cloud.transfers.bigquery_to_postgres import BigQueryToPostgresOperator
DAG_ID = "BQTOPOSTGRES"
DATASET_NAME = "test_dataset"
TABLE = "test_table"
destination_table = "destination_test_table"
with DAG(
DAG_ID,
schedule_interval=None,
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["example", "bigquery"],
) as dag:
bigquery_to_postgres = BigQueryToPostgresOperator(
# [START howto_operator_bigquery_to_postgres]
task_id="bigquery_to_postgres",
dataset_table=f"{DATASET_NAME}.{TABLE}", #the big query table of origin
postgres_conn_id="connection_test",
target_table_name=destination_table, # target postgres table (templated)
replace=False,
# [END howto_operator_bigquery_to_postgres]
)
bigquery_to_postgres
我试图从气流连接中更改 postgres_conn_id ,但未成功,我想知道如何建立连接并正确使用操作员。