How do I provide a session token when using FlinkKinesisConsumer in Java?


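I'm trying to have user A assume role R1, and then use R1 to assume role R2 in a different account. I've created an StsClient to obtain the accessKeyId, secretAccessKey, and sessionToken I need, but according to Flink's documentation there is no property for supplying the session token, which leads to the following error: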

Caused by: org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.AmazonKinesisException: The security token included in the request is invalid.

Here is my code:

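import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
import software.amazon.awssdk.services.sts.model.AssumeRoleResponse;

public class TestKinesisConnection {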
    public static void main(String[] args) throws Exception {
        // AWS code: assume R1 with the local profile, then use R1's temporary credentials to assume R2
        StsClient stsClient = StsClient.builder()
                .region(Region.US_WEST_1)
                .credentialsProvider(ProfileCredentialsProvider.create("someProfile"))
                .build();
        AssumeRoleResponse assumeRoleResponse = stsClient.assumeRole(
                AssumeRoleRequest.builder()
                .roleArn("arn:aws:iam::1234567890:role/R1")
                .roleSessionName("AssumeRole")
                .build()
        );
        stsClient = StsClient.builder()
                .region(Region.US_WEST_1)
                .credentialsProvider(StaticCredentialsProvider.create(AwsSessionCredentials.create(
                        assumeRoleResponse.credentials().accessKeyId(),
                        assumeRoleResponse.credentials().secretAccessKey(),
                        assumeRoleResponse.credentials().sessionToken()
                ))).build();
        assumeRoleResponse = stsClient.assumeRole(
                AssumeRoleRequest.builder()
                        .roleArn("arn:aws:iam::09876543210:role/R2")
                        .roleSessionName("CrossAccount")
                        .build()
        );
        
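        // Flink code: configure the consumer with the credentials returned for R2
        Properties consumerConfig = new Properties();

        // setProperty needs a String, so pass the region id ("us-west-1") rather than the Region object
        consumerConfig.setProperty(AWSConfigConstants.AWS_REGION, Region.US_WEST_1.id());
        // Only the access key id and secret key are passed on; the session token from STS is not
        consumerConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, assumeRoleResponse.credentials().accessKeyId());
        consumerConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, assumeRoleResponse.credentials().secretAccessKey());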
        consumerConfig.setProperty(AWSConfigConstants.AWS_ROLE_SESSION_NAME, "CrossAccount");
        consumerConfig.setProperty(AWSConfigConstants.AWS_ROLE_ARN, "arn:aws:iam::09876543210:role/R2");

        consumerConfig.setProperty("flink.stream.initpos", "TRIM_HORIZON");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkKinesisConsumer<String> kinesis = new FlinkKinesisConsumer<>(
                "TestFlinkConsumerStream", new SimpleStringSchema(), consumerConfig);

        env.addSource(kinesis).addSink(new PrintSinkFunction<>());
        env.execute("TestFlinkConsumer");
    }
}

I expect the program, once it has successfully assumed role R2, to print the contents of the Kinesis stream in the other account.

java amazon-web-services apache-flink flink-streaming amazon-kinesis
1 Answer

Did you manage to get this working? I'm facing the same issue.
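One direction that may be worth trying, offered as a sketch rather than a confirmed fix: instead of passing pre-fetched temporary keys (which need a session token and expire anyway), describe the role chain in the consumer properties and let the connector assume the roles itself through its ASSUME_ROLE credentials provider. The class `ChainedRoleConfig` and its methods are hypothetical names. The literal keys are assumed to match the values behind `AWSConfigConstants.AWS_CREDENTIALS_PROVIDER`, `AWS_ROLE_ARN`, `AWS_ROLE_SESSION_NAME`, `AWS_ROLE_CREDENTIALS_PROVIDER`, and `AWS_PROFILE_NAME`, and whether a nested ASSUME_ROLE provider is honored depends on the connector version, so check both against the version you run.

```java
import java.util.Properties;

// Sketch only: builds consumer properties for a two-hop role chain (profile -> R1 -> R2).
// The literal keys are ASSUMED to equal the values of Flink's AWSConfigConstants;
// verify them against your connector version before relying on this.
final class ChainedRoleConfig {
    private static final String PROVIDER = "aws.credentials.provider";

    // Writes ASSUME_ROLE settings under the given key prefix and returns the prefix
    // under which the credentials used to call STS for this hop are configured.
    static String assumeRole(Properties props, String prefix, String roleArn, String sessionName) {
        props.setProperty(prefix, "ASSUME_ROLE");
        props.setProperty(prefix + ".role.arn", roleArn);
        props.setProperty(prefix + ".role.sessionName", sessionName);
        return prefix + ".role.provider";
    }

    static Properties build(String region, String profile, String r1Arn, String r2Arn) {
        Properties props = new Properties();
        props.setProperty("aws.region", region);
        // Outer hop: the consumer reads the stream as R2.
        String r1Prefix = assumeRole(props, PROVIDER, r2Arn, "CrossAccount");
        // Inner hop: R2 is assumed using R1's credentials.
        String basePrefix = assumeRole(props, r1Prefix, r1Arn, "AssumeRole");
        // Base credentials: the local profile of user A.
        props.setProperty(basePrefix, "PROFILE");
        props.setProperty(basePrefix + ".profile.name", profile);
        props.setProperty("flink.stream.initpos", "TRIM_HORIZON");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("us-west-1", "someProfile",
                "arn:aws:iam::1234567890:role/R1", "arn:aws:iam::09876543210:role/R2");
        // Print the resulting keys in sorted order for inspection.
        props.stringPropertyNames().stream().sorted()
                .forEach(k -> System.out.println(k + "=" + props.getProperty(k)));
    }
}
```

The resulting `Properties` object would be passed to `new FlinkKinesisConsumer<>(...)` in place of `consumerConfig`. Because the credentials would then be resolved where the source runs, the profile would also have to exist on the task managers.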
