我有一个正在运行的谷歌云功能,我正在尝试从 Airflow DAG 调用它。
到目前为止我尝试过的是使用 SimpleHttpOperator:
MY_TASK_NAME = SimpleHttpOperator(
task_id= "MY_TASK_NAME",
method='POST',
http_conn_id='http_default',
endpoint='https://us-central1-myprojectname.cloudfunctions.net/MyFunctionName',
data=({"schema": schema, "table": table}),
headers={"Content-Type": "application/json"},
xcom_push=False
)
但是深入日志,它说找不到资源:
{base_task_runner.py:98} 信息 - 子任务:
在此服务器上找不到请求的 URL
。 这就是我们所知道的。/https://us-central1-myprojectname.cloudfunctions.net/MyFunctionName
我还注意到它实际上发布到 https://www.google.com/ +我给出的网址:
Sending 'POST' to url: https://www.google.com/https://us-central1-myprojectname.cloudfunctions.net/MyFunctionName
调用该函数的正确方法是什么? 谢谢
这是因为您正在使用
http_conn_id='http_default'
。
http_default
连接如下所示:
如果您检查“主机”字段,它会显示
http://www.google.com/
。
使用
HTTP
连接类型创建新连接,或修改 http_default
连接并将主机更改为 https://us-central1-myprojectname.cloudfunctions.net/
然后将任务中的
endpoint
字段更新为:
MY_TASK_NAME = SimpleHttpOperator(
task_id= "MY_TASK_NAME",
method='POST',
http_conn_id='http_default',
endpoint='MyFunctionName',
data=({"schema": schema, "table": table}),
headers={"Content-Type": "application/json"},
xcom_push=False
)
编辑:在 URL 末尾添加 /
正如 @kaxil 指出的,您需要首先更改 http 连接。然后,您需要能够发送正确的身份验证以调用云功能。下面的链接有一个通过子类化来完成此操作的分步指南
SimpleHttpOperator
顺便说一句,谷歌应该让这个过程更加清晰。想要从 Google Cloud Composer 触发 Google Cloud Function (gcf) 是完全合理的。有关如何将 http 触发器发送到 gcf 的文档包括 Cloud Scheduler、Cloud Tasks、Cloud Pub/Sub 和许多其他文档(但不包括 Cloud Composer)
面临类似的问题,并使用 Wayback machine 从上述评论中获取分步指南。此人在 GitHub 存储库中仍然有完整版本(带有代码和屏幕截图): https://github.com/salrashid123/composer_gcf/tree/master
对于较新版本的 Airflow 来说已经有点过时了,但不需要太多更改。 您需要使用重载的execute()方法来子类化SimpleHttpOperator。核心结构应该是这样的
from google.oauth2 import id_token
import google.auth.transport.requests
target_audience = '{link_to_your_cloud_run_function}'
request = google.auth.transport.requests.Request()
idt = id_token.fetch_id_token(request, target_audience)
self.headers = {'Authorization': "Bearer " + idt,
'Content-Type': "application/octet-stream"}
注意。您需要使用 POST 请求才能从 Airflow 触发 Cloud Run Function (CRF)。 另外,请确保您使用的服务帐户具有 Cloud Run Invoker 角色才能使其正常工作。