I've dug into this and I'm fairly sure this behavior is a bug in Airflow. I created a ticket for it here: https://issues.apache.org/jira/browse/airflow-2910
For now, the best you can do is override SimpleHttpOperator and HttpHook to change how HttpHook.get_conn works (so that it accepts https). I may end up doing this, and if I do, I'll post some code.

Update:
```python
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.exceptions import AirflowException
from operators.https_support.https_hook import HttpsHook


class HttpsOperator(SimpleHttpOperator):
    def execute(self, context):
        # Same flow as SimpleHttpOperator.execute, but using the HTTPS-aware hook
        http = HttpsHook(self.method, http_conn_id=self.http_conn_id)

        self.log.info("Calling HTTP method")

        response = http.run(self.endpoint,
                            self.data,
                            self.headers,
                            self.extra_options)
        if self.response_check:
            if not self.response_check(response):
                raise AirflowException("Response check returned False.")
        if self.xcom_push_flag:
            return response.text
```
```python
from airflow.hooks.http_hook import HttpHook
import requests


class HttpsHook(HttpHook):
    def get_conn(self, headers=None):
        """
        Returns http session for use with requests. Supports https.
        """
        conn = self.get_connection(self.http_conn_id)
        session = requests.Session()

        if "://" in conn.host:
            # Host already contains a full URL, e.g. "https://api.example.com"
            self.base_url = conn.host
        elif conn.schema:
            self.base_url = conn.schema + "://" + conn.host
        elif conn.conn_type:  # https support: use the connection type as the scheme
            self.base_url = conn.conn_type + "://" + conn.host
        else:
            # schema defaults to HTTP
            self.base_url = "http://" + conn.host

        if conn.port:
            self.base_url = self.base_url + ":" + str(conn.port) + "/"
        if conn.login:
            session.auth = (conn.login, conn.password)
        if headers:
            session.headers.update(headers)

        return session
```
`HttpsOperator` is a drop-in replacement for `SimpleHttpOperator`.
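This is a few months old now, but for what it's worth, I haven't had any problems calling HTTPS endpoints with the stock SimpleHttpOperator. In my initial testing I was requesting templates from SendGrid, so the connection was set up like this: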
Conn Id : sendgrid_templates_test
Conn Type : HTTP
Host : https://api.sendgrid.com/
Extra : { "authorization": "Bearer [my token]"}
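```python
from airflow.operators.http_operator import SimpleHttpOperator

get_templates = SimpleHttpOperator(
    task_id='get_templates',
    method='GET',
    endpoint='/v3/templates',
    http_conn_id='sendgrid_templates_test',
    trigger_rule="all_done",
    xcom_push=True,
    dag=dag,  # the DAG object defined elsewhere in the file
)
```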
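It works. Also note that my request runs after a branch operator, so I need to set the trigger rule appropriately ("all_done", so that it fires even when one of the branches is skipped). That's unrelated to the question, but I wanted to point it out.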
To be clear, I do get an InsecureRequestWarning because I haven't enabled certificate verification, but you can see the resulting log below:
```text
[2019-02-21 16:15:01,333] {http_operator.py:89} INFO - Calling HTTP method
[2019-02-21 16:15:01,336] {logging_mixin.py:95} INFO - [2019-02-21 16:15:01,335] {base_hook.py:83} INFO - Using connection to: id: sendgrid_templates_test. Host: https://api.sendgrid.com/, Port: None, Schema: None, Login: None, Password: XXXXXXXX, extra: {'authorization': 'Bearer [my token]'}
[2019-02-21 16:15:01,338] {logging_mixin.py:95} INFO - [2019-02-21 16:15:01,337] {http_hook.py:126} INFO - Sending 'GET' to url: https://api.sendgrid.com//v3/templates
[2019-02-21 16:15:01,956] {logging_mixin.py:95} WARNING - /home/csconnell/.pyenv/versions/airflow/lib/python3.6/site-packages/urllib3/connectionpool.py:847: InsecureRequestWarning: Unverified HTTPS request is being made. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings
  InsecureRequestWarning)
[2019-02-21 16:15:05,242] {logging_mixin.py:95} INFO - [2019-02-21 16:15:05,241] {jobs.py:2527} INFO - Task exited with return code 0
```
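The "Sending 'GET'" line shows `https://api.sendgrid.com//v3/templates`: the host ends with `/` and the endpoint starts with `/`, so plain concatenation doubles the slash. Most servers tolerate this, but a minimal sketch of a join that normalizes it looks like this. `join_url` is a hypothetical helper for illustration, not part of Airflow.

```python
def join_url(base_url: str, endpoint: str) -> str:
    """Join a base URL and an endpoint with exactly one slash between them.

    Hypothetical helper for illustration; Airflow's HttpHook does its own joining.
    """
    if not endpoint:
        return base_url
    return base_url.rstrip("/") + "/" + endpoint.lstrip("/")


# Plain concatenation reproduces the double slash from the log
print("https://api.sendgrid.com/" + "/v3/templates")
# Normalized join: a single slash between host and path
print(join_url("https://api.sendgrid.com/", "/v3/templates"))
```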
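In Airflow 2.x, you can call HTTPS endpoints by passing `https` as the Schema when you set up the connection, and then use `SimpleHttpOperator` as shown below.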
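```python
from airflow.providers.http.operators.http import SimpleHttpOperator

my_api = SimpleHttpOperator(
    task_id="my_api",
    http_conn_id="YOUR_CONN_ID",  # connection whose Schema is set to "https"
    method="POST",
    endpoint="/base-path/end-point",
    data=get_data,  # request payload, defined elsewhere in the DAG file
    headers={"Content-Type": "application/json"},
)
```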
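To see why the Schema field matters, here is a simplified, Airflow-independent sketch of how an HTTP connection's fields become a base URL. It assumes the logic of Airflow 2's `HttpHook.get_conn`: a full URL in Host is used unchanged, otherwise Schema (defaulting to `http`) supplies the scheme, and the port is appended. `base_url_from_connection` is a hypothetical name.

```python
from typing import Optional


def base_url_from_connection(host: Optional[str], schema: Optional[str] = None,
                             port: Optional[int] = None) -> str:
    """Approximate how an HTTP connection's fields become a base URL.

    Hypothetical stand-in for the logic assumed in Airflow 2's HttpHook.get_conn.
    """
    if host and "://" in host:
        # A full URL in Host is used unchanged
        base_url = host
    else:
        # Otherwise Schema supplies the URL scheme, defaulting to http
        base_url = (schema or "http") + "://" + (host or "")
    if port:
        base_url += ":" + str(port)
    return base_url


print(base_url_from_connection("api.example.com", schema="https"))
print(base_url_from_connection("api.example.com"))
print(base_url_from_connection("https://api.example.com", port=8443))
```

Under this assumption, leaving Schema empty silently falls back to plain `http`, which matches the symptom described in this thread.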
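The same HTTP/HTTPS problem occurs when setting up the connection through an environment variable (although it works when I set up the connection in the UI).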
An issue has been opened (https://issues.apache.org/jira/browse/AIRFLOW-2910), but you don't need a custom operator for this. The problem is not that HttpHook or HttpOperator can't use HTTPS; it's the way get_hook parses the connection string for HTTP: it treats the first part (`http://` or `https://`) as the connection type.
In short, you don't need a custom operator; you can set the connection in the environment as follows:
```shell
AIRFLOW_CONN_HTTP_EXAMPLE=http://https%3a%2f%2fexample.com/
```
It's not an intuitive way to set up a connection, but I believe they are working on a better way to parse connections for Airflow 2.0.
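Why the encoded form works: the URI's scheme is read as the connection type, and the host is percent-decoded, so the `https://` prefix must be escaped to survive as part of the host. A rough sketch using only `urllib.parse`, assuming Airflow's parser behaves roughly like this (an approximation, not Airflow's actual code; `parse_conn_uri` is hypothetical):

```python
from urllib.parse import unquote, urlsplit


def parse_conn_uri(uri: str) -> dict:
    """Roughly split an Airflow-style connection URI into connection fields.

    Approximation for illustration only; Airflow's own parser handles more cases.
    """
    parts = urlsplit(uri)
    return {
        "conn_type": parts.scheme,               # "http" becomes the connection type
        "host": unquote(parts.hostname or ""),  # "%3a%2f%2f" decodes to "://"
        "schema": unquote(parts.path[1:]),      # path after the leading "/"
        "port": parts.port,
    }


# The encoded host decodes to a full URL, which the hook then uses as-is
print(parse_conn_uri("http://https%3a%2f%2fexample.com/"))
# Under the same assumption, a path segment lands in the schema field
print(parse_conn_uri("http://api.example.com:8443/https"))
```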
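I'm using Airflow 2.1.0, and the following setup works for an HTTPS API. In the connection UI, set the host name as usual; there's no need to put "https" in the Schema field. If your API server requires authentication, don't forget to set the login and password.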
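When building the task, add the `extra_options` parameter to `SimpleHttpOperator` and set the path to your CA bundle certificate file as the value of the `verify` key. If you don't have a certificate file, use `False` to skip verification.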
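`requests` accepts `verify=True` (check against its default CA bundle), `verify=False` (skip checks; urllib3 then emits InsecureRequestWarning), or a path to a CA bundle file. A small sketch of choosing that value for `extra_options`; the helper name and the bundle path are hypothetical, and the commented operator call assumes the Airflow 2 provider import path.

```python
import os
from typing import Optional, Union


def verify_option(ca_bundle_path: Optional[str]) -> Union[str, bool]:
    """Pick a value for requests' `verify` argument.

    Hypothetical helper: an existing CA bundle path is returned as-is; with no
    path, verification is disabled (requests will warn on each HTTPS call).
    """
    if ca_bundle_path is None:
        return False
    if not os.path.isfile(ca_bundle_path):
        raise FileNotFoundError(ca_bundle_path)
    return ca_bundle_path


extra_options = {"verify": verify_option(None)}
print(extra_options)

# In a DAG the dict would be passed through, e.g. (assumed import path):
# from airflow.providers.http.operators.http import SimpleHttpOperator
# SimpleHttpOperator(task_id="call_api", http_conn_id="my_api",
#                    endpoint="/v1/items", method="GET",
#                    extra_options={"verify": verify_option("/path/to/ca_bundle.pem")})
```

Failing fast on a missing bundle path avoids silently falling back to unverified requests.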
Instead of implementing HttpsHook, we could put one line of code into the HttpsOperator(SimpleHttpOperator) above:
```python
...
self.extra_options['verify'] = True
response = http.run(self.endpoint,
                    self.data,
                    self.headers,
                    self.extra_options)
...
```
host
'endpoint' parameter /
with
SimpleHttpOperator