I am trying to subscribe to GCP PubSub to pull messages from GCP Composer, but I get the following error:
airflow.exceptions.AirflowException: The conn_id `google_cloud_default` isn't defined
My DAG code is as follows:
from airflow.providers.google.cloud.sensors.pubsub import PubSubPullSensor

subscribe_pubsub_task = PubSubPullSensor(
    task_id="subscribe_pubsub",
    project_id=GCP_REPORTING_ID,
    subscription=PUBSUB_SUBSCRIPTION,
    ack_messages=True,
    deferrable=True,
)
I have already checked the Airflow documentation: https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/sensors/pubsub/index.html
How do I set up a connection ID that authenticates with a GCP service account?
I solved the problem by creating a custom Airflow connection for GCP.
The connection is implemented as follows:
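import json

from airflow import settings
from airflow.models import Connection

# GCP_REPORTING_ID and GCP_PUBSUB_CREDENTIALS (the service account key JSON
# as a string) are assumed to be defined elsewhere in the DAG file.
PUBSUB_CONNECTION = 'gcp_pubsub_reporting'
PUBSUB_CLIENT = Connection(
    conn_id=PUBSUB_CONNECTION,
    conn_type='google_cloud_platform'
)
scopes = [
    "https://www.googleapis.com/auth/pubsub",
    "https://www.googleapis.com/auth/cloud-platform",
]
# Scopes, project and service account key are stored in the connection extras
conn_extra = {
    "extra__google_cloud_platform__scope": ",".join(scopes),
    "extra__google_cloud_platform__project": GCP_REPORTING_ID,
    "extra__google_cloud_platform__keyfile_dict": json.loads(GCP_PUBSUB_CREDENTIALS),
}
conn_extra_json = json.dumps(conn_extra)
PUBSUB_CLIENT.set_extra(conn_extra_json)

# Assumption: `session` is an Airflow metadata DB session.
# Register the connection only if it does not exist yet.
session = settings.Session()
if not session.query(Connection).filter(Connection.conn_id == PUBSUB_CLIENT.conn_id).first():
    session.add(PUBSUB_CLIENT)
    session.commit()
else:
    msg = f'Airflow connection with conn_id: {PUBSUB_CLIENT.conn_id} already exists'
    print(msg)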
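The extras payload above can be built and sanity-checked without Airflow. The sketch below is a hypothetical helper (`build_gcp_conn_extra` and `REQUIRED_KEY_FIELDS` are not part of Airflow). It assumes the key is a standard service account JSON key file, which carries `"type": "service_account"` plus fields such as `project_id`, `private_key` and `client_email`. It returns the same string the answer passes to `set_extra()`.

```python
import json

# Same scopes as the connection above
SCOPES = (
    "https://www.googleapis.com/auth/pubsub",
    "https://www.googleapis.com/auth/cloud-platform",
)

# Hypothetical minimal check of the service account key contents
REQUIRED_KEY_FIELDS = ("type", "project_id", "private_key", "client_email")


def build_gcp_conn_extra(project_id: str, keyfile_json: str, scopes=SCOPES) -> str:
    """Return the JSON string to pass to Connection.set_extra()."""
    keyfile = json.loads(keyfile_json)
    missing = [field for field in REQUIRED_KEY_FIELDS if field not in keyfile]
    if missing:
        raise ValueError(f"service account key is missing fields: {missing}")
    if keyfile["type"] != "service_account":
        raise ValueError("expected a service account key file")
    extra = {
        # Scopes are stored as a single comma-separated string
        "extra__google_cloud_platform__scope": ",".join(scopes),
        "extra__google_cloud_platform__project": project_id,
        # The key is embedded as a dict, as in the connection code above
        "extra__google_cloud_platform__keyfile_dict": keyfile,
    }
    return json.dumps(extra)
```

Validating the key up front surfaces a malformed `GCP_PUBSUB_CREDENTIALS` value when the connection is created, rather than later when the sensor first tries to authenticate.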
The task is implemented as follows:
subscribe_pubsub_task = PubSubPullSensor(
    task_id="subscribe_pubsub",
    project_id=GCP_REPORTING_ID,
    subscription=PUBSUB_SUBSCRIPTION,
    gcp_conn_id=PUBSUB_CONNECTION,
    ack_messages=True,
    deferrable=False,
    max_messages=1,
    mode='poke',
    poke_interval=20
)
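The sensor's return value is pushed to XCom, so a downstream task can read the pulled messages. A minimal sketch follows. It assumes the default message callback is in use and produces a list of dicts in which each message's payload sits base64-encoded under `["message"]["data"]`; the exact dict shape may differ between provider versions. `decode_pubsub_payloads` is a hypothetical helper.

```python
import base64
from typing import Any


def decode_pubsub_payloads(pulled: list[dict[str, Any]]) -> list[str]:
    """Decode the base64 'data' field of each pulled message to UTF-8 text."""
    payloads = []
    for received in pulled:
        # Assumed shape: {"ackId": ..., "message": {"data": "<base64>", ...}}
        data_b64 = received.get("message", {}).get("data", "")
        payloads.append(base64.b64decode(data_b64).decode("utf-8"))
    return payloads


sample = [{"ackId": "abc", "message": {"data": base64.b64encode(b"hello").decode(), "messageId": "1"}}]
print(decode_pubsub_payloads(sample))  # ['hello']
```

In a downstream task, the input would come from something like `ti.xcom_pull(task_ids="subscribe_pubsub")`. With `max_messages=1` the list holds at most one message per successful poke.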