我的应用程序从 AWS SQS 读取消息并处理它们。这是 sqs 适配器的配置:
@Configuration
@Slf4j
public class ApplicationConfig {
@Bean
public MessageChannel targetChannel() {
// return new ExecutorChannel(someTaskExecutor);
return new DirectChannel();
}
@Bean
@Primary
public AmazonSQSAsync amazonSQSAsync() {
return AmazonSQSAsyncClientBuilder.defaultClient();
}
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(20);
executor.setMaxPoolSize(30);
executor.setQueueCapacity(50);
executor.initialize();
return executor;
}
@Bean
public SqsMessageDrivenChannelAdapter sqsMDCA(
AmazonSQSAsync amazonSQSAsync,
MessageChannel targetChannel,
ThreadPoolTaskExecutor taskExecutor) {
SqsMessageDrivenChannelAdapter adapter =
new SqsMessageDrivenChannelAdapter(amazonSQSAsync, "sqs-queue-name");
adapter.setOutputChannel(targetChannel);
adapter.setMaxNumberOfMessages(10);
adapter.setTaskExecutor(taskExecutor);
return adapter;
}
}
在这种情况下 DirectChannel 会成为瓶颈吗?
您已经使用了
TaskExecutor
作为 SqsMessageDrivenChannelAdapter
,因此所有处理都是异步的。因此,当您使用 DirectChannel
下游时,仅当消息处理时间太长时,瓶子才会在那里。但是,您需要考虑这是否真的是您想要的:将工作转移到另一个线程,如果失败,您可能会丢失一条消息。当处理消息的线程将其控制权返回给 SQS 客户端时,该消息将被确认。因此,如果下游发生故障,该消息将会丢失。让持久队列耗尽性能并不总是好的,但随后由于转移到应用程序中的不同线程,您的数据会丢失。
DirectChannel
不能成为瓶颈 - 订阅该瓶颈的处理程序可能会成为瓶颈。但这已经超出了DirectChannel
逻辑。它只是在向该频道发送消息的线程中直接调用您的订阅者。从技术上讲,与您从当前方法调用另一个 Java 方法相同。