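I am trying to use user A to assume role R1, and then use R1 to assume role R2 in a different account. I have created an StsClient to obtain the accessKeyId, secretAccessKey, and sessionToken I need, but according to the Flink documentation I cannot supply that last property, so I get the following error: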
Caused by: org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.AmazonKinesisException: The security token included in the request is invalid.
Here is my code:
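import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
// The package of AWSConfigConstants may differ between connector versions.
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
import software.amazon.awssdk.services.sts.model.AssumeRoleResponse;

public class TestKinesisConnection {
    public static void main(String[] args) throws Exception {
        // AWS code: user A (profile "someProfile") assumes R1.
        StsClient stsClient = StsClient.builder()
                .region(Region.US_WEST_1)
                .credentialsProvider(ProfileCredentialsProvider.create("someProfile"))
                .build();
        AssumeRoleResponse assumeRoleResponse = stsClient.assumeRole(
                AssumeRoleRequest.builder()
                        .roleArn("arn:aws:iam::1234567890:role/R1")
                        .roleSessionName("AssumeRole")
                        .build()
        );

        // Use R1's temporary credentials to assume R2 in the other account.
        stsClient = StsClient.builder()
                .region(Region.US_WEST_1)
                .credentialsProvider(StaticCredentialsProvider.create(AwsSessionCredentials.create(
                        assumeRoleResponse.credentials().accessKeyId(),
                        assumeRoleResponse.credentials().secretAccessKey(),
                        assumeRoleResponse.credentials().sessionToken()
                ))).build();
        assumeRoleResponse = stsClient.assumeRole(
                AssumeRoleRequest.builder()
                        .roleArn("arn:aws:iam::09876543210:role/R2")
                        .roleSessionName("CrossAccount")
                        .build()
        );

        // Flink code
        Properties consumerConfig = new Properties();
        // setProperty expects a String, so pass the region ID rather than the Region object.
        consumerConfig.setProperty(AWSConfigConstants.AWS_REGION, Region.US_WEST_1.id());
        // Note: the session token from the R2 response is not passed here.
        consumerConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, assumeRoleResponse.credentials().accessKeyId());
        consumerConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, assumeRoleResponse.credentials().secretAccessKey());
        consumerConfig.setProperty(AWSConfigConstants.AWS_ROLE_SESSION_NAME, "CrossAccount");
        consumerConfig.setProperty(AWSConfigConstants.AWS_ROLE_ARN, "arn:aws:iam::09876543210:role/R2");
        consumerConfig.setProperty("flink.stream.initpos", "TRIM_HORIZON");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkKinesisConsumer<String> kinesis = new FlinkKinesisConsumer<>(
                "TestFlinkConsumerStream", new SimpleStringSchema(), consumerConfig);
        env.addSource(kinesis).addSink(new PrintSinkFunction<>());
        env.execute("TestFlinkConsumer");
    }
}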
I expect the program, once it has successfully assumed role R2, to print the contents of the Kinesis stream in the other account.
Did you ever get this working? I am facing the same problem.
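One direction that may be worth trying, offered as an untested sketch: instead of assuming the roles yourself and passing static keys, let the connector perform the role chain through nested credential-provider properties. The key strings below ("aws.credentials.provider", ".role.arn", ".role.sessionName", ".role.provider", ".profile.name") and the provider names ASSUME_ROLE and PROFILE are my understanding of the connector's prefix convention, not something confirmed in this thread, so please check them against AWSConfigConstants in your connector version. The class name ChainedRoleConfig and its build method are hypothetical helpers made up for illustration.

```java
import java.util.Properties;

// Hypothetical helper: builds consumer properties for a two-step role chain
// (profile -> R1 -> R2). All key strings are ASSUMPTIONS about the Flink
// Kinesis connector's configuration convention; verify before relying on them.
final class ChainedRoleConfig {
    private static final String PROVIDER = "aws.credentials.provider";

    static Properties build(String region, String profile,
                            String r1Arn, String r1Session,
                            String r2Arn, String r2Session) {
        Properties p = new Properties();
        p.setProperty("aws.region", region);

        // Outermost provider: the role the consumer should end up with (R2).
        p.setProperty(PROVIDER, "ASSUME_ROLE");
        p.setProperty(PROVIDER + ".role.arn", r2Arn);
        p.setProperty(PROVIDER + ".role.sessionName", r2Session);

        // Credentials used to assume R2: another assumed role (R1).
        String r1 = PROVIDER + ".role.provider";
        p.setProperty(r1, "ASSUME_ROLE");
        p.setProperty(r1 + ".role.arn", r1Arn);
        p.setProperty(r1 + ".role.sessionName", r1Session);

        // Credentials used to assume R1: user A's named profile.
        String base = r1 + ".role.provider";
        p.setProperty(base, "PROFILE");
        p.setProperty(base + ".profile.name", profile);
        return p;
    }

    public static void main(String[] args) {
        Properties p = build("us-west-1", "someProfile",
                "arn:aws:iam::1234567890:role/R1", "AssumeRole",
                "arn:aws:iam::09876543210:role/R2", "CrossAccount");
        // Print the resulting configuration in a stable order for inspection.
        p.stringPropertyNames().stream().sorted()
                .forEach(k -> System.out.println(k + " = " + p.getProperty(k)));
    }
}
```

If the keys are right for your version, the returned Properties would replace consumerConfig in the question (plus "flink.stream.initpos"), and the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY lines would be dropped. The idea is that the connector then requests the temporary credentials, session token included, on its own rather than receiving a key pair with no token.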