我的 Spring BFF 有一个功能,旨在使用“Scheduled”注释每隔 2 分钟定期清理 Redis 中的会话。
我读过的一个问题是,如果我将其放入 Docker 容器中并启动该容器的多个实例,那么该函数将在每个容器中运行,但这并不是真正必要的。我可以使用某种机制来防止这种情况发生吗?例如。什么是分配锁?有用吗?
Spring Redis 缓存会话清理功能
/**
* For cleanup operations (i.e. removing expired session from a ZSet (Sorted Sets) in Redis)
* Spring's scheduling mechanism will automatically call the cleanup method according to the schedule
* defined by the @Scheduled annotation.
*/
@Component
@EnableScheduling
internal class SessionEvicter(
private val redisOperations: ReactiveRedisOperations<String, String>,
springSessionProperties: SpringSessionProperties,
) {
private val logger = LoggerFactory.getLogger(SessionEvicter::class.java)
private val redisKeyLocation = springSessionProperties.redis?.expiredSessionsNamespace
?: "spring:session:sessions:expirations"
data class CleanupContext(
val now: Instant,
val pastFiveDays: Instant,
val range: Range<Double>,
val limit: Limit
)
// run every 120 seconds
@Scheduled(fixedRate = 120, timeUnit = TimeUnit.SECONDS)
fun cleanup(): Mono<Void> {
return Mono.fromCallable {
val now = Instant.now()
val pastFiveDays = now.minus(Duration.ofDays(5))
val range = Range.closed(
pastFiveDays.toEpochMilli().toDouble(),
now.toEpochMilli().toDouble()
)
val limit = Limit.limit().count(500)
CleanupContext(now, pastFiveDays, range, limit)
}
.doOnNext { context ->
logger.info("Scheduled cleanup execution started at ${Instant.now()}.")
logger.info("Current time (now): ${context.now}")
logger.info("Time range start: ${Date(context.pastFiveDays.toEpochMilli())}")
logger.info("Time range end: ${Date(context.now.toEpochMilli())}")
logger.info("Limit count: ${context.limit.count}")
logger.info("Redis key location: $redisKeyLocation")
}
.flatMap { context ->
val zSetOps = redisOperations.opsForZSet()
zSetOps.reverseRangeByScore(redisKeyLocation, context.range, context.limit)
.collectList()
.flatMap { sessionIdsList ->
if (sessionIdsList.isNotEmpty()) {
logger.info("Found ${sessionIdsList.size} sessions to remove.")
zSetOps.remove(
redisKeyLocation,
*sessionIdsList.toTypedArray()
).doOnSubscribe { logger.info("Started removal of sessions") }
.doOnSuccess { logger.info("Successfully removed sessions") }
.doOnError { e -> logger.error("Error during removal: ${e.message}") }
} else {
logger.info("No sessions found to remove.")
Mono.empty()
}
}
}
.doOnSuccess {
logger.info("Scheduled session cleanup check completed at ${Instant.now()}.")
}
.doOnError { e ->
logger.error("Error during session cleanup check: ${e.message}")
}
.then()
.doOnTerminate {
logger.info("Cleanup process terminated at ${Instant.now()}.")
}
.subscribeOn(Schedulers.boundedElastic()) // to ensure proper threading
}
}
根据您的项目和环境,有多种选项。实现此目的的一种方法是使用 Quartz 调度程序 https://www.quartz-scheduler.org/documentation/2.4.0-SNAPSHOT/introduction.html
或
您可以使用Redis分布式锁来达到同样的目的https://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html
或
通过 Kubernetes 的领导者选举 API、Zookeeper 或 Consul 等工具使用领导者选举机制,其中一个实例成为领导者并运行计划任务。
示例代码 - 您可以将会话驱逐逻辑放入 try 块中
@Component
@EnableScheduling
public class SessionEvicter {
private final ReactiveRedisOperations<String, String> redisOperations;
private static final String LOCK_KEY = "session-cleanup-lock";
private static final Duration LOCK_EXPIRY = Duration.ofSeconds(120); // lock expiry time
public SessionEvicter(ReactiveRedisOperations<String, String> redisOperations) {
this.redisOperations = redisOperations;
}
@Scheduled(fixedRate = 120000)
public void cleanup() {
String lockValue = UUID.randomUUID().toString();
redisOperations.opsForValue()
.setIfAbsent(LOCK_KEY, lockValue, LOCK_EXPIRY)
.flatMap(acquired -> {
if (Boolean.TRUE.equals(acquired)) {
// Lock acquired, perform the cleanup task
return performCleanup()
.doFinally(signalType -> releaseLock(lockValue));
} else {
// Lock not acquired, skip cleanup
return Mono.empty();
}
})
.subscribe();
}
private Mono<Void> performCleanup() {
// Your existing cleanup code
// ...
return Mono.empty();
}
private Mono<Boolean> releaseLock(String lockValue) {
// Release the lock only if it was acquired by this instance
return redisOperations.opsForValue()
.get(LOCK_KEY)
.flatMap(currentValue -> {
if (lockValue.equals(currentValue)) {
return redisOperations.delete(LOCK_KEY);
}
return Mono.just(false);
});
}
}