我有一个 Spring Boot 应用程序,它使用 Amazon Kinesis 来使用数据并将其保存到 PostgreSQL。 由于我已经在我的应用程序中使用一个数据库(PostgreSQL),我想避免仅出于检查点和锁定目的而使用另一个数据库(Dynamo db)。从而可以降低资源成本。
期望:我想将默认的 dynamo 数据库更改为 PostgreSQL 数据库以进行检查点和锁定。
在我的项目实现中使用以下依赖项 'org.springframework.cloud:spring-cloud-stream-binder-kinesis:4.0.2'
我的application.yml文件
spring:
cloud:
aws:
credentials:
sts:
web-identity-token-file: <Where i had given the token file path>
role-arn: <Where i had given the assume role arn>
role-session-name: RoleSessionName
region:
static: <where i had given my aws region>
dualstack-enabled: false
stream:
kinesis:
binder:
auto-create-stream: false
min-shard-count: 1
bindings:
input-in-0:
destination: test-test.tst.v1
content-type: text/json
下面是包含用于处理 Kinesis 数据的 bean 的 java 类
@Configuration
public class KinesisConsumerBinder{
@Bean
public Consumer<Message<String>> input(){
return message ->{
System.out.println("Data from Kinesis:"+message.getPayload());
//Process the message got from Kinesis
}
}
}
我尝试了很多方法,但未能找到正确的解决方案
我也尝试过以下步骤
为表“KinesisCheckpoint”创建了一个表及其实体类,其中包含字段分片 ID、序列号和流名称
创建了 Spring Data JPA 存储库接口,用于处理检查点数据的 CRUD 操作。
更新了消费者代码如下,用于在成功处理一批记录后将检查点信息(如分片 ID、序列号和流名称)保存到上面创建的表:“KinesisCheckpoint”。
@Configuration
public class KinesisConsumerBinder{
@Bean
public Consumer<Message<String>> input(){
return message ->{
System.out.println("Data from Kinesis:"+message.getPayload());
String strmNme = (String) message.getHeader().get("aws_receivedStream");
String shrdId = (String) message.getHeader().get("aws_shard");
String seqNo = (String) message.getHeader().get("aws_shard");
String lstChckPntDta = checkPointRepo.findLastChckpoint(strmNme,shrdId);// query that written in repo for getting the last chek point info for the given stream name and shard id
//Process the message got from Kinesis
if(null == lstChckPntDta){
// save the new checkpoint info(shard Id, sequence number and stream name ) to "KinesisCheckpoint" table
}else{
// update the checkpoint data(sequence no) for the filter Stream name and shardId
}
}
}
}
作为最后一步,当应用程序启动/重新启动时,我从表“KinesisCheckpoint”中检索了保存的检查点信息,我想使用它从中断处恢复处理。
我们如何使用我在表“KinesisCheckpoint”中收集的信息来从中断处恢复处理。
上述方法不是更好的方法。是否有其他更好的方法来使用 PostgreSQL 数据库实现检查点和锁定。
有人可以帮我解决这个问题吗
KinesisMessageDrivenChannelAdapter
中的锁定逻辑基于LockRegistry
。检查点是基于 ConcurrentMetadataStore
的。
是的,默认情况下它们分别是 DynamoDbLockRegistry
和 DynamoDbMetadataStore
。我们在文档中没有说明这一点,但是这些 LockRegistry
和 ConcurrentMetadataStore
的自动配置 bean 可以从最终用户配置中覆盖。这样您就可以在 PostgreSQL 中使用基于 JdbcLockRegistry
的 JdbcMetadataStore
和 DataSource
。
您需要
schema-postgresql.sql
jar 中的 spring-integration-jdbc
才能正确初始化数据库。这样您就不需要您现在正在讨论的任何自定义逻辑。
在文档中查看更多信息:
https://docs.spring.io/spring-integration/reference/jdbc/lock-registry.html
https://docs.spring.io/spring-integration/reference/jdbc/metadata-store.html