在 Airflow DAG 中调用 Google 云函数

问题描述 投票:0回答:3

我有一个正在运行的谷歌云功能,我正在尝试从 Airflow DAG 调用它。

到目前为止我尝试过的是使用 SimpleHttpOperator:

MY_TASK_NAME = SimpleHttpOperator(
        task_id= "MY_TASK_NAME",
        method='POST',
        http_conn_id='http_default',
        endpoint='https://us-central1-myprojectname.cloudfunctions.net/MyFunctionName',
        data=({"schema": schema, "table": table}),
        headers={"Content-Type": "application/json"},
        xcom_push=False
    )

但是深入日志,它说找不到资源:

{base_task_runner.py:98} 信息 - 子任务:

在此服务器上找不到请求的 URL

/https://us-central1-myprojectname.cloudfunctions.net/MyFunctionName
。 这就是我们所知道的。

我还注意到它实际上发布到 https://www.google.com/ +我给出的网址:

Sending 'POST' to url: https://www.google.com/https://us-central1-myprojectname.cloudfunctions.net/MyFunctionName

调用该函数的正确方法是什么? 谢谢

python google-cloud-functions airflow directed-acyclic-graphs
3个回答
2
投票

这是因为您正在使用

http_conn_id='http_default'

http_default
连接如下所示:

enter image description here

如果您检查“主机”字段,它会显示

http://www.google.com/

使用

HTTP
连接类型创建新连接,或修改
http_default
连接并将主机更改为
https://us-central1-myprojectname.cloudfunctions.net/

然后将任务中的

endpoint
字段更新为:

MY_TASK_NAME = SimpleHttpOperator(
        task_id= "MY_TASK_NAME",
        method='POST',
        http_conn_id='http_default',
        endpoint='MyFunctionName',
        data=({"schema": schema, "table": table}),
        headers={"Content-Type": "application/json"},
        xcom_push=False
    )

编辑:在 URL 末尾添加 /


0
投票

正如 @kaxil 指出的,您需要首先更改 http 连接。然后,您需要能够发送正确的身份验证以调用云功能。下面的链接有一个通过子类化来完成此操作的分步指南

SimpleHttpOperator

https://medium.com/google-cloud/calling-cloud-composer-to-cloud-functions-and-back-again-securely-8e65d783acce


顺便说一句,谷歌应该让这个过程更加清晰。想要从 Google Cloud Composer 触发 Google Cloud Function (gcf) 是完全合理的。有关如何将 http 触发器发送到 gcf 的文档包括 Cloud Scheduler、Cloud Tasks、Cloud Pub/Sub 和许多其他文档(但不包括 Cloud Composer)


0
投票

面临类似的问题,并使用 Wayback machine 从上述评论中获取分步指南。此人在 GitHub 存储库中仍然有完整版本(带有代码和屏幕截图): https://github.com/salrashid123/composer_gcf/tree/master

对于较新版本的 Airflow 来说已经有点过时了,但不需要太多更改。 您需要使用重载的execute()方法来子类化SimpleHttpOperator。核心结构应该是这样的

from google.oauth2 import id_token
import google.auth.transport.requests
target_audience = '{link_to_your_cloud_run_function}'
request = google.auth.transport.requests.Request()
idt = id_token.fetch_id_token(request, target_audience)
self.headers = {'Authorization': "Bearer " + idt,
                'Content-Type': "application/octet-stream"}

注意。您需要使用 POST 请求才能从 Airflow 触发 Cloud Run Function (CRF)。 另外,请确保您使用的服务帐户具有 Cloud Run Invoker 角色才能使其正常工作。

© www.soinside.com 2019 - 2024. All rights reserved.