我正在尝试使用

问题描述 投票:0回答:5

我很努力,很确定此行为是气流中的错误。我在这里为此创建了一张票:
https://issues.apache.org/jira/browse/airflow-2910

现在,您能做的最好的方法是覆盖SimpleHttpoperator和Httphook,以更改httphook.get_conn的工作方式(接受https)。我可能最终这样做,如果我这样做,我会发布一些代码。

update:

https airflow
5个回答
8
投票
操作员替代:

from airflow.operators.http_operator import SimpleHttpOperator from airflow.exceptions import AirflowException from operators.https_support.https_hook import HttpsHook class HttpsOperator(SimpleHttpOperator): def execute(self, context): http = HttpsHook(self.method, http_conn_id=self.http_conn_id) self.log.info("Calling HTTP method") response = http.run(self.endpoint, self.data, self.headers, self.extra_options) if self.response_check: if not self.response_check(response): raise AirflowException("Response check returned False.") if self.xcom_push_flag: return response.text

hook覆盖

from airflow.hooks.http_hook import HttpHook import requests class HttpsHook(HttpHook): def get_conn(self, headers): """ Returns http session for use with requests. Supports https. """ conn = self.get_connection(self.http_conn_id) session = requests.Session() if "://" in conn.host: self.base_url = conn.host elif conn.schema: self.base_url = conn.schema + "://" + conn.host elif conn.conn_type: # https support self.base_url = conn.conn_type + "://" + conn.host else: # schema defaults to HTTP self.base_url = "http://" + conn.host if conn.port: self.base_url = self.base_url + ":" + str(conn.port) + "/" if conn.login: session.auth = (conn.login, conn.password) if headers: session.headers.update(headers) return session

usage:

simplehttpoperator的Drop-In替换。

现在已经几个月大了,但是对于我没有任何问题,我没有任何问题。

在我的初始测试中,我正在向SendGrid要求模板请求,因此连接是这样的:

Conn Id : sendgrid_templates_test Conn Type : HTTP Host : https://api.sendgrid.com/ Extra : { "authorization": "Bearer [my token]"}

然后在DAG代码中:
get_templates = SimpleHttpOperator(
        task_id='get_templates',
        method='GET',
        endpoint='/v3/templates',
        http_conn_id = 'sendgrid_templates_test',
        trigger_rule="all_done",
        xcom_push=True
        dag=dag,
    )

有效。 另请注意,我的请求发生在分支机构运算符之后,因此我需要适当地设置触发规则(即使在一个分支机构被跳过时,“ all_done”以确保它发射),这与问题无关,但我只是想指出。

现在要清楚,我确实得到了不安全的请求警告,因为我没有启用证书验证。 但是您可以在下面看到结果日志

[2019-02-21 16:15:01,333] {http_operator.py:89} INFO - Calling HTTP method [2019-02-21 16:15:01,336] {logging_mixin.py:95} INFO - [2019-02-21 16:15:01,335] {base_hook.py:83} INFO - Using connection to: id: sendgrid_templates_test. Host: https://api.sendgrid.com/, Port: None, Schema: None, Login: None, Password: XXXXXXXX, extra: {'authorization': 'Bearer [my token]'} [2019-02-21 16:15:01,338] {logging_mixin.py:95} INFO - [2019-02-21 16:15:01,337] {http_hook.py:126} INFO - Sending 'GET' to url: https://api.sendgrid.com//v3/templates [2019-02-21 16:15:01,956] {logging_mixin.py:95} WARNING - /home/csconnell/.pyenv/versions/airflow/lib/python3.6/site-packages/urllib3/connectionpool.py:847: InsecureRequestWarning: Unverified HTTPS request is being made. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/latest/advanced-usage.html#ssl-warnings InsecureRequestWarning) [2019-02-21 16:15:05,242] {logging_mixin.py:95} INFO - [2019-02-21 16:15:05,241] {jobs.py:2527} INFO - Task exited with return code 0

6
投票

在气流2.x中,您可以通过在设置连接时传递

https

用于架构值来使用https url,并且仍然可以使用
SimpleHttpOperator
如下所示。

my_api = SimpleHttpOperator( task_id="my_api", http_conn_id="YOUR_CONN_ID", method="POST", endpoint="/base-path/end-point", data=get_data, headers={"Content-Type": "application/json"}, )

尝试使用环境变量设置连接时,HTTP/HTTPS也存在相同的问题(尽管当我在UI上设置连接时,它起作用)。 

6
投票
@melchoir55

opened (

https://issues.apache.org/jira/browse/AIRFLOW-2910
) and you don't need to make a custom operator for that, the problem is not that HttpHook or HttpOperator can't use HTTPS, the problem is the way get_hook parse the connection string when dealing with HTTP, it actually了解第一部分(http://或https://)是连接类型。
总而言之,您不需要自定义操作员,您可以在env中设置连接如下:

AIRFLOW_CONN_HTTP_EXAMPLE=http://https%3a%2f%2fexample.com/

instead: enter image description hereAIRFLOW_CONN_HTTP_EXAMPLE=https://example.com/


4
投票
或在UI上设置连接。

它不是建立连接的直观方式,但我认为他们正在努力使用Ariflow 2.0的更好的方法来解析连接。

我正在使用气流2.1.0,以下设置适用于httpsapi 在连接UI,像往常一样设置主机名,无需在架构字段中指定“ https”,如果您的API服务器请求请求,请不要忘记设置登录帐户和密码。 Connection UI设置

在构建任务时,在SimpleHttpoperator中添加

Extra_options

参数,然后将您的CA_Bundle认证文件路径作为键验证的值,如果您没有认证文件,请使用false来跳过验证。
任务定义

Reference:

Herey
    

aind instead实施HTTPShook,我们可以将一行代码放入httpsoperator(simplehttpoperator)@above @above中

... self.extra_options['verify'] = True response = http.run(self.endpoint, self.data, self.headers, self.extra_options) ...


1
投票

    host
  • 在连接UI表单中的名称,最终不要c

    'endpoint'参数/

    SimpleHttpOperator
  • 开头

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.