After reading data from S3 and writing it to DDB, the job fails with the following error:
Error Category: UNCLASSIFIED_ERROR; An error occurred while calling o112.pyWriteDynamicFrame. Provided list of item keys contains duplicates (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ValidationException; Request ID: K1F7N8KUCE2Q4QMBT8EF6CCM57VV4KQNSO5AEMVJF66Q9ASUAAJG; Proxy: null)
My script is as follows:
import sys
import boto3
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue import DynamicFrame
# Helper that registers each DynamicFrame as a temp view and runs a Spark SQL query against it
def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame:
    for alias, frame in mapping.items():
        frame.toDF().createOrReplaceTempView(alias)
    result = spark.sql(query)
    return DynamicFrame.fromDF(result, glueContext, transformation_ctx)
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# Initialize the DynamoDB client
dynamodb_client = boto3.client('dynamodb')
# Define the DynamoDB table name
dynamodb_table_name = "TABLE_NAME"
# Script generated for node Amazon S3
AmazonS3_node1712735773212 = glueContext.create_dynamic_frame.from_options(
    format_options={},
    connection_type="s3",
    format="parquet",
    connection_options={"paths": ["S3_PATH"], "recurse": True,
                        "exclusions": ["SPECIFIC_S3_PATH"]},
    transformation_ctx="AmazonS3_node1712735773212")
# Create the DynamoDB table if it doesn't exist
try:
    dynamodb_client.create_table(
        TableName=dynamodb_table_name,
        KeySchema=[
            {'AttributeName': 'ATT1', 'KeyType': 'HASH'},  # Assuming artist_dmid is the partition key
            {'AttributeName': 'ATT2', 'KeyType': 'RANGE'}
        ],
        AttributeDefinitions=[
            {'AttributeName': 'ATT1', 'AttributeType': 'N'},
            {'AttributeName': 'ATT2', 'AttributeType': 'N'},
        ],
        BillingMode='PAY_PER_REQUEST'
    )
    print(f"DynamoDB table '{dynamodb_table_name}' created successfully.")
except dynamodb_client.exceptions.ResourceInUseException:
    print(f"DynamoDB table '{dynamodb_table_name}' already exists.")
# Script generated for node SQL Query
SqlQuery0 = '''
select DISTINCT ATT1,ATT2,ATT3,ATT4 from myDataSource
'''
SQLQuery_node1712736016679 = sparkSqlQuery(glueContext, query = SqlQuery0, mapping = {"myDataSource":AmazonS3_node1712735773212}, transformation_ctx = "SQLQuery_node1712736016679")
# SelectFields_node1711627341244 = SelectFields.apply(
# frame=AmazonS3_node1712735773212,
# paths=[
# "ATT1",
# "ATT2",
# "ATT3",
# "ATT4",
# ],
# transformation_ctx="SelectFields_node1711627341244",
# )
glueContext.write_dynamic_frame_from_options(
    frame=SQLQuery_node1712736016679,
    connection_type="dynamodb",
    connection_options={
        "dynamodb.region": "us-east-1",
        "dynamodb.output.tableName": dynamodb_table_name,
        "dynamodb.throughput.write.percent": "1.0"
    }
)
job.commit()
In some cases, how can the above script be updated to filter out duplicate items from the list before these items are added to DDB?
You already have a SELECT DISTINCT query, but the important part that is missing is that the deduplication must be based only on the table keys; you are including the third and fourth attributes as well. Two rows that differ only in ATT3 or ATT4 survive the DISTINCT but still share the same (ATT1, ATT2) key pair, which is exactly the duplicate-key condition that DynamoDB rejects when the batch is written.
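For example, here is a minimal sketch of one way to adjust the query, assuming ATT1 and ATT2 are the table's partition and sort keys (matching the create_table call above) and that keeping an arbitrary one of the duplicate rows is acceptable:

# Hypothetical replacement for SqlQuery0: keep exactly one row per (ATT1, ATT2) key pair.
# The ORDER BY that decides which duplicate survives is arbitrary here; change it if a
# specific row should win.
SqlQuery0 = '''
SELECT ATT1, ATT2, ATT3, ATT4
FROM (
    SELECT ATT1, ATT2, ATT3, ATT4,
           ROW_NUMBER() OVER (PARTITION BY ATT1, ATT2 ORDER BY ATT3) AS rn
    FROM myDataSource
) AS dedup
WHERE rn = 1
'''

An equivalent approach is to keep the original query and call dropDuplicates(["ATT1", "ATT2"]) on the resulting DataFrame before converting it back to a DynamicFrame; either way, every (ATT1, ATT2) key pair appears at most once in the frame that is written to DynamoDB.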