I have an AWS CloudFormation template that creates a basic Airflow environment (a single EC2 t3.small instance hosting both the web server and the scheduler, no external DB, no Celery executor). The environment connects to a Snowflake data warehouse to push files from S3 into a database on Snowflake. The Airflow environment is created successfully on the EC2 instance, and the last thing my CFT does is start the Airflow scheduler. The logs appear to show a clean startup, and the web server comes up without issues. When I run a test DAG that only connects to Snowflake, I get this error:
[2020-02-11 11:23:25,422] {__init__.py:1580} ERROR - 250001 (08001): None: Failed to connect to DB: xxxxxxxxxxx.us-east-1.snowflakecomputing.com:443. IP [69995088867598] is not allowed to access Snowflake. Contact your local security administrator.
Here is what we know:
1) The IP address coming from the EC2 instance is fine; it has not been blacklisted and is not missing from the whitelist on the Snowflake side, so this error is somewhat puzzling.
2) Running the Python script manually outside of Airflow works fine; the connection to Snowflake happens as expected (the egress-IP check sketched after this list is one way to compare the two execution paths).
3) Killing the Airflow scheduler, starting it with "airflow scheduler -D", and then re-running the DAG succeeds.
4) The logs show no errors other than the one I posted above.
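To rule out the manual run and the scheduler-launched run egressing through different public IPs (for example via a proxy), here is a minimal sketch that can be run both from a shell and inside a PythonOperator task; it assumes outbound HTTPS is allowed and uses the third-party echo service api.ipify.org (any similar service would do):

import urllib.request

# Sketch: print the public IP this process appears to come from, as seen by an
# external echo service (api.ipify.org assumed reachable over HTTPS). urllib
# honors http_proxy/https_proxy, so the result reflects the calling process's
# proxy settings.
def current_egress_ip(timeout=10):
    with urllib.request.urlopen('https://api.ipify.org', timeout=timeout) as resp:
        return resp.read().decode('utf-8').strip()

if __name__ == '__main__':
    print('Egress IP seen by the remote side:', current_egress_ip())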
Here is the UserData section of my CFT:
'Fn::Base64':
!Sub |
#!/bin/bash -x
exec > /tmp/user-data.log 2>&1
# Path settings
export HOME=/root
export AIRFLOW_HOME=$HOME/airflow
export PATH=$PATH:/root/airflow:/root/airflow/bin
# Egress proxy and exceptions
# Step 1: Install Python version
apt-get update -y
apt-get install build-essential checkinstall -y
apt-get install libssl-dev openssl zlib1g-dev libffi-dev libsqlite3-dev libpq-dev postgresql postgresql-client python3-psycopg2 -y
wget https://www.python.org/ftp/python/3.7.3/Python-3.7.3.tgz
tar xzvf Python-3.7.3.tgz
cd Python-3.7.3
./configure
make
make altinstall
# Step 2: Create virtual environment with new Python version
cd ~
python3.7 -m venv airflow
# Create pip.conf
echo '[global]
proxy = http://http.proxy.com:8000
index-url = https://${ArtifactoryUsername}:${ArtifactoryPassword}@artifactory.com/api/pypi/pypi-prereleases/simple
index = https://${ArtifactoryUsername}:${ArtifactoryPassword}@artifactory.com/api/pypi/pypi-prereleases/simple' > $AIRFLOW_HOME/pip.conf
# Allow these ports through the ufw firewall
sudo ufw allow 8080
# Upgrade Pip
$AIRFLOW_HOME/bin/pip install --upgrade pip
/usr/local/bin/aws s3 sync s3://${S3CDAPBucket}/dags $AIRFLOW_HOME/dags/
# Install required PIP packages into virtual environment
$AIRFLOW_HOME/bin/pip install -r $AIRFLOW_HOME/dags/requirements.txt --no-cache-dir --retries 10
# Setup airflow local db; edit config file as needed
$AIRFLOW_HOME/bin/airflow initdb
sed -i 's/load_examples\s=\sTrue/load_examples = False/g' $AIRFLOW_HOME/airflow.cfg
sed -i 's/default_dag_run_display_number\s=\s25/default_dag_run_display_number = 5/g' $AIRFLOW_HOME/airflow.cfg
sed -i "/remote_logging\s=\sFalse/c\remote_logging = True" $AIRFLOW_HOME/airflow.cfg
sed -i "/remote_base_log_folder\s=/c\remote_base_log_folder = s3://${S3CDAPBucket}/logs" $AIRFLOW_HOME/airflow.cfg
# Re-init the database to use the postgres instance; start scheduler and webserver
$AIRFLOW_HOME/bin/airflow pool -i $AIRFLOW_HOME/dags/config/airflow/airflow-pool-config.json
sed -i "/snowflake_batch_password/c\ \"snowflake_batch_password\" : \"${SnowflakeBatchPassword}\"," $AIRFLOW_HOME/dags/config/airflow/airflow-variables-dev.json
$AIRFLOW_HOME/bin/airflow variables -i $AIRFLOW_HOME/dags/config/airflow/airflow-variables-dev.json
sudo apt autoremove -y
chgrp -R cloud-user /root
chmod -R g+rwx /root
$AIRFLOW_HOME/bin/airflow webserver -D
$AIRFLOW_HOME/bin/airflow scheduler -D
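For comparison with the IP that Snowflake reports as blocked, the public IPv4 the instance advertises can also be logged at boot. A minimal sketch using the EC2 instance metadata endpoint (IMDSv1 form shown; if IMDSv2 is enforced a session token is needed first, and the call has to bypass any egress proxy, so this IP can differ from the actual egress IP when traffic leaves through a proxy):

import urllib.request

# Sketch: ask the EC2 instance metadata service for this instance's public IPv4.
# The ProxyHandler({}) opener bypasses any http_proxy/https_proxy settings,
# since the metadata endpoint is link-local and must not be proxied.
METADATA_URL = 'http://169.254.169.254/latest/meta-data/public-ipv4'
opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
with opener.open(METADATA_URL, timeout=2) as resp:
    print('Instance public IPv4:', resp.read().decode('utf-8'))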
The test DAG that connects to Snowflake looks like this:
from airflow import DAG
from datetime import datetime, timedelta
import json
import os
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow import AirflowException
from snowflake import snowflake_hook
from aws.s3 import FileOps
import boto3
from dag_admin import common_dag_tasks, landing_zone_dag_tasks, raw_zone_dag_tasks
from logger import logger
from functools import partial
import traceback
from data_validation import batch_validation
from snowflake.snowflake_hook import SnowflakeHook
def snowflake_task():
    my_snowflake = SnowflakeHook(account='xxxxxxxxxxx.us-east-1',
                                 username='SNOWFLAKE_DEV_BATCH_USER',
                                 password='password',
                                 warehouse='DEV_BATCH_XSMALL',
                                 role='DEV_DB_DEVELOPER_ROLE')
    conn = my_snowflake.get_conn()
    cur = conn.cursor()
    try:
        cur.execute('select count(*) as row_cnt from DEV.LANDING__MST_DC_DBO.DC_ACCOUNTING_ITEM__BULK')
        for (row_cnt,) in cur:
            print('Row Count = {}'.format(row_cnt))
    finally:
        cur.close()
        conn.close()

dag_id = 'snowflake-test'

my_dag = DAG(dag_id=dag_id,
             start_date=datetime.today(),
             schedule_interval=None)

start_task = PythonOperator(
    task_id="start-task",
    dag=my_dag,
    python_callable=snowflake_task)
Requirements.txt:
apache-airflow==1.10.3
Jinja2==2.10.0
Werkzeug==0.14.1
tzlocal==1.5.1
Flask==1.0.4
snowflake-connector-python==2.0.3
inhouse-snowflake==1.0.0
marshmallow-sqlalchemy==0.17.1
inhouse-aws==1.0.0
inhouse-dag-admin==1.0.0
psycopg2==2.8.4
boto3==1.9.253
inhouse-logging==1.0.0
inhouse-data-validation==1.0.0
Based on that error, it looks like you have a network policy enabled on your Snowflake account. You need to get the public IP of the EC2 machine and add it to the allowed IPs.
There is a complete tutorial here. Also, you should consider using an Elastic IP attached to your machine; that way, even if your machine is terminated, the public IP does not change and you can attach the same public IP again.
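If you go the Elastic IP route, here is a minimal sketch using boto3 (already in your requirements); the instance ID is a placeholder, and the allocated address still has to be added to the ALLOWED_IP_LIST of your Snowflake network policy:

import boto3

# Sketch: allocate an Elastic IP and attach it to the instance so the public IP
# survives instance replacement. The instance ID below is a placeholder.
ec2 = boto3.client('ec2', region_name='us-east-1')
allocation = ec2.allocate_address(Domain='vpc')
ec2.associate_address(InstanceId='i-0123456789abcdef0',   # placeholder
                      AllocationId=allocation['AllocationId'])
print('Elastic IP to add to the Snowflake network policy ALLOWED_IP_LIST:',
      allocation['PublicIp'])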