我在使用以下技术时遇到了问题。
显然,还有一连串其他事情。我可以根据要求提供 POM,但我认为这没有帮助。我的 postgres 驱动程序是 UTD,我的所有版本也是如此。
核心问题是,当高速运行并发操作时,当我尝试将项目保存到数据库时,我遇到了 HikariCp 的 IO 问题。我不完全确定该怎么做。
在我的微服务日志中:
2024-09-19T18:50:51.617-04:00 WARN 10060 --- [virtual-ingest] [ virtual-342] com.zaxxer.hikari.pool.ProxyConnection : HikariPool-1 - Connection org.postgresql.jdbc.PgConnection@28b26c3f marked as broken because of SQLSTATE(08006), ErrorCode(0)
org.postgresql.util.PSQLException: An I/O error occurred while sending to the backend.
**在 PG 的 Docker 日志中(不断重复): **
2024-09-19 22:50:51.807 UTC [412] LOG: unexpected EOF on client connection with an open transaction
我的代码(我还没有回去清理它):
@EventListener(ApplicationReadyEvent.class)
public void startStructures()
{
try (var outerScope = new StructuredTaskScope.ShutdownOnFailure())
{
sqsConfig.getQueues().forEach(queue -> outerScope.fork(() -> {
processQueueMessages(queue);
return null;
}));
outerScope.join();
}
catch (InterruptedException e)
{
Thread.currentThread().interrupt();
log.error("Processing interrupted", e);
}
}
private void processQueueMessages(String queue)
{
while (true)
{
try (var innerScope = new StructuredTaskScope.ShutdownOnFailure())
{
List<Message> messages = retrieveMessages(queue);
if (messages.isEmpty())
{
log.info("No messages in queue: {}, sleeping...", queue);
Thread.sleep(5000);
}
else
{
for (Message message : messages)
{
innerScope.fork(() -> {
processMessage(queue, message);
return null;
});
}
innerScope.join();
}
}
catch (InterruptedException e)
{
Thread.currentThread().interrupt();
log.error("Queue processing interrupted", e);
break;
}
}
}
public List<Message> retrieveMessages(String queue)
{
var url = awsConfig.getBaseUrl() + queue;
ReceiveMessageRequest request = ReceiveMessageRequest.builder()
.queueUrl(url)
.maxNumberOfMessages(10)
.waitTimeSeconds(
10)
.build();
ReceiveMessageResponse response = sqsClient.receiveMessage(request);
return response.messages();
}
private void processMessage(String queue, Message message) throws JsonProcessingException
{
log.info("Processing message from queue {}: {}", queue, message.messageId());
JsonNode node = nodeBuilderService.buildNode(message);
distributionService.handleSqsNotification(node);
deleteMessage(queue, message);
}
private void deleteMessage(String queue, Message message)
{
var url = awsConfig.getBaseUrl() + queue;
sqsClient.deleteMessage(builder -> builder.queueUrl(url).receiptHandle(message.receiptHandle()));
int remainingMessages = getMessageCount(queue);
log.info(
"Deleted message: {}. Approximate {} messages remaining in the queue.", message.messageId(),
remainingMessages
);
}
public int getMessageCount(String queueName)
{
var url = awsConfig.getBaseUrl() + queueName;
GetQueueAttributesRequest request = GetQueueAttributesRequest.builder()
.queueUrl(url)
.attributeNames(
QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES)
.build();
return Integer.parseInt(
sqsClient.getQueueAttributes(request)
.attributes()
.get(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES)
);
}
在处理消息的过程中,我的代码最终命中了这个方法:
@Transactional
public void process(String filename, InputStream stream)
throws IOException
{
byte[] bytes = getBytesFromInputStream(stream, filename);
List<SurfaceObservation> observations = decoder.beginSynopticDecoders(bytes, filename);
repository.saveAllAndFlush(observations);
}
这就是错误发生的地方。
我曾尝试将其强制到平台线程上,但我相信这与 HikariCP 缺乏对虚拟线程的支持有关,即使在当前版本中也是如此。我在我的瞄准镜上观看了 JEP 咖啡馆,我想知道是否有人可以帮助我回答几个问题。
我尝试了平台线程,我尝试了仅用于数据库事务的范围。我花了几个小时尝试了很多东西,现在我什至不确定我尝试了什么。我什至尝试让 chatGPT 帮助我,正如预期的那样,它不能帮助我。
我将其标记为已回答,因为在实现 Redis 缓存后我不再遇到该问题,但我确实相信 Hikari 在处理虚拟线程利用、高并发应用程序方面存在问题,并将观察是否有更好的解决方案出现。