我正在尝试使用AWS管道将CSV数据从S3存储桶传输到DynamoDB,以下是我的管道线脚本,它无法正常工作,
CSV文件结构
Name, Designation,Company
A,TL,C1
B,Prog, C2
DynamoDb:N_Table,名称为哈希值
{
"objects": [
{
"id": "Default",
"scheduleType": "cron",
"name": "Default",
"role": "DataPipelineDefaultRole",
"resourceRole": "DataPipelineDefaultResourceRole"
},
{
"id": "DynamoDBDataNodeId635",
"schedule": {
"ref": "ScheduleId639"
},
"tableName": "N_Table",
"name": "MyDynamoDBData",
"type": "DynamoDBDataNode"
},
{
"emrLogUri": "s3://onlycsv/error",
"id": "EmrClusterId636",
"schedule": {
"ref": "ScheduleId639"
},
"masterInstanceType": "m1.small",
"coreInstanceType": "m1.xlarge",
"enableDebugging": "true",
"installHive": "latest",
"name": "ImportCluster",
"coreInstanceCount": "1",
"logUri": "s3://onlycsv/error1",
"type": "EmrCluster"
},
{
"id": "S3DataNodeId643",
"schedule": {
"ref": "ScheduleId639"
},
"directoryPath": "s3://onlycsv/data.csv",
"name": "MyS3Data",
"dataFormat": {
"ref": "DataFormatId1"
},
"type": "S3DataNode"
},
{
"id": "ScheduleId639",
"startDateTime": "2013-08-03T00:00:00",
"name": "ImportSchedule",
"period": "1 Hours",
"type": "Schedule",
"endDateTime": "2013-08-04T00:00:00"
},
{
"id": "EmrActivityId637",
"input": {
"ref": "S3DataNodeId643"
},
"schedule": {
"ref": "ScheduleId639"
},
"name": "MyImportJob",
"runsOn": {
"ref": "EmrClusterId636"
},
"maximumRetries": "0",
"myDynamoDBWriteThroughputRatio": "0.25",
"attemptTimeout": "24 hours",
"type": "EmrActivity",
"output": {
"ref": "DynamoDBDataNodeId635"
},
"step": "s3://elasticmapreduce/libs/script-runner/script-runner.jar,s3://elasticmapreduce/libs/hive/hive-script,--run-hive-script,--hive-versions,latest,--args,-f,s3://elasticmapreduce/libs/hive/dynamodb/importDynamoDBTableFromS3,-d,DYNAMODB_OUTPUT_TABLE=#{output.tableName},-d,S3_INPUT_BUCKET=#{input.directoryPath},-d,DYNAMODB_WRITE_PERCENT=#{myDynamoDBWriteThroughputRatio},-d,DYNAMODB_ENDPOINT=dynamodb.us-east-1.amazonaws.com"
},
{
"id": "DataFormatId1",
"name": "DefaultDataFormat1",
"column": [
"Name",
"Designation",
"Company"
],
"columnSeparator": ",",
"recordSeparator": "\n",
"type": "Custom"
}
]
}
在执行管道时,有四个步骤完成,但是它没有完全执行
目前(2015-04)默认导入管道模板不支持导入CSV文件。
如果您的CSV文件不是太大(大小不超过1GB),您可以创建一个ShellCommandActivity来首先将CSV转换为DynamoDB JSON格式,将源代码转换为将生成的JSON文件导入到表中的EmrActivity。
作为第一步,您可以创建示例DynamoDB表,包括您需要的所有字段类型,填充虚拟值,然后使用管道导出记录(DynamoDB控制台中的导出/导入按钮)。这将为您提供有关Import管道所期望的格式的想法。类型名称不明显,导入活动对正确的情况非常敏感(例如,你应该为布尔字段设置bOOL)。
之后,应该很容易创建一个awk脚本(或任何其他文本转换器,至少使用awk,您可以使用默认的AMI映像进行shell活动),您可以将其提供给shellCommandActivity。不要忘记启用“staging”标志,这样您的输出就会上传回S3,以便Import活动将其拾取。
如果您使用模板数据管道将数据从S3导入DynamoDB,则这些数据格式将不起作用。而是使用下面链接中的格式来存储输入S3数据文件http://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-importexport-ddb-pipelinejson-verifydata2.html
这种格式的输出文件由模板数据管道生成,该数据管道将数据从DynamoDB导出到S3。
希望有所帮助。
我建议使用datapipeline提供的CSV数据格式而不是自定义。
要调试群集上的错误,可以在EMR控制台中查找作业流,并查看失败任务的日志文件。
请参阅下面的链接,了解有效的解决方案(在问题部分),尽管是EMR 3.x.只需将分隔符更改为"columnSeparator": ","
。就个人而言,除非您确定数据已正确清理,否则我不会执行CSV。
How to upgrade Data Pipeline definition from EMR 3.x to 4.x/5.x?