Cannot connect to Snowflake from a newly created Airflow environment on EC2


I have an AWS CloudFormation template that creates a basic Airflow environment (a single EC2 t3.small instance hosting both the webserver and the scheduler, no external DB, no Celery executor). The environment connects to a Snowflake data warehouse to push files from S3 into a database on Snowflake. The Airflow environment is created successfully on the EC2 instance, and the last thing the CFT does is start the Airflow scheduler. The logs appear to indicate a clean startup, and the webserver comes up without issue. When I run a test DAG that simply connects to Snowflake, I get this error:

[2020-02-11 11:23:25,422] {__init__.py:1580} ERROR - 250001 (08001): None: Failed to connect to DB: xxxxxxxxxxx.us-east-1.snowflakecomputing.com:443. IP [69995088867598] is not allowed to access Snowflake.  Contact your local security administrator.

Here is what we know:

1) The IP address of the EC2 instance is fine: it is not blacklisted and it is not missing from the whitelist on the Snowflake side, so this error is somewhat puzzling. (A quick way to double-check the actual egress IP is sketched after this list.)

2) Running the Python script manually, outside of Airflow, works fine; the connection to Snowflake happens as expected.

3) Killing the Airflow scheduler, starting it again with "airflow scheduler -D", and then re-running the DAG succeeds.

4) The logs show no errors other than the one posted above.
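
Since the error complains about an IP not being allowed, one quick sanity check is to confirm which public IP the instance actually egresses from; if outbound traffic goes through a proxy or NAT, it can differ from the IP attached to the instance. A minimal sketch using only the standard library (checkip.amazonaws.com is just one convenient echo service):

    # check_egress_ip.py - print the public IP this host egresses from.
    # This is what Snowflake's network policy sees, unless a proxy rewrites the traffic.
    from urllib.request import urlopen

    with urlopen('https://checkip.amazonaws.com', timeout=10) as resp:
        print(resp.read().decode().strip())

If this IP does not match what is whitelisted on the Snowflake side, the network policy check will fail even though the instance's own public IP looks correct.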

Here is the UserData section of my CFT:

    'Fn::Base64':
      !Sub |
      #!/bin/bash -x
      exec > /tmp/user-data.log 2>&1

      # Path settings
      export HOME=/root
      export AIRFLOW_HOME=$HOME/airflow
      export PATH=$PATH:/root/airflow:/root/airflow/bin

      # Egress proxy and exceptions

      # Step 1: Install Python version
      apt-get update -y
      apt-get install build-essential checkinstall -y
      apt-get install libssl-dev openssl zlib1g-dev libffi-dev libsqlite3-dev libpq-dev postgresql postgresql-client python3-psycopg2 -y
      wget https://www.python.org/ftp/python/3.7.3/Python-3.7.3.tgz
      tar xzvf Python-3.7.3.tgz
      cd Python-3.7.3
      ./configure
      make
      make altinstall

      # Step 2: Create virtual environment with new Python version
      cd ~
      python3.7 -m venv airflow

      # Create pip.conf
      echo '[global]
      proxy = http://http.proxy.com:8000
      index-url = https://${ArtifactoryUsername}:${ArtifactoryPassword}@artifactory.com/api/pypi/pypi-prereleases/simple
      index = https://${ArtifactoryUsername}:${ArtifactoryPassword}@artifactory.com/api/pypi/pypi-prereleases/simple' > $AIRFLOW_HOME/pip.conf

      # Allow these ports through the ufw firewall
      sudo ufw allow 8080

      # Upgrade Pip
      $AIRFLOW_HOME/bin/pip install --upgrade pip
      /usr/local/bin/aws s3 sync s3://${S3CDAPBucket}/dags $AIRFLOW_HOME/dags/

      # Install required PIP packages into virtual environment
      $AIRFLOW_HOME/bin/pip install -r $AIRFLOW_HOME/dags/requirements.txt --no-cache-dir --retries 10

      # Setup airflow local db; edit config file as needed
      $AIRFLOW_HOME/bin/airflow initdb
      sed -i 's/load_examples\s=\sTrue/load_examples = False/g' $AIRFLOW_HOME/airflow.cfg
      sed -i 's/default_dag_run_display_number\s=\s25/default_dag_run_display_number = 5/g' $AIRFLOW_HOME/airflow.cfg
      sed -i "/remote_logging\s=\sFalse/c\remote_logging = True" $AIRFLOW_HOME/airflow.cfg
      sed -i "/remote_base_log_folder\s=/c\remote_base_log_folder = s3://${S3CDAPBucket}/logs" $AIRFLOW_HOME/airflow.cfg

      # Re-init the database to use the postgres instance; start scheduler and webserver
      $AIRFLOW_HOME/bin/airflow pool -i $AIRFLOW_HOME/dags/config/airflow/airflow-pool-config.json
      sed -i "/snowflake_batch_password/c\   \"snowflake_batch_password\" : \"${SnowflakeBatchPassword}\"," $AIRFLOW_HOME/dags/config/airflow/airflow-variables-dev.json

      $AIRFLOW_HOME/bin/airflow variables -i $AIRFLOW_HOME/dags/config/airflow/airflow-variables-dev.json

      sudo apt autoremove -y
      chgrp -R cloud-user /root
      chmod -R g+rwx /root
      $AIRFLOW_HOME/bin/airflow webserver -D
      $AIRFLOW_HOME/bin/airflow scheduler -D
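
Given point 3 above (killing the scheduler and restarting it from an interactive shell makes the DAG succeed), it can help to compare the environment inherited by the scheduler launched from user data with the environment of an interactive shell; proxy variables such as HTTP_PROXY/HTTPS_PROXY/NO_PROXY are one plausible difference, since the Snowflake connector can pick these up from the environment. A diagnostic sketch, assuming a Linux host where /proc/<pid>/environ is readable (run as root or as the scheduler's user):

    # compare_scheduler_env.py - compare proxy-related env vars of a running
    # process (e.g. the daemonized scheduler) with the current shell's.
    # Usage: python compare_scheduler_env.py <scheduler_pid>
    import os
    import sys

    PROXY_KEYS = ('HTTP_PROXY', 'HTTPS_PROXY', 'NO_PROXY',
                  'http_proxy', 'https_proxy', 'no_proxy')

    def read_environ(pid):
        # /proc/<pid>/environ holds NUL-separated KEY=VALUE pairs
        with open('/proc/{}/environ'.format(pid), 'rb') as f:
            pairs = f.read().split(b'\x00')
        env = {}
        for pair in pairs:
            if b'=' in pair:
                key, _, value = pair.partition(b'=')
                env[key.decode()] = value.decode()
        return env

    if __name__ == '__main__':
        scheduler_env = read_environ(int(sys.argv[1]))
        for key in PROXY_KEYS:
            print('{}: scheduler={!r} shell={!r}'.format(
                key, scheduler_env.get(key), os.environ.get(key)))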

The test DAG that connects to Snowflake looks like this:

from airflow import DAG
from datetime import datetime, timedelta
import json
import os
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow import AirflowException
from snowflake import snowflake_hook
from aws.s3 import FileOps
import boto3
from dag_admin import common_dag_tasks, landing_zone_dag_tasks, raw_zone_dag_tasks
from logger import logger
from functools import partial
import traceback
from data_validation import batch_validation
from snowflake.snowflake_hook import SnowflakeHook

def snowflake_task():
    my_snowflake = SnowflakeHook(account='xxxxxxxxxxx.us-east-1',
                                            username='SNOWFLAKE_DEV_BATCH_USER',
                                            password='password',
                                            warehouse='DEV_BATCH_XSMALL',
                                            role='DEV_DB_DEVELOPER_ROLE')
    conn = my_snowflake.get_conn()
    cur = conn.cursor()
    try:
        cur.execute('select count(*) as row_cnt from DEV.LANDING__MST_DC_DBO.DC_ACCOUNTING_ITEM__BULK')
        for (row_cnt) in cur:
            print('Row Count = {}'.format(row_cnt))
    finally:
        cur.close()

    conn.close()


dag_id = 'snowflake-test'
my_dag = DAG(dag_id=dag_id,
             start_date=datetime.today(),
             schedule_interval=None)

start_task = PythonOperator(
                        task_id="start-task",
                        dag=my_dag,
                        python_callable=snowflake_task)
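
For reference, the manual test mentioned in point 2 above is presumably something close to a plain connector call outside Airflow. A minimal standalone sketch using snowflake-connector-python (account and credentials below are placeholders, as in the DAG):

    # standalone_snowflake_test.py - connect outside Airflow to isolate the problem
    import snowflake.connector

    conn = snowflake.connector.connect(
        account='xxxxxxxxxxx.us-east-1',   # placeholder account locator
        user='SNOWFLAKE_DEV_BATCH_USER',
        password='password',               # placeholder
        warehouse='DEV_BATCH_XSMALL',
        role='DEV_DB_DEVELOPER_ROLE',
    )
    try:
        cur = conn.cursor()
        cur.execute('select current_version()')  # simple smoke test
        print(cur.fetchone())
    finally:
        conn.close()

Running this from the same virtual environment and the same shell in which the scheduler was started makes the comparison with the failing DAG run more meaningful.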

Requirements.txt:

apache-airflow==1.10.3
Jinja2==2.10.0
Werkzeug==0.14.1
tzlocal==1.5.1
Flask==1.0.4
snowflake-connector-python==2.0.3
inhouse-snowflake==1.0.0
marshmallow-sqlalchemy==0.17.1
inhouse-aws==1.0.0
inhouse-dag-admin==1.0.0
psycopg2==2.8.4
boto3==1.9.253
inhouse-logging==1.0.0
inhouse-data-validation==1.0.0
1 Answer

Based on that error, it looks like you have a network policy enabled on your Snowflake account. You need to get the public IP of the EC2 machine and add it to the allowed IPs.
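
Concretely, whoever administers the Snowflake account (typically a SECURITYADMIN or ACCOUNTADMIN role) would add the EC2 public IP to the policy's allowed list. A sketch of the relevant statement, executed here through the Python connector; the policy name airflow_ec2_policy and the IP are placeholders:

    # update_network_policy.py - add the EC2 public IP to a Snowflake network policy
    import snowflake.connector

    conn = snowflake.connector.connect(
        account='xxxxxxxxxxx.us-east-1',   # placeholder
        user='ADMIN_USER',                 # placeholder; needs privileges on the policy
        password='password',
    )
    cur = conn.cursor()
    try:
        # SET ALLOWED_IP_LIST replaces the whole list, so keep any existing IPs in it.
        cur.execute(
            "ALTER NETWORK POLICY airflow_ec2_policy "
            "SET ALLOWED_IP_LIST = ('203.0.113.10/32')"  # placeholder EC2 public IP
        )
    finally:
        cur.close()
        conn.close()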

Here is a full tutorial. Also, consider using an Elastic IP and attaching it to your machine; that way, even if the machine is terminated, the public IP does not change and you can still attach the same public IP to a replacement instance.
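
A minimal sketch of attaching an Elastic IP with boto3 (already in the requirements); the instance ID is a placeholder:

    # attach_eip.py - allocate an Elastic IP and associate it with the Airflow instance
    import boto3

    ec2 = boto3.client('ec2', region_name='us-east-1')

    # Allocate a VPC Elastic IP and attach it to the instance so the public IP
    # whitelisted in Snowflake stays stable across instance replacements.
    allocation = ec2.allocate_address(Domain='vpc')
    ec2.associate_address(
        InstanceId='i-0123456789abcdef0',          # placeholder instance ID
        AllocationId=allocation['AllocationId'],
    )
    print('Elastic IP:', allocation['PublicIp'])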
