Connection error from GCP Composer (Airflow) to GCP PubSub


I am trying to subscribe to GCP PubSub to pull messages from GCP Composer, but I get the following error:

airflow.exceptions.AirflowException: The conn_id `google_cloud_default` isn't defined

My DAG code is as follows:

subscribe_pubsub_task = PubSubPullSensor(
  task_id="subscribe_pubsub",
  project_id=GCP_REPORTING_ID,
  subscription=PUBSUB_SUBSCRIPTION,
  ack_messages=True,
  deferrable=True,
)

I have already checked the Airflow documentation: https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/sensors/pubsub/index.html

How can I get a connection ID that authenticates with a GCP service account?

python google-cloud-platform airflow google-cloud-pubsub
1 Answer

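I solved the problem by creating a custom Airflow connection for GCP. The sensor in the question does not set gcp_conn_id, so it falls back to the default connection ID `google_cloud_default`, which is the one the error reports as undefined.

The connection is implemented as follows: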

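import json

from airflow import settings
from airflow.models import Connection

# GCP_REPORTING_ID (project ID) and GCP_PUBSUB_CREDENTIALS (service account
# key as a JSON string) are defined elsewhere in the DAG file.
PUBSUB_CONNECTION = 'gcp_pubsub_reporting'
PUBSUB_CLIENT = Connection(
  conn_id=PUBSUB_CONNECTION,
  conn_type='google_cloud_platform'
)
scopes = [
  "https://www.googleapis.com/auth/pubsub",
  "https://www.googleapis.com/auth/cloud-platform",
]
# Extras read by the Google provider: OAuth scopes, project, and key contents.
conn_extra = {
  "extra__google_cloud_platform__scope": ",".join(scopes),
  "extra__google_cloud_platform__project": GCP_REPORTING_ID,
  "extra__google_cloud_platform__keyfile_dict": json.loads(GCP_PUBSUB_CREDENTIALS),
}
conn_extra_json = json.dumps(conn_extra)
PUBSUB_CLIENT.set_extra(conn_extra_json)

# Assumption: the original snippet does not show how `session` was created;
# a session on the Airflow metadata database is opened here.
session = settings.Session()

# Add the connection only if no connection with this conn_id exists yet.
if not (session.query(Connection).filter(Connection.conn_id == PUBSUB_CLIENT.conn_id).first()):
  session.add(PUBSUB_CLIENT)
  session.commit()
else:
  msg = f'Airflow connection with conn_id: {PUBSUB_CLIENT.conn_id} already exists'
  print(msg)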
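
As a possible alternative to writing the connection into the metadata database, newer Airflow 2 releases can also read a connection from an environment variable named AIRFLOW_CONN_<CONN_ID> whose value is a JSON document. The following is only a minimal sketch under that assumption, not something the setup above was tested with. The helper names build_gcp_conn_extra and conn_env_var and the sample values are hypothetical; the extras keys are the same ones used in the connection code above.

```python
import json


def build_gcp_conn_extra(project_id, keyfile_json, scopes):
    """Build the extras dict with the same keys as the connection code above.

    keyfile_json is the service account key as a JSON string (hypothetical input).
    """
    return {
        "extra__google_cloud_platform__scope": ",".join(scopes),
        "extra__google_cloud_platform__project": project_id,
        "extra__google_cloud_platform__keyfile_dict": json.loads(keyfile_json),
    }


def conn_env_var(conn_id, extra):
    """Return (name, value) for an AIRFLOW_CONN_* variable in JSON format.

    Assumption: the Airflow version in use accepts JSON-serialized connections
    in environment variables.
    """
    name = f"AIRFLOW_CONN_{conn_id.upper()}"
    value = json.dumps({"conn_type": "google_cloud_platform", "extra": extra})
    return name, value


if __name__ == "__main__":
    # Placeholder values for illustration only; no real credentials.
    sample_key = json.dumps({"type": "service_account", "project_id": "my-project"})
    scopes = [
        "https://www.googleapis.com/auth/pubsub",
        "https://www.googleapis.com/auth/cloud-platform",
    ]
    extra = build_gcp_conn_extra("my-project", sample_key, scopes)
    name, value = conn_env_var("gcp_pubsub_reporting", extra)
    print(name)
```

The printed name is the variable to set in the environment; the value contains the key material, so it would need the same care as any other secret.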

The task is implemented as follows, now passing the new connection through gcp_conn_id:

subscribe_pubsub_task = PubSubPullSensor(
    task_id="subscribe_pubsub",
    project_id=GCP_REPORTING_ID,
    subscription=PUBSUB_SUBSCRIPTION,
    gcp_conn_id=PUBSUB_CONNECTION,
    ack_messages=True,
    deferrable=False,
    max_messages=1,
    mode='poke',
    poke_interval=20
)