Unable to insert data from S3 into DDB using Glue

Problem description

After reading the data from S3, writing it to DDB fails with the following error:

Error Category: UNCLASSIFIED_ERROR; An error occurred while calling o112.pyWriteDynamicFrame. Provided list of item keys contains duplicates (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ValidationException; Request ID: K1F7N8KUCE2Q4QMBT8EF6CCM57VV4KQNSO5AEMVJF66Q9ASUAAJG; Proxy: null)

My script is as follows:

import sys
import boto3 
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue import DynamicFrame

def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame:
    for alias, frame in mapping.items():
        frame.toDF().createOrReplaceTempView(alias)
    result = spark.sql(query)
    return DynamicFrame.fromDF(result, glueContext, transformation_ctx)
    
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Initialize the DynamoDB client
dynamodb_client = boto3.client('dynamodb')

# Define the DynamoDB table name
dynamodb_table_name = "TABLE_NAME"


# Script generated for node Amazon S3
AmazonS3_node1712735773212 = glueContext.create_dynamic_frame.from_options(
    format_options={},
    connection_type="s3",
    format="parquet",
    connection_options={
        "paths": ["S3_PATH"],
        "recurse": True,
        "exclusions": ["SPECIFIC_S3_PATH"],
    },
    transformation_ctx="AmazonS3_node1712735773212",
)

# Create the DynamoDB table if it doesn't exist
try:
    dynamodb_client.create_table(
        TableName=dynamodb_table_name,
        KeySchema=[
            {'AttributeName': 'ATT1', 'KeyType': 'HASH'},   # partition key
            {'AttributeName': 'ATT2', 'KeyType': 'RANGE'},  # sort key
        ],
        AttributeDefinitions=[
            {'AttributeName': 'ATT1', 'AttributeType': 'N'},
            {'AttributeName': 'ATT2', 'AttributeType': 'N'},
        ],
        BillingMode='PAY_PER_REQUEST'
    )
    print(f"DynamoDB table '{dynamodb_table_name}' created successfully.")
except dynamodb_client.exceptions.ResourceInUseException:
    print(f"DynamoDB table '{dynamodb_table_name}' already exists.")

# Wait until the table is ACTIVE before writing to it
dynamodb_client.get_waiter('table_exists').wait(TableName=dynamodb_table_name)


# Script generated for node SQL Query
SqlQuery0 = '''
SELECT DISTINCT ATT1, ATT2, ATT3, ATT4 FROM myDataSource
'''
SQLQuery_node1712736016679 = sparkSqlQuery(
    glueContext,
    query=SqlQuery0,
    mapping={"myDataSource": AmazonS3_node1712735773212},
    transformation_ctx="SQLQuery_node1712736016679",
)

# SelectFields_node1711627341244 = SelectFields.apply(
#     frame=AmazonS3_node1712735773212,
#     paths=[
#         "ATT1",
#         "ATT2",
#         "ATT3",
#         "ATT4",
#     ],
#     transformation_ctx="SelectFields_node1711627341244",
# )

glueContext.write_dynamic_frame_from_options(
    frame=SQLQuery_node1712736016679,
    connection_type="dynamodb",
    connection_options={
        "dynamodb.region": "us-east-1",
        "dynamodb.output.tableName": dynamodb_table_name,
        "dynamodb.throughput.write.percent": "1.0",
    },
)

job.commit()

In cases like this, how can I update the above script to filter out the duplicate items before adding them to DDB?

sql amazon-dynamodb etl aws-glue
1 Answer

You already have a SELECT DISTINCT query, but the important part that is missing is that the deduplication must be based on the table keys only, whereas you are including the third and fourth attributes as well. Two rows that differ only in ATT3 or ATT4 still share the same (ATT1, ATT2) key pair, so DynamoDB's batch writer rejects the write for containing duplicate item keys.
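For example, you can replace SqlQuery0 so that only one row survives per key pair. A minimal sketch, assuming ATT1 and ATT2 are the table's partition and sort keys as in your create_table call; ROW_NUMBER() numbers the rows within each key pair, and the ORDER BY makes the surviving row deterministic:

# Keep exactly one row per (ATT1, ATT2) key pair, so the DynamoDB
# write no longer sees duplicate item keys.
SqlQuery0 = '''
SELECT ATT1, ATT2, ATT3, ATT4
FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY ATT1, ATT2 ORDER BY ATT3, ATT4) AS rn
    FROM myDataSource
) ranked
WHERE rn = 1
'''

Equivalently, you can keep your existing query and drop duplicate keys on the DataFrame before converting back to a DynamicFrame, e.g. SQLQuery_node1712736016679.toDF().dropDuplicates(["ATT1", "ATT2"]) (note this keeps an arbitrary row from each key pair rather than a deterministic one).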
