我有一个
CacheConfig
,我已经在其中设置了ReactiveRedisConnectionFactory
,我可以连接到Redis。但是,当我尝试将任何内容保存到缓存中时,由于
Caused by: java.lang.IllegalArgumentException: DefaultSerializer requires a Serializable payload but received an object of type [reactor.core.publisher.MonoFlatMapMany]
at org.springframework.core.serializer.DefaultSerializer.serialize(DefaultSerializer.java:43) ~[spring-core-6.0.7.jar:6.0.7]
at org.springframework.core.serializer.Serializer.serializeToByteArray(Serializer.java:56) ~[spring-core-6.0.7.jar:6.0.7]
at org.springframework.core.serializer.support.SerializingConverter.convert(SerializingConverter.java:60) ~[spring-core-6.0.7.jar:6.0.7]
我认为问题是我无法序列化 Flux/Mono 对象,所以我尝试编写一个自定义序列化程序来自己手动处理这个问题,但我认为如果不调用我就无法做到这一点
.block()
在 Flux
对象上,这样我就可以将它们的内容放入 byte[]
.
我的缓存配置:
@EnableCaching
@Configuration
@RequiredArgsConstructor
public class CacheConfig implements CachingConfigurer {
private final ObjectMapper objectMapper;
@Bean
public LettuceConnectionFactory redisConnectionFactory() {
LettuceClientConfiguration clientConfig = LettuceClientConfiguration.builder()
.commandTimeout(Duration.ofSeconds(2))
.shutdownTimeout(Duration.ZERO)
.build();
RedisStandaloneConfiguration serverConfig = new RedisStandaloneConfiguration("localhost", 6379);
return new LettuceConnectionFactory(serverConfig, clientConfig);
}
@Bean
public CacheManager cacheManager() {
RedisCacheConfiguration cacheConfig = RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofMinutes(5))
.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new ReactiveRedisSerializer<>(objectMapper)));
return RedisCacheManager.builder(redisConnectionFactory())
.cacheDefaults(cacheConfig)
.build();
}
@Override
public CacheResolver cacheResolver() {
return new SimpleCacheResolver(cacheManager());
}
}
我为
ReactiveRedisSerializer
得到的代码是阻塞的,会抛出 ``
@Slf4j
@RequiredArgsConstructor
public class ReactiveRedisSerializer<T> implements RedisSerializer<T> {
private final ObjectMapper objectMapper;
@Override
@Override
public byte[] serialize(T t) throws SerializationException {
if (t == null) {
return null;
}
if (t instanceof Mono<?> mono) {
try {
return objectMapper.writeValueAsBytes(mono.toFuture().get());
} catch (JsonProcessingException | InterruptedException | ExecutionException e) {
throw new SerializationException("Failed to serialize Mono", e);
}
}
if (t instanceof Flux<?> flux) {
try {
List<?> list = flux.collectList().block(); // How to avoid doing this?
return objectMapper.writeValueAsBytes(list);
} catch (JsonProcessingException e) {
throw new SerializationException("Failed to serialize Flux", e);
}
}
try {
return objectMapper.writeValueAsString(t).getBytes(StandardCharsets.UTF_8);
} catch (JsonProcessingException e) {
throw new SerializationException("Failed to serialize object", e);
}
}
@Override
public T deserialize(byte[] bytes) throws SerializationException {
if (bytes == null) {
return null;
}
try {
return (T) objectMapper.readValue(bytes, Object.class);
} catch (Exception e) {
throw new SerializationException("Failed to deserialize object", e);
}
}
}
感觉这不是正确的方法,那么这里是否有受支持的“开箱即用”方法?