我已经检查了我的文件很多次,我想我已经在我的 lambda_function.py 中创建了 lambda_handler。还是没成功。
import json
import boto3
import psycopg2
from datetime import datetime, timedelta
import uuid
import os
# from dotenv import load_dotenv
import requests
# load_dotenv()
# Module-level SNS client, created once when the module is imported.
# NOTE(review): nothing in the visible code uses `sns` -- confirm it is still needed.
sns = boto3.client('sns')
# db_url = os.environ['DB_URL'] # jdbc:postgresql://db_endpoint/db_name
# db_username = os.environ['DB_USERNAME']
# db_password = os.environ['DB_PASSWORD']
# db_host = db_url.split("://")[1].split("/")[0]
# db_name = db_url.split("/")[-1]
def send_verification_email(email, token, *, timeout=10):
    """Send an email-verification link to ``email`` through the Mailgun HTTP API.

    The verification link, the Mailgun endpoint and the sender address are all
    built from the ``DOMAIN_NAME`` environment variable; the API key is read
    from ``MAILGUN_API_KEY``.

    Args:
        email: Recipient address.
        token: Verification token placed in the link's ``token`` query
            parameter.
        timeout: Seconds to wait for Mailgun before giving up. Without a
            timeout, ``requests`` can block indefinitely on a stalled
            connection.

    Returns:
        True if Mailgun answered with HTTP 200; False on any other status or
        when the request itself failed (connection error, timeout, ...).

    Raises:
        KeyError: If ``DOMAIN_NAME`` or ``MAILGUN_API_KEY`` is not set.
    """
    domain = os.environ['DOMAIN_NAME']
    api_key = os.environ['MAILGUN_API_KEY']

    verification_link = f"https://{domain}/v1/user/verify?token={token}"
    api_url = f"https://api.mailgun.net/v3/{domain}/messages"

    # Plain-text body only; an "html" field could be added here for a
    # clickable link.
    email_data = {
        "from": f"noreply@{domain}",
        "to": [email],
        "subject": "Verify Your Email Address",
        "text": f"Please verify your email by clicking the link: {verification_link}",
    }

    try:
        response = requests.post(
            api_url,
            auth=("api", api_key),
            data=email_data,
            timeout=timeout,
        )
    except requests.RequestException as e:
        # Network-level failures are reported to the caller as a False
        # result rather than raised.
        print(f"Error sending email: {e}")
        return False

    if response.status_code == 200:
        return True

    print(f"Failed to send email: {response.text}")
    return False
# def store_email_tracking(email, token):
# Establish a connection to PostgreSQL
# try:
# conn = psycopg2.connect(
# host=db_host,
# dbname=db_name,
# user=db_username,
# password=db_password
# )
# cursor = conn.cursor()
# # Store the email and token with an expiry time (2 minutes)
# expiry_time = datetime.now() + timedelta(minutes=2)
# cursor.execute("INSERT INTO email_tracking (email, token, expiry_time) VALUES (%s, %s, %s)",
# (email, token, expiry_time))
# conn.commit()
# cursor.close()
# conn.close()
# except Exception as e:
# print(f"Error storing email tracking info: {e}")
def lambda_handler(event, context):
    """Lambda entry point: email a verification link to the address in an SNS event.

    Reads the first record of ``event``, parses its SNS ``Message`` as JSON,
    takes the ``email`` field, generates a fresh UUID4 token and passes both to
    ``send_verification_email``.

    Returns:
        A dict with ``statusCode`` 200 when the email was sent, 500 otherwise,
        and a JSON-encoded ``body`` message.
    """
    first_record = event['Records'][0]
    payload = json.loads(first_record['Sns']['Message'])
    recipient = payload['email']

    # One random token per invocation.
    verification_token = str(uuid.uuid4())

    if not send_verification_email(recipient, verification_token):
        return {
            'statusCode': 500,
            'body': json.dumps('Failed to send verification email.'),
        }

    # Persisting the email/token pair (store_email_tracking) is currently
    # disabled.
    return {
        'statusCode': 200,
        'body': json.dumps('Verification email sent successfully.'),
    }
响应: { "errorMessage": "Handler 'lambda_handler' missing on module 'lambda_function'", "errorType": "Runtime.HandlerNotFound", "requestId": "", "stackTrace": [] }
我已经上传了这个结构的文件
lambda_function.py
boto3/
psycopg2/
requests/
但在 AWS 上显示的结构是:
lambda_function/lambda_function.py
boto3/
psycopg2/
requests/
所以我尝试把 lambda_function.py 移到根目录,然后测试该函数,但它仍然报错。响应: { "errorMessage": "Handler 'lambda_handler' missing on module 'lambda_function'", "errorType": "Runtime.HandlerNotFound", "requestId": "", "stackTrace": [] }
尝试直接使用 CLI 压缩文件:
zip -r deployment_package.zip lambda_function.py boto3/ psycopg2/ requests/
这样可以确保 lambda_function.py 位于压缩包的根目录中。至于 lambda_handler,可以先尝试在 AWS 控制台中定义一个最简单的版本:
def lambda_handler(event, context):
    """Minimal no-op handler: ignores both arguments and returns None."""
看看你是否会继续遇到同样的错误。