我正在尝试使用JdbcOperator连接到配置单元表。我的代码如下:
import datetime as dt
from datetime import timedelta
import airflow
from airflow.models import DAG
from airflow.operators.jdbc_operator.JdbcOperator import JdbcOperator
args = {
'owner': 'Airflow',
'start_date': dt.datetime(2020, 3, 24),
'retries': 1,
'retry_delay': dt.timedelta(minutes=5),
}
dag_hive = DAG(dag_id="import_hive",default_args=args, schedule_interval= " 0 * * * *",dagrun_timeout=timedelta(minutes=60))
hql_query = """USE testdb;
CREATE TABLE airflow-test-table LIKE testtable;"""
hive_task = JdbcOperator(sql = hql_query, task_id="hive_script_task", jdbc_conn_id="hive_conn_default",dag=dag_hive)
hive_task
我出错了
ModuleNotFoundError:未命名模块'airflow.operators.jdbc_operator.JdbcOperator';'airflow.operators.jdbc_operator'不是软件包
我已经交叉检查了sitepackages文件夹中的软件包,可用。无法弄清楚为什么我会收到此错误。
导入JdbcOperator()模块的正确方法如下:
from airflow.operators.jdbc_operator import JdbcOperator
请记住,JDBCOperator
还需要从属的jaydebeapi
Python软件包,需要将其提供给当前的Airflow环境。
通过运行以下命令安装使用JDBC运算符的依赖项:
pip install 'apache-airflow[jdbc]'
然后将JdbcOperator
导入到您的DAG文件中,例如@mk_sta,如下所示:
from airflow.operators.jdbc_operator import JdbcOperator