毫无疑问,剥猫皮的方法有很多种。但通常的一个比其他的好。
如果我有一个由规则捕获并发送到 lambda 函数进行处理的数据流。 lambda 函数转换数据。之后的数据必须是:
我的问题涉及第二个要求——如何最有效地实现这一点。让我分享我的想法,然后问 2 个问题...
将数据保存到发电机中,并设置输出流,以便所有新数据进入运动流/Firehose。有没有一种方法可以配置此 Firehose 以发送到其他帐户中的多个 s3 目的地,或者是否有必要创建多个 kinesis 流 - 每个最终端点一个?这里的 lambda 是附加到 Kinesis 实例的数据处理类型。
我阅读了一些有关 SiteWise 的内容。看起来它非常适合这个,但我使用它的经验为零。我添加了一些关于关注成本的想法 - 忽略那些东西!
概念 1 的变体 - 我们是否可以使用简单的运动流,并使用附加的数据处理 lambda 将输出分散到多个 SQS 实例,从而将数据直接推送到第 3 方帐户 S3?
我只提供这 3 个选项作为起点。我确信存在无限的可能性,更有经验的建筑师可以轻松找到更好的选择。所以 ! -
Q1)“最佳”选择是什么样的,考虑到:
Q2)数据交叉到另一个 AWS 账户的方面提出了明显的安全和身份验证问题。如何使用 IAM 巧妙地处理这个问题? FWIW,我在 cdk 中编码,但为了增加我的问题的未来相关性,最好更普遍地构建答案。
谢谢大家。
我会自己回答这个问题,花了一些时间并达到了可接受的结果。
简而言之,架构如下所示。事实证明,最简单的解决方案是最好的。
Sitewise 解决方案在纸面上会更便宜,但 CDK 尚不支持它 - 即使作为 alpha 或 beta 构造也不支持。
虽然 CDK 对 Firehose 的支持也不是很好,但至少有一个可以使用的构造。
其中最复杂的部分是 IAM 权限。这可以总结为以下两块代码:
// firehose permissions
lambdaDataIot.addToRolePolicy(new iam.PolicyStatement({ // data firehose streams
actions: [
"firehose:DeleteDeliveryStream",
"firehose:PutRecord",
"firehose:PutRecordBatch",
"firehose:UpdateDestination"
],
resources: firehoseCustomerData.map(obj => obj.attrArn), // [firehose[0].attrArn, firehose[1].attrArn, ... ]
effect: Effect.ALLOW
}));
对于实际的 Firehose 权限:
/* options =
{
roleName: string,
streamConfig: [
{
name: string,
id: number,
bucketArn: string,
email: string
}
]
}
*/
private buildFirehoseRole(options?: any): iam.Role {
// Create IAM Role for Data Firehose
const firehoseRole = new iam.Role(this, options.roleName, {
assumedBy: new iam.ServicePrincipal('firehose.amazonaws.com'),
roleName: options.roleName,
});
// Array of Resources so it can handle an array of streams
let policyResourcesBucketArns = [];
let policyResourcesLogStreams = [];
let policyResourcesKinesisStreams = [];
let policyResourcesRoles = [];
for (let index = 0; index < options.streamConfig.length; index++) {
const streamConfig = options.streamConfig[index];
// Array of arn & arn/* for s3 permissions
policyResourcesBucketArns.push(streamConfig.bucketArn);
policyResourcesBucketArns.push(`${streamConfig.bucketArn}/*`);
// Array of resources for logging permissions
policyResourcesLogStreams.push(`arn:aws:logs:::log-group:firehose-${streamConfig.name}:log-stream:firehose-${streamConfig.name}`);
// Array of resources for kinesis permissions
policyResourcesKinesisStreams.push(`arn:aws:kinesis:::stream/${streamConfig.name}`);
// Array of Roles for Policy Statement permissions
policyResourcesRoles.push(`arn:aws:iam::${streamConfig.id}:role/firehose_role`);
}
firehoseRole.addToPolicy(
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
resources: policyResourcesBucketArns,
actions: [
"s3:AbortMultipartUpload",
"s3:GetBucketLocation",
"s3:GetObject",
"s3:ListBucket",
"s3:ListBucketMultipartUploads",
"s3:PutObject",
"s3:PutObjectAcl"
],
})
);
firehoseRole.addToPolicy(
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
resources: policyResourcesLogStreams,
actions: ["logs:PutLogEvents"]
})
);
firehoseRole.addToPolicy(
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [
"kinesis:DescribeStream",
"kinesis:DescribeStreamSummary",
"kinesis:GetRecords",
"kinesis:GetShardIterator",
"kinesis:ListShards",
"kinesis:ListStreams",
"kinesis:SubscribeToShard",
],
resources: policyResourcesKinesisStreams
})
);
firehoseRole.addToPolicy(
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: ["sts:AssumeRole"],
resources: policyResourcesRoles // example for test account "firehose_role"
})
);
return firehoseRole;
}