This is a Lambda function that I invoke from a Step Function to load a CSV file into a Redshift table. It needs a layer; I also tried installing the package with pip, but I still get the error below.
```python
import redshift_connector
import pandas as pd
import boto3

def lambda_handler(event, context):
    print(event)

    # Redshift connection details (redacted)
    host = 'REDSHIFT ARN'
    port = ******
    database = '*********'
    user = '*******'
    password = '*******'

    # Initialize the Redshift connection
    conn = redshift_connector.connect(
        host=host,
        port=port,
        database=database,
        user=user,
        password=password
    )
    cursor = conn.cursor()
    print(f"connection success {host}")

    # Read the S3 location from the previous state's output
    bucket_name = event['validation_result']['Payload']['bucket_name']
    print(bucket_name)
    source_folder = event['validation_result']['Payload']['folder_name']
    file_name = event['validation_result']['Payload']['file_name']
    result = {'bucket_name': bucket_name, 'file_name': file_name}

    # Load the CSV file into a pandas DataFrame
    csv_file_path = f's3://{bucket_name}/{source_folder}/{file_name}'
    print(csv_file_path)
    df = pd.read_csv(csv_file_path)

    # Define the Redshift table name
    table_name = 'club_games'  # Change this to your actual Redshift table name

    # Insert data into the Redshift table, one row at a time
    for _, row in df.iterrows():
        values = ', '.join([f"'{value}'" if isinstance(value, str) else str(value) for value in row])
        insert_sql = f"INSERT INTO {table_name} VALUES ({values})"
        cursor.execute(insert_sql)
    conn.commit()

    # Archive the processed CSV file
    s3 = boto3.client('s3')
    archive_key = 'archive' + '/' + file_name
    object_key = source_folder + '/' + file_name

    # Copy the file to the archive folder
    s3.copy_object(Bucket=bucket_name, CopySource={'Bucket': bucket_name, 'Key': object_key}, Key=archive_key)

    # Delete the original file from the input folder
    s3.delete_object(Bucket=bucket_name, Key=object_key)

    # Close the Redshift connection
    conn.close()
    return result
```
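(Side note on the insert loop: building each `INSERT` with f-strings breaks as soon as a value contains a quote. A parameterized version of the loop, shown here as a minimal sketch assuming redshift_connector's DB-API cursor with its default `%s` placeholders, lets the driver handle the quoting:)

```python
# Sketch only: assumes redshift_connector's DB-API cursor, which uses
# %s-style placeholders by default.
placeholders = ', '.join(['%s'] * len(df.columns))
insert_sql = f"INSERT INTO {table_name} VALUES ({placeholders})"

# One parameterized statement per row; the driver escapes the values.
cursor.executemany(insert_sql, [tuple(row) for _, row in df.iterrows()])
conn.commit()
```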
However, I did pip-install the redshift_connector library, and the function still fails with the error below. Is there another way to solve this?
```json
{
  "errorMessage": "Unable to import module 'lambda_function': No module named 'redshift_connector'",
  "errorType": "Runtime.ImportModuleError",
  "requestId": "c0aa62a9-e77d-44d3-b92b-acb9e7362ebd",
  "stackTrace": []
}
```
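Note that the import error can also be sidestepped entirely: boto3 is preinstalled in the Lambda runtime, and its Redshift Data API client can run SQL without packaging any driver. A minimal sketch, where the cluster identifier, database, user, bucket, and role ARN are all assumed placeholders:

```python
import boto3

client = boto3.client('redshift-data')  # no layer needed; boto3 ships with Lambda

# All identifiers below are placeholders -- substitute your own.
resp = client.execute_statement(
    ClusterIdentifier='my-cluster',   # or WorkgroupName=... for Redshift Serverless
    Database='dev',
    DbUser='awsuser',                 # or SecretArn=... for Secrets Manager auth
    Sql="COPY club_games FROM 's3://my-bucket/folder/file.csv' "
        "IAM_ROLE 'arn:aws:iam::123456789012:role/my-redshift-role' "
        "FORMAT AS CSV IGNOREHEADER 1",
)
print(resp['Id'])  # the call is asynchronous; poll describe_statement(Id=...) for status
```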
I've always used the psycopg2 library to connect to Redshift from a Python Lambda function. Code sample below.
```python
import psycopg2
...
dbconn = psycopg2.connect(
    "dbname='%s' port='%s' user='%s' host='%s' password='%s'"
    % (dbname, dbport, dbuser, dbhost, dbpass)
)
```