我正在尝试编写一些 python 脚本,这些脚本将获取 Snowflake 中的表并将它们传输到 postgreSQL 数据库。我必须将雪花表中的每一行转换为 json 字符串,因此在 postgreSQL 中,该表将只有索引列和一个包含雪花表中整行的 json 字符串。
我可以处理这部分,但我很难缩短加载时间。我当前的方法涉及使用 Snowflake COPY 命令将 snwoflake 表作为 csv.gz 文件提取到 s3 存储桶中。然后,在 s3 中,我使用 psycopg2 库连接到 postgreSQL 数据库,并使用 copy_expert 方法和另一个 COPY 查询将所有 csv 文件从 s3 加载到我的 postgreSQL 数据库中的特定表中。
整个过程需要我 2-3 小时才能加载大约 50-60 GB 的数据。通过谷歌搜索,我得到的印象是这可以更快地完成,并且想知道我是否可以使用 python 从雪花直接连接到 postgreSQL 并执行一些命令来复制数据?任何建议表示赞赏,谢谢!
当前方法涉及使用 python 对 Snowflake 和 postgreSQL 数据库执行查询。对雪花中的表进行一些操作后,我使用以下方法将其复制到 s3 中:
import os
import psycopg2
import boto3
from connections import SnFlDWH
# copy table from snowflake into s3
query = f"COPY INTO 's3://{bucket_name}/{s3_key}'
FROM (
SELECT *
FROM {db_name}.{schema_name}.{table_name}
)
FILE_FORMAT = (
TYPE = CSV,
FIELD_OPTIONALLY_ENCLOSED_BY = '\"'
)
OVERWRITE = TRUE
CREDENTIALS = (
AWS_KEY_ID='{aws_key}',
AWS_SECRET_KEY='{aws_secret}'
);"
# establish a connection to snowflake and execute the query
snfl_conn = SnFlDWH()
snfl_conn.execqute_sql(query)
# establish a connection to postgreSQL db
pstg_conn = psycopg2.connect(
host=host,
database=database,
user=user,
password=password
)
cursor = conn.cursor()
# connect to s3 and get a list of all files on specific s3 bucket
# download each csv.gz file and read the file in locally
# load each csv file into the specified postgreSQL table using the COPY command
s3_client = boto3.client('s3', aws_key=aws_key, aws_secret=aws_secret, region_name=region_name)
response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=s3_key)
for obj in response.get('Contents', []):
file_name = obj['Key']
if file_name.endswith('.csv.gz'):
# Download files from S3
local_file_name = f"/tmp/{os.path.basename(file_name)}"
s3_client.download_file(bucket_name, file_name, local_file_name)
#Load data from local file into Postgres table
with gzip.open(local_file_name, 'rb') as f:
cursor.copy_expert(f"COPY {postgres_table_name} FROM STDIN WITH CSV DELIMITER ','", f)
# Clean up: Delete the downloaded file
os.remove(local_file_name)
#Delete file from S3
s3_client.delete_object(Bucket=bucket_name, Key=file_name)
# Commit the transaction and close the cursor and connection
conn.commit()
cursor.close()
conn.close()
访问 Snowflake 之外的数据库或其他服务需要使用外部网络访问。请参阅从 Snowflake 中连接到本地数据库,了解更多信息以及连接到另一个数据库的示例。