我正在使用春季云流+兔子mq绑定器。
在我的[[@StreaListener]中]我想使用RetryTemplate在特定的异常上应用重试逻辑。重试用尽或抛出不可重试的错误后,我想添加一个恢复回调,该回调将一条带有错误消息的新记录保存到我的Postgres DB中,并完成该消息(移至下一条)。这是我到目前为止所得到的: @StreamListener(Sink.INPUT)
public void saveUser(User user) {
User user = userService.saveUser(user); //could throw exceptions
log.info(">>>>>>User is created successfully: {}", user);
}
@StreamRetryTemplate
public RetryTemplate myRetryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setBackOffPolicy(new ExponentialBackOffPolicy());
Map<Class<? extends Throwable>, Boolean> retryableExceptions = new HashMap<>();
retryableExceptions.put(ConnectionException.class, true);
retryTemplate.registerListener(new RetryListener() {
@Override
public <T, E extends Throwable> boolean open(RetryContext context,
RetryCallback<T, E> callback) {
return true;
}
@Override
public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
Throwable throwable) {
//could add recovery logic here, like save error to db why sertain user was not saved
log.info("retries exausted");
}
@Override
public <T, E extends Throwable> void onError(RetryContext context,
RetryCallback<T, E> callback, Throwable throwable) {
log.error("Error on retry", throwable);
}
});
retryTemplate.setRetryPolicy(
new SimpleRetryPolicy(properties.getRetriesCount(), retryableExceptions, true));
return retryTemplate;
}
从属性,我只有这些(没有任何dlq配置)
spring.cloud.stream.bindings.input.destination = user-topic
spring.cloud.stream.bindings.input.group = user-consumer
重试用尽后,我得到此日志。
2020-06-01 20:05:58.674 INFO 18524 --- [idge-consumer-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:56722]
2020-06-01 20:05:58.685 INFO 18524 --- [idge-consumer-1] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory.publisher#319c51b0:0/SimpleConnection@2a060201 [delegate=amqp://[email protected]:56722/, localPort= 50728]
2020-06-01 20:05:58.697 INFO 18524 --- [idge-consumer-1] c.e.i.o.b.c.RetryConfiguration : retry finish
2020-06-01 20:05:58.702 ERROR 18524 --- [127.0.0.1:56722] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DLX' in vhost '/', class-id=60, method-id=40)
在触发RetryListener close方法之后,我可以看到侦听器尝试连接到DLX可能发布了错误消息。而且我不希望它这样做并且每次都在日志中观察到此错误消息。
所以我的问题是:
1)在哪里为我的retryTemplate添加RecoveryCalback?大概我可以在RetryListener#close方法中将保存错误的恢复逻辑写入db,但是肯定应该有更合适的方法来做到这一点。
2)如何配置Rabbit-MQ活页夹不向DLQ发送消息,也许我可以重写某些方法?当前,重试用尽(或出现不可重试的错误)之后,侦听器将尝试向DLX发送消息并记录找不到该错误的错误。我不需要将任何消息发送到应用程序范围内的dlq,只需要将其保存到DB。
我正在使用Spring Cloud Stream + Rabbit MQ活页夹。在我的@StreaListener中,我想使用RetryTemplate在特定异常上应用重试逻辑。重试用尽或无法重试后,错误是...