How do I correctly connect Airflow running on AWS EC2 to RDS?


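I have an Airflow instance on EC2 running the webserver and the scheduler. I want to use a MySQL RDS instance as the backend metadata database instead of the default SQLite. I replaced the sql_alchemy_conn line in airflow.cfg so that it connects to RDS through the pymysql driver: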

#sql_alchemy_conn = sqlite:////home/cloud-user/airflow/airflow.db
sql_alchemy_conn = mysql+pymysql://admin:<PASSWORD>@airflow-db.xxxxxxxxxxxx.us-east-1.rds.amazonaws.com:3306/airflow
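
A side note on the connection string format: SQLAlchemy URLs expect the user name and password to be percent-encoded, so a password containing characters such as @ or / would otherwise be misparsed. The sketch below shows one way to assemble the value with the standard library. The helper name build_mysql_url, the sample password, and the host airflow-db.example.invalid are all hypothetical placeholders, not values from the question.

```python
from urllib.parse import quote_plus


def build_mysql_url(user, password, host, database, port=3306, driver="pymysql"):
    """Return a SQLAlchemy-style MySQL URL with user and password percent-encoded."""
    return "mysql+{driver}://{user}:{password}@{host}:{port}/{database}".format(
        driver=driver,
        user=quote_plus(user),          # encode reserved characters in the user name
        password=quote_plus(password),  # e.g. "@" becomes "%40", "/" becomes "%2F"
        host=host,
        port=port,
        database=database,
    )


# Hypothetical values for illustration only.
url = build_mysql_url("admin", "p@ss/word", "airflow-db.example.invalid", "airflow")
print(url)
```

The printed string is what would go on the right-hand side of sql_alchemy_conn. This does not address the error below by itself; it only rules out one common source of malformed connection strings.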

The connection itself seems to work: using the MySQL client installed on my EC2 instance, I can log in to the RDS instance and query its tables.

However, whenever I toggle a DAG on or off, I get this nasty Python stack trace in my shell:

[2019-09-30 14:00:51,774] {app.py:1891} ERROR - Exception on /admin/airflow/paused [POST]
Traceback (most recent call last):
  File "/home/cloud-user/.local/lib/python2.7/site-packages/flask/app.py", line 2446, in wsgi_app
    response = self.full_dispatch_request()
  File "/home/cloud-user/.local/lib/python2.7/site-packages/flask/app.py", line 1951, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/home/cloud-user/.local/lib/python2.7/site-packages/flask/app.py", line 1820, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/home/cloud-user/.local/lib/python2.7/site-packages/flask/app.py", line 1949, in full_dispatch_request
    rv = self.dispatch_request()
  File "/home/cloud-user/.local/lib/python2.7/site-packages/flask/app.py", line 1935, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/home/cloud-user/.local/lib/python2.7/site-packages/flask_admin/base.py", line 69, in inner
    return self._run_view(f, *args, **kwargs)
  File "/home/cloud-user/.local/lib/python2.7/site-packages/flask_admin/base.py", line 368, in _run_view
    return fn(self, *args, **kwargs)
  File "/home/cloud-user/.local/lib/python2.7/site-packages/flask_login/utils.py", line 258, in decorated_view
    return func(*args, **kwargs)
  File "/home/cloud-user/.local/lib/python2.7/site-packages/airflow/www/utils.py", line 279, in wrapper
    session.commit()
  File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1027, in commit
    self.transaction.commit()
  File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 494, in commit
    self._prepare_impl()
  File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 473, in _prepare_impl
    self.session.flush()
  File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2459, in flush
    self._flush(objects)
  File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2597, in _flush
    transaction.rollback(_capture_exception=True)
  File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/util/langhelpers.py", line 68, in __exit__
    compat.reraise(exc_type, exc_value, exc_tb)
  File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2557, in _flush
    flush_context.execute()
  File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute
    rec.execute(self)
  File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/orm/unitofwork.py", line 589, in execute
    uow,
  File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
    insert,
  File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/orm/persistence.py", line 1138, in _emit_insert_statements
    statement, params
  File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 988, in execute
    return meth(self, multiparams, params)
  File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/sql/elements.py", line 287, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1107, in _execute_clauseelement
    distilled_params,
  File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1253, in _execute_context
    e, statement, parameters, cursor, context
  File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1473, in _handle_dbapi_exception
    util.raise_from_cause(sqlalchemy_exception, exc_info)
  File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 398, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1249, in _execute_context
    cursor, statement, parameters, context
  File "/home/cloud-user/.local/lib/python2.7/site-packages/sqlalchemy/engine/default.py", line 552, in do_execute
    cursor.execute(statement, parameters)
  File "/home/cloud-user/.local/lib/python2.7/site-packages/pymysql/cursors.py", line 170, in execute
    result = self._query(query)
  File "/home/cloud-user/.local/lib/python2.7/site-packages/pymysql/cursors.py", line 328, in _query
    conn.query(q)
  File "/home/cloud-user/.local/lib/python2.7/site-packages/pymysql/connections.py", line 517, in query
    self._affected_rows = self._read_query_result(unbuffered=unbuffered)
  File "/home/cloud-user/.local/lib/python2.7/site-packages/pymysql/connections.py", line 732, in _read_query_result
    result.read()
  File "/home/cloud-user/.local/lib/python2.7/site-packages/pymysql/connections.py", line 1075, in read
    first_packet = self.connection._read_packet()
  File "/home/cloud-user/.local/lib/python2.7/site-packages/pymysql/connections.py", line 684, in _read_packet
    packet.check_error()
  File "/home/cloud-user/.local/lib/python2.7/site-packages/pymysql/protocol.py", line 220, in check_error
    err.raise_mysql_exception(self._data)
  File "/home/cloud-user/.local/lib/python2.7/site-packages/pymysql/err.py", line 109, in raise_mysql_exception
    raise errorclass(errno, errval)
ProgrammingError: (pymysql.err.ProgrammingError) (1064, u"You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '[(\\'is_paused\\', u\\'true\\'), (\\'dag_id\\', u\\'test\\')]'')' at line 1")
[SQL: INSERT INTO log (dttm, dag_id, task_id, event, execution_date, owner, extra) VALUES (%(dttm)s, %(dag_id)s, %(task_id)s, %(event)s, %(execution_date)s, %(owner)s, %(extra)s)]
[parameters: {'task_id': None, 'extra': "[('is_paused', u'true'), ('dag_id', u'test')]", 'execution_date': None, 'event': 'paused', 'owner': 'anonymous', 'dttm': datetime.datetime(2019, 9, 30, 18, 0, 51, 768073, tzinfo=<Timezone [UTC]>), 'dag_id': u'test'}]
(Background on this error at: http://sqlalche.me/e/f405) 

As far as I can tell from the Airflow documentation, I made the change correctly, and both 'airflow initdb' and 'airflow resetdb' run without errors.
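
One way to double-check which backend the configuration file actually points at is to parse it and inspect the URL. This is a minimal sketch using only the standard library, not Airflow's own configuration loader. It assumes the setting lives under a [core] section (as in Airflow 1.10-era files), and the sample text, password, and host name are hypothetical.

```python
import configparser
from urllib.parse import urlsplit

# Hypothetical airflow.cfg excerpt; the [core] section name is an assumption.
SAMPLE_CFG = """
[core]
# sql_alchemy_conn = sqlite:////home/cloud-user/airflow/airflow.db
sql_alchemy_conn = mysql+pymysql://admin:secret@airflow-db.example.invalid:3306/airflow
"""


def describe_backend(cfg_text):
    """Summarise the metadata database URL without exposing the password."""
    # interpolation=None keeps "%" characters in the URL from being interpreted.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    conn = parser.get("core", "sql_alchemy_conn")
    parts = urlsplit(conn)
    # A scheme such as "mysql+pymysql" splits into dialect and driver.
    dialect, _, driver = parts.scheme.partition("+")
    return {
        "dialect": dialect,
        "driver": driver or None,
        "host": parts.hostname,
        "port": parts.port,
        "database": parts.path.lstrip("/"),
    }


info = describe_backend(SAMPLE_CFG)
print(info)
```

If the reported dialect is still sqlite, the edited file is probably not the one the running webserver reads.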

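I have spent a lot of time searching for this error, but I have not found a clear answer. I am really not sure whether I am missing a prerequisite or whether I should be using a different connector.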

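EDIT: I am using Python 2.7, as the stack trace shows. Airflow claims to remain compatible with it for the time being, but I saw that another SO user's problem went away after upgrading to Python 3.6 [Link to other solution]. I will try that and post an update if it works.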
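
The interpreter version can be read straight off the site-packages paths in a traceback like the one above. A small sketch of that check follows; the function name and the two-line sample are illustrative, with the sample lines copied from the trace in this question.

```python
import re


def python_versions_in_traceback(text):
    """Return the sorted 'X.Y' versions found in .../lib/pythonX.Y/site-packages/ paths."""
    return sorted(set(re.findall(r"/lib/python(\d+\.\d+)/site-packages/", text)))


# Two frames taken from the stack trace in the question.
SAMPLE = (
    '  File "/home/cloud-user/.local/lib/python2.7/site-packages/flask/app.py", '
    "line 2446, in wsgi_app\n"
    '  File "/home/cloud-user/.local/lib/python2.7/site-packages/pymysql/err.py", '
    "line 109, in raise_mysql_exception\n"
)

versions = python_versions_in_traceback(SAMPLE)
print(versions)
```

More than one version in the result would suggest that packages from different interpreters are being mixed.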

mysql amazon-rds airflow airflow-scheduler
1 Answer

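It turns out the solution was indeed to upgrade to Python 3.6 inside a virtual environment. The virtual environment is needed because Python 2.x and 3.y have to coexist: the Linux system depends on one and Airflow needs the other. Specifically, I followed this guide, and my DAGs now appear to run successfully.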
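
For readers who want to script that step, here is a rough sketch of the idea using the standard-library venv module. It is not taken from the guide mentioned above. The minimum version (3, 6) mirrors the version named in this answer, and the function names and the target directory are hypothetical.

```python
import sys
import venv

MIN_VERSION = (3, 6)  # the version named in this answer; adjust as needed


def is_supported(version_info=sys.version_info):
    """Return True if the given (major, minor, ...) version is at least MIN_VERSION."""
    return tuple(version_info[:2]) >= MIN_VERSION


def create_airflow_env(env_dir):
    """Create an isolated environment with pip, leaving the system Python untouched."""
    if not is_supported():
        raise RuntimeError(
            "Python %d.%d or newer is required" % MIN_VERSION
        )
    # with_pip=True bootstraps pip so packages can be installed inside the environment.
    venv.create(env_dir, with_pip=True)


# Example call (hypothetical path), run with the Python 3 interpreter:
# create_airflow_env("/home/cloud-user/airflow-venv")
```

After the environment exists, Airflow and the MySQL driver would be installed with that environment's own pip, and the webserver and scheduler started from it, so the system's Python 2.7 stays as it is.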
