My application creates multiple threads to read messages from SQS:
new Thread(() -> {
while (true) {
readMessages();
}
});
The readMessages method looks like this:
public void readMessages() {
........
Object messages = new ArrayList();
try {
slingshotMessage = (new ReceiveMessageRequest()).withQueueUrl(this.queueUrl)
.withWaitTimeSeconds(this.subProps.getWaitTimeSeconds())
.withVisibilityTimeout(this.subProps.getVisibilityTimeoutSeconds())
.withMaxNumberOfMessages(this.subProps.getMaxNumberMessages());
messages = this.sqs.receiveMessage(slingshotMessage).getMessages();
} catch (Exception var6) {
log.error("An error occurred while reading messages for subscriber: '" + this.subProps.getSubscriberName() + "' queueUrl: '" + this.queueUrl + "'", var6);
}
}
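I see an exception being thrown at this.sqs.receiveMessage inside readMessages(). I catch the exception there, but once the error is thrown the logs pile up with the same exception repeated over and over: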
java.lang.IllegalStateException: Connection pool shut down
at org.apache.http.util.Asserts.check(Asserts.java:34) ~[httpcore-4.4.12.jar:4.4.12]
at org.apache.http.pool.AbstractConnPool.lease(AbstractConnPool.java:196) ~[httpcore-4.4.12.jar:4.4.12]
at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.requestConnection(PoolingHttpClientConnectionManager.java:268) ~[httpclient-4.5.10.jar:4.5.10]
at jdk.internal.reflect.GeneratedMethodAccessor14.invoke(Unknown Source) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
at com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76) ~[aws-java-sdk-core-1.11.408.jar:na]
at com.amazonaws.http.conn.$Proxy37.requestConnection(Unknown Source) ~[na:na]
at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:176) ~[httpclient-4.5.10.jar:4.5.10]
at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186) ~[httpclient-4.5.10.jar:4.5.10]
at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) ~[httpclient-4.5.10.jar:4.5.10]
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) ~[httpclient-4.5.10.jar:4.5.10]
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56) ~[httpclient-4.5.10.jar:4.5.10]
at com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72) ~[aws-java-sdk-core-1.11.408.jar:na]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1256) ~[aws-java-sdk-core-1.11.408.jar:na]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1072) ~[aws-java-sdk-core-1.11.408.jar:na]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:745) ~[aws-java-sdk-core-1.11.408.jar:na]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:719) ~[aws-java-sdk-core-1.11.408.jar:na]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:701) ~[aws-java-sdk-core-1.11.408.jar:na]
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:669) ~[aws-java-sdk-core-1.11.408.jar:na]
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:651) ~[aws-java-sdk-core-1.11.408.jar:na]
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:515) ~[aws-java-sdk-core-1.11.408.jar:na]
at com.amazonaws.services.sqs.AmazonSQSClient.doInvoke(AmazonSQSClient.java:2147) ~[aws-java-sdk-sqs-1.11.415.jar:na]
at com.amazonaws.services.sqs.AmazonSQSClient.invoke(AmazonSQSClient.java:2116) ~[aws-java-sdk-sqs-1.11.415.jar:na]
at com.amazonaws.services.sqs.AmazonSQSClient.invoke(AmazonSQSClient.java:2105) ~[aws-java-sdk-sqs-1.11.415.jar:na]
at com.amazonaws.services.sqs.AmazonSQSClient.executeReceiveMessage(AmazonSQSClient.java:1559) ~[aws-java-sdk-sqs-1.11.415.jar:na]
at com.amazonaws.services.sqs.AmazonSQSClient.receiveMessage(AmazonSQSClient.java:1530) ~[aws-java-sdk-sqs-1.11.415.jar:na]
at com.lmig.global.reuse.slingshot.subscriber.aws.AwsSubscriber.readMessages(AwsSubscriber.java:63) ~[slingshot-1.3.37-B3-RELEASE.jar:1.3.37-B3-RELEASE]
at com.lmig.global.reuse.slingshot.subscriber.Subscriber.lambda$newSubThread$0(Subscriber.java:61) ~[slingshot-1.3.37-B3-RELEASE.jar:1.3.37-B3-RELEASE]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
So my question is
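First, don't create a new thread for each message; that is a bad idea. Create a single thread that continuously polls SQS, then hand the retrieved data to another thread for further processing, using a work-stealing thread pool or some other pool.
Here is some pseudocode: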
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public final class AsyncThreadUtils {
    private static final ExecutorService WORKER_SERVICE = Executors.newWorkStealingPool();

    private AsyncThreadUtils() {
    }

    public static void addTask(Runnable task) {
        WORKER_SERVICE.submit(task);
    }

    public static void stopAllThreads() throws InterruptedException {
        // Stop accepting new tasks, give running ones a moment to finish,
        // then cancel whatever is left. The 5-second timeout is an arbitrary example value.
        WORKER_SERVICE.shutdown();
        if (!WORKER_SERVICE.awaitTermination(5, TimeUnit.SECONDS)) {
            WORKER_SERVICE.shutdownNow();
        }
    }
}
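class DataConsumer implements Runnable {
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            // Get the messages from SQS.
            // If data was received, hand it to another thread for processing.
            AsyncThreadUtils.addTask(() -> {
                // Processing goes here.
            });
            try {
                Thread.sleep(400); // pause briefly between polls, e.g. 400 ms
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag so the loop exits
            }
        }
    }
}

// Start the consumer thread when your program boots.
Thread sqsDataConsumer = new Thread(new DataConsumer());
sqsDataConsumer.start();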
Call the thread pool's shutdown method before the Java process exits.
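To make the shutdown order concrete, here is a minimal sketch of a poller that can be stopped before the pool (and, by extension, the SQS client) is shut down. It assumes, without confirmation from the question, that the repeated "Connection pool shut down" error comes from the loop still polling after the client was closed. The class name StoppablePoller, the Supplier standing in for the receiveMessage call, the Consumer standing in for message processing, and all timeout values are hypothetical choices for illustration, not part of the AWS SDK.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;

public final class StoppablePoller {
    // Hypothetical stand-in for the SQS receive call; returns one batch per invocation.
    private final Supplier<List<String>> source;
    // Hypothetical per-message processing step.
    private final Consumer<String> handler;
    private final ExecutorService workers = Executors.newWorkStealingPool();
    private final AtomicBoolean running = new AtomicBoolean(false);
    private Thread pollThread;

    public StoppablePoller(Supplier<List<String>> source, Consumer<String> handler) {
        this.source = source;
        this.handler = handler;
    }

    public synchronized void start() {
        if (!running.compareAndSet(false, true)) {
            return; // already started
        }
        pollThread = new Thread(this::pollLoop, "sqs-poller");
        pollThread.start();
    }

    private void pollLoop() {
        while (running.get() && !Thread.currentThread().isInterrupted()) {
            try {
                List<String> batch = source.get();
                for (String message : batch) {
                    workers.submit(() -> handler.accept(message));
                }
                if (batch.isEmpty()) {
                    pause(100); // avoid spinning when nothing arrived
                }
            } catch (RuntimeException e) {
                // Back off instead of retrying immediately, so a persistent
                // failure does not flood the log with the same stack trace.
                pause(1000);
            }
        }
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // let the loop condition see the interrupt
        }
    }

    public synchronized void stop() throws InterruptedException {
        running.set(false);
        if (pollThread != null) {
            pollThread.interrupt();
            pollThread.join(5000); // wait for the poll loop to exit first
        }
        workers.shutdown(); // then drain the worker pool
        if (!workers.awaitTermination(5, TimeUnit.SECONDS)) {
            workers.shutdownNow();
        }
    }
}
```

One way to wire this in would be to call stop() from a hook registered with Runtime.getRuntime().addShutdownHook(...), and to shut down the SQS client only after stop() has returned, so the poll loop never issues a request against a closed client.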