How to push data to a DynamoDB table using a Lambda with AWS Glue, API Gateway and AWS S3

Question · votes: 0 · answers: 1

# This is my Glue Spark code

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import requests
import json
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

s3_input = "s3://dynamodb-bucket-******/raw/"
dynamic_frame = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": [s3_input]},
    format="csv",
    format_options={"withHeader": True}
)
df=dynamic_frame.toDF()
json_data=df.toJSON().collect()

api_url="https://*****.execute-api.us-east-1.amazonaws.com/dev"
for record in json_data:
    headers={'Content-Type':'application/json'}
    response=requests.post(api_url,data=record,headers=headers)
    if response.status_code!=200:
        print(f"Error sending data: {response.content}")

job.commit()

# This is my Lambda function

import json
import boto3

dynamodb = boto3.resource('dynamodb')
table=dynamodb.Table('my_dynamodb_table')
def lambda_handler(event, context):
    data=json.loads(event['data']['headers'])
    table.put_item(Item=data)
    return {
        'statusCode': 200,
        'body': json.dumps('Data inserted successfully')
    }

# This is the error I get in the Lambda function. Can you help?

[ERROR] KeyError: 'data'
Traceback (most recent call last):
  File "/var/task/lambda_function.py", line 8, in lambda_handler
    data = json.loads(event['data']['headers'])

I created a Glue job that picks up CSV files from S3 and converts them to JSON, and I created an API Gateway endpoint backed by a Lambda function. How do I push the data into the DynamoDB table through API Gateway and Lambda?

python-3.x pyspark aws-lambda aws-api-gateway aws-glue
1 Answer

Have the Glue script build the JSON payload in the format the Lambda function expects:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import requests
import json

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

s3_input = "s3://dynamodb-bucket-******/raw/"
dynamic_frame = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": [s3_input]},
    format="csv",
    format_options={"withHeader": True}
)
df = dynamic_frame.toDF()
json_data = df.toJSON().collect()

api_url = "https://*****.execute-api.us-east-1.amazonaws.com/dev"
headers = {'Content-Type': 'application/json'}

for record in json_data:
    response = requests.post(api_url, data=record, headers=headers)
    if response.status_code != 200:
        print(f"Error sending data: {response.content}")

job.commit()
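One POST per CSV row works for small files but gets slow for large ones. If you also control the Lambda, a common variation is to send records in batches. This is only a sketch: the batch size of 25 and the assumption that the endpoint accepts a JSON array are mine, not part of the original setup.

```python
def chunked(records, size):
    """Yield successive lists of at most `size` records."""
    for start in range(0, len(records), size):
        yield records[start:start + size]

# Example: split five JSON strings into batches of two.
batches = list(chunked(["r1", "r2", "r3", "r4", "r5"], 2))
print(batches)  # [['r1', 'r2'], ['r3', 'r4'], ['r5']]

# In the Glue loop this could replace the per-record POST, e.g.:
#   for batch in chunked(json_data, 25):
#       requests.post(api_url, data="[" + ",".join(batch) + "]", headers=headers)
# (the Lambda would then need to iterate over the array it receives)
```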

Set up API Gateway to trigger the Lambda function when data is posted. Make sure the integration request uses Lambda proxy integration, which lets API Gateway pass the entire request body through to the Lambda.
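With proxy integration enabled, the event your Lambda receives carries the raw request body as a string under the top-level `body` key. A minimal sketch of that shape (the field values here are illustrative, not copied from a real invocation):

```python
import json

# Illustrative proxy-integration event, trimmed to the fields used here.
event = {
    "resource": "/",
    "httpMethod": "POST",
    "headers": {"Content-Type": "application/json"},
    "body": '{"id": "1", "name": "alice"}',  # always a string, never a dict
    "isBase64Encoded": False,
}

data = json.loads(event["body"])
print(data["name"])  # -> alice
```

This is also why `event['data']['headers']` fails with `KeyError: 'data'`: a proxy-integration event has no top-level `data` key.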

The payload the Glue job sends is a JSON string, so the Lambda needs to parse it accordingly:

import json
import boto3

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('my_dynamodb_table')

def lambda_handler(event, context):
    try:
        data = json.loads(event['body'])  # API Gateway sends data under 'body'
        table.put_item(Item=data)
        return {
            'statusCode': 200,
            'body': json.dumps('Data inserted successfully')
        }
    except Exception as e:
        return {
            'statusCode': 500,
            'body': json.dumps(f"Error processing data: {str(e)}")
        }
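One caveat once inserts start working: every CSV value arrives as a string, and boto3's DynamoDB resource rejects Python floats (numbers must be `Decimal`). A sketch of a coercion helper you could run over each record before `put_item` — the field names below are hypothetical:

```python
from decimal import Decimal, InvalidOperation

def coerce_item(raw):
    """Convert numeric-looking strings to Decimal so DynamoDB stores them as Numbers."""
    item = {}
    for key, value in raw.items():
        try:
            item[key] = Decimal(value)  # "9.99" -> Decimal('9.99')
        except (InvalidOperation, TypeError, ValueError):
            item[key] = value           # non-numeric values stay Strings
    return item

record = coerce_item({"id": "42", "name": "alice", "price": "9.99"})
print(record)  # {'id': Decimal('42'), 'name': 'alice', 'price': Decimal('9.99')}
```

In the Lambda above this would become `table.put_item(Item=coerce_item(data))`. Also make sure every record contains the table's partition key, otherwise `put_item` raises a `ValidationException`.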