# 这是我的 AWS Glue Spark 作业代码
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import requests
import json
## @params: [JOB_NAME]
# Resolve the job arguments and set up the Spark / Glue contexts.
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read the CSV files under the raw/ prefix, treating the first row as header.
s3_input = "s3://dynamodb-bucket-******/raw/"
dynamic_frame = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": [s3_input]},
    format="csv",
    format_options={"withHeader": True},
)
df = dynamic_frame.toDF()

# One JSON string per row, all collected onto the driver.
json_data = df.toJSON().collect()

api_url = "https://*****.execute-api.us-east-1.amazonaws.com/dev"
# The headers never change between requests, so build them once.
headers = {'Content-Type': 'application/json'}
for record in json_data:
    response = requests.post(api_url, data=record, headers=headers)
    if response.status_code != 200:
        # Must be an f-string: without the prefix the literal text
        # "{response.content}" is printed instead of the response body.
        print(f"Error sending data:{response.content}")
job.commit()
#这是我的 lambda 函数
import json
import boto3
# Name of the DynamoDB table the handler writes to.
_TABLE_NAME = 'my_dynamodb_table'

# Created at module level, so the resource and table handle are built once
# when the module is imported rather than on every handler call.
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(_TABLE_NAME)
def lambda_handler(event, context):
    """Parse a JSON item out of the incoming event and store it in DynamoDB.

    Args:
        event: The invocation event; this version reads event['data']['headers'].
        context: The Lambda context object (unused).

    Returns:
        A dict with 'statusCode' 200 and a JSON-encoded success message.

    Raises:
        KeyError: If the event has no 'data' key (or no nested 'headers' key).
    """
    # NOTE(review): this lookup is what raises the KeyError quoted further
    # down in this file -- the event carries no 'data' key. Presumably, with
    # API Gateway proxy integration, the request payload arrives as a JSON
    # string under event['body']; confirm against the integration settings.
    data=json.loads(event['data']['headers'])
    table.put_item(Item=data)
    return {
        'statusCode': 200,
        'body': json.dumps('Data inserted successfully')
    }
#这是我在 lambda 函数中遇到的错误。你能帮忙吗?
[ERROR] KeyError: 'data'
Traceback (most recent call last):
  File "/var/task/lambda_function.py", line 8, in lambda_handler
    data=json.loads(event['data']['headers'])
我创建了一个 Glue 作业,从 S3 中读取 CSV 文件并将其转换为 JSON,并使用 API Gateway 和 Lambda 函数创建了一个 API 端点。如何通过 API Gateway 和 Lambda 将数据写入 DynamoDB 表?
Glue 脚本以 Lambda 函数期望的格式构建 JSON 有效负载。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import requests
import json
# Resolve the job arguments and set up the Spark / Glue contexts.
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read the CSV files under the raw/ prefix, treating the first row as header.
s3_input = "s3://dynamodb-bucket-******/raw/"
dynamic_frame = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": [s3_input]},
    format="csv",
    format_options={"withHeader": True},
)
df = dynamic_frame.toDF()

# One JSON string per row, all collected onto the driver.
# NOTE(review): collect() holds every row in driver memory at once; fine for
# small inputs, worth revisiting if the CSV files grow large.
json_data = df.toJSON().collect()

api_url = "https://*****.execute-api.us-east-1.amazonaws.com/dev"
headers = {'Content-Type': 'application/json'}
# (connect, read) timeouts in seconds. Without a timeout a stalled connection
# would block the job indefinitely; with one, requests raises a Timeout
# exception and the job fails visibly instead.
request_timeout = (5, 30)

for record in json_data:
    response = requests.post(
        api_url,
        # Encode explicitly so non-ASCII field values are sent as UTF-8
        # regardless of how the HTTP stack would encode a str body.
        data=record.encode("utf-8"),
        headers=headers,
        timeout=request_timeout,
    )
    if response.status_code != 200:
        print(f"Error sending data: {response.content}")
job.commit()
配置 API Gateway,使其在收到 POST 请求时触发 Lambda 函数。确保集成请求设置为“Lambda 代理集成”模式,这样 API Gateway 会把整个请求(包括请求正文)传递给 Lambda。
Glue 作业发送的负载是一个 JSON 字符串,因此 Lambda 需要对其进行相应的解析:
import json
import boto3
# Name of the DynamoDB table the handler writes to.
_TABLE_NAME = 'my_dynamodb_table'

# Created at module level, so the resource and table handle are built once
# when the module is imported rather than on every handler call.
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(_TABLE_NAME)
import base64
from decimal import Decimal


def _parse_item(event):
    """Extract the item to store from an API Gateway proxy event.

    Raises:
        ValueError: If the body is missing, empty, not valid JSON, or not a
            JSON object. (json.JSONDecodeError and base64/Unicode decoding
            errors are ValueError subclasses.)
        TypeError: If the body is not a str/bytes value.
    """
    body = event.get('body') if isinstance(event, dict) else None
    if not body:
        raise ValueError("request body is missing or empty")
    # Proxy integration flags binary payloads as base64-encoded.
    if event.get('isBase64Encoded'):
        body = base64.b64decode(body).decode('utf-8')
    # boto3 refuses Python floats for DynamoDB numbers, so parse any
    # fractional JSON number as Decimal instead.
    item = json.loads(body, parse_float=Decimal)
    if not isinstance(item, dict):
        raise ValueError("request body must be a JSON object")
    return item


def lambda_handler(event, context):
    """Store the JSON object posted through API Gateway in DynamoDB.

    Args:
        event: API Gateway proxy event; the payload is the JSON string
            under event['body'].
        context: The Lambda context object (unused).

    Returns:
        A proxy-integration response dict: 200 on success, 400 when the
        request body is unusable, 500 when writing the item fails.
    """
    try:
        data = _parse_item(event)
    except (TypeError, ValueError) as e:
        # A bad payload is the caller's fault, so report it as a client error.
        return {
            'statusCode': 400,
            'body': json.dumps(f"Invalid request body: {e}")
        }

    try:
        table.put_item(Item=data)
    except Exception as e:
        # Top-level boundary: turn any write failure into an HTTP 500.
        return {
            'statusCode': 500,
            'body': json.dumps(f"Error processing data: {str(e)}")
        }

    return {
        'statusCode': 200,
        'body': json.dumps('Data inserted successfully')
    }