用于将数据拆分和流式传输到多个第 3 方账户的 AWS 架构

问题描述 投票:0回答:1

毫无疑问,剥猫皮的方法有很多种。但通常的一个比其他的好。

如果我有一个由规则捕获并发送到 lambda 函数进行处理的数据流。 lambda 函数转换数据。之后的数据必须是:

  • 放入发电机数据库中。
  • 分为多个流并可供其他 AWS 账户使用。

我的问题涉及第二个要求——如何最有效地实现这一点。让我分享我的想法,然后问 2 个问题...

概念1:

将数据保存到发电机中,并设置输出流,以便所有新数据进入运动流/Firehose。有没有一种方法可以配置此 Firehose 以发送到其他帐户中的多个 s3 目的地,或者是否有必要创建多个 kinesis 流 - 每个最终端点一个?这里的 lambda 是附加到 Kinesis 实例的数据处理类型。

enter image description here

概念2:

我阅读了一些有关 SiteWise 的内容。看起来它非常适合这个,但我使用它的经验为零。我添加了一些关于关注成本的想法 - 忽略那些东西!

enter image description here

概念3:

概念 1 的变体 - 我们是否可以使用简单的运动流,并使用附加的数据处理 lambda 将输出分散到多个 SQS 实例,从而将数据直接推送到第 3 方帐户 S3?

enter image description here

问题:

我只提供这 3 个选项作为起点。我确信存在无限的可能性,更有经验的建筑师可以轻松找到更好的选择。所以 ! -

Q1)“最佳”选择是什么样的,考虑到:

  • 灵活性(如果我有 10 个目的地,其中一些仅接收 1% 的数据,而另一些则高达 50%,该怎么办)...
  • 成本(特别是低成本!)

Q2)数据交叉到另一个 AWS 账户的方面提出了明显的安全和身份验证问题。如何使用 IAM 巧妙地处理这个问题? FWIW,我在 cdk 中编码,但为了增加我的问题的未来相关性,最好更普遍地构建答案。

谢谢大家。

amazon-web-services aws-lambda amazon-sqs amazon-kinesis amazon-kinesis-firehose
1个回答
0
投票

我会自己回答这个问题,花了一些时间并达到了可接受的结果。

简而言之,架构如下所示。事实证明,最简单的解决方案是最好的。

enter image description here

Sitewise 解决方案在纸面上会更便宜,但 CDK 尚不支持它 - 即使作为 alpha 或 beta 构造也不支持。

虽然 CDK 对 Firehose 的支持也不是很好,但至少有一个可以使用的构造。

其中最复杂的部分是 IAM 权限。这可以总结为以下两块代码:

    // firehose permissions
    lambdaDataIot.addToRolePolicy(new iam.PolicyStatement({    // data firehose streams
      actions: [
            "firehose:DeleteDeliveryStream",
            "firehose:PutRecord",
            "firehose:PutRecordBatch",
            "firehose:UpdateDestination"
      ],
      resources: firehoseCustomerData.map(obj => obj.attrArn), // [firehose[0].attrArn, firehose[1].attrArn, ... ] 
      effect: Effect.ALLOW
    }));

对于实际的 Firehose 权限:

    /*  options = 
      {
        roleName: string,
        streamConfig: [
          { 
            name: string, 
            id: number, 
            bucketArn: string, 
            email: string
          }
        ]
      }
  */
  private buildFirehoseRole(options?: any): iam.Role {

    // Create IAM Role for Data Firehose
    const firehoseRole = new iam.Role(this, options.roleName, {
      assumedBy: new iam.ServicePrincipal('firehose.amazonaws.com'),
      roleName: options.roleName,
    });
      
    // Array of Resources so it can handle an array of streams
    let policyResourcesBucketArns     = [];
    let policyResourcesLogStreams     = [];
    let policyResourcesKinesisStreams = [];
    let policyResourcesRoles          = [];
    

    for (let index = 0; index < options.streamConfig.length; index++) {
      const streamConfig = options.streamConfig[index];

      // Array of arn & arn/* for s3 permissions
      policyResourcesBucketArns.push(streamConfig.bucketArn);
      policyResourcesBucketArns.push(`${streamConfig.bucketArn}/*`);

      // Array of resources for logging permissions
      policyResourcesLogStreams.push(`arn:aws:logs:::log-group:firehose-${streamConfig.name}:log-stream:firehose-${streamConfig.name}`);

      // Array of resources for kinesis permissions
      policyResourcesKinesisStreams.push(`arn:aws:kinesis:::stream/${streamConfig.name}`);

      // Array of Roles for Policy Statement permissions
      policyResourcesRoles.push(`arn:aws:iam::${streamConfig.id}:role/firehose_role`);
    }



    firehoseRole.addToPolicy(
      new iam.PolicyStatement({
        effect: iam.Effect.ALLOW,
        resources: policyResourcesBucketArns,
        actions: [
          "s3:AbortMultipartUpload",
          "s3:GetBucketLocation",
          "s3:GetObject",
          "s3:ListBucket",
          "s3:ListBucketMultipartUploads",
          "s3:PutObject",
          "s3:PutObjectAcl"
        ],
      })
    );


    firehoseRole.addToPolicy(
      new iam.PolicyStatement({
        effect: iam.Effect.ALLOW,
        resources: policyResourcesLogStreams,
        actions:   ["logs:PutLogEvents"]
      })
    );


    firehoseRole.addToPolicy(
      new iam.PolicyStatement({
        effect: iam.Effect.ALLOW,
        actions: [
          "kinesis:DescribeStream",
          "kinesis:DescribeStreamSummary",
          "kinesis:GetRecords",
          "kinesis:GetShardIterator",
          "kinesis:ListShards",
          "kinesis:ListStreams",
          "kinesis:SubscribeToShard",
        ],
        resources: policyResourcesKinesisStreams
      })
    );


    firehoseRole.addToPolicy(
      new iam.PolicyStatement({
        effect: iam.Effect.ALLOW,
        actions: ["sts:AssumeRole"],
        resources: policyResourcesRoles // example for test account "firehose_role"
      })
    );

    return firehoseRole;
  }
© www.soinside.com 2019 - 2024. All rights reserved.