哦不! IO!虚拟线程、HikariCP 和 Spring Boot JDBC (Postgres) 的并发问题

问题描述 投票:0回答:1

我在使用以下技术时遇到了问题。

  • PostgreSQL 通过 Docker 容器运行
  • 织布机项目
  • Spring Boot 3.3.4
  • Java 21
  • Spring Boot Starter JDBC(其中包括 HikariCp - 我认为他是问题的根源)
  • Spring Boot 入门 JPA

显然,还有一连串其他事情。我可以根据要求提供 POM,但我认为这没有帮助。我的 postgres 驱动程序是 UTD,我的所有版本也是如此。

核心问题是,当高速运行并发操作时,当我尝试将项目保存到数据库时,我遇到了 HikariCp 的 IO 问题。我不完全确定该怎么做。

在我的微服务日志中:

2024-09-19T18:50:51.617-04:00  WARN 10060 --- [virtual-ingest] [    virtual-342] com.zaxxer.hikari.pool.ProxyConnection   : HikariPool-1 - Connection org.postgresql.jdbc.PgConnection@28b26c3f marked as broken because of SQLSTATE(08006), ErrorCode(0)

org.postgresql.util.PSQLException: An I/O error occurred while sending to the backend.

**在 PG 的 Docker 日志中(不断重复): **

2024-09-19 22:50:51.807 UTC [412] LOG:  unexpected EOF on client connection with an open transaction

我的代码(我还没有回去清理它):

@EventListener(ApplicationReadyEvent.class)
    public void startStructures()
    {
        try (var outerScope = new StructuredTaskScope.ShutdownOnFailure())
        {
            sqsConfig.getQueues().forEach(queue -> outerScope.fork(() -> {
                processQueueMessages(queue);
                return null;
            }));
            outerScope.join();
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            log.error("Processing interrupted", e);
        }
    }

    private void processQueueMessages(String queue)
    {
        while (true)
        {
            try (var innerScope = new StructuredTaskScope.ShutdownOnFailure())
            {
                List<Message> messages = retrieveMessages(queue);
                if (messages.isEmpty())
                {
                    log.info("No messages in queue: {}, sleeping...", queue);
                    Thread.sleep(5000);
                }
                else
                {
                    for (Message message : messages)
                    {
                        innerScope.fork(() -> {
                            processMessage(queue, message);
                            return null;
                        });
                    }
                    innerScope.join();
                }
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
                log.error("Queue processing interrupted", e);
                break;
            }
        }
    }

    public List<Message> retrieveMessages(String queue)
    {
        var url = awsConfig.getBaseUrl() + queue;
        ReceiveMessageRequest request = ReceiveMessageRequest.builder()
                                                             .queueUrl(url)
                                                             .maxNumberOfMessages(10)
                                                             .waitTimeSeconds(
                                                                 10)
                                                             .build();

        ReceiveMessageResponse response = sqsClient.receiveMessage(request);
        return response.messages();
    }

    private void processMessage(String queue, Message message) throws JsonProcessingException
    {

        log.info("Processing message from queue {}: {}", queue, message.messageId());

        JsonNode node = nodeBuilderService.buildNode(message);
        distributionService.handleSqsNotification(node);
        deleteMessage(queue, message);
    }

    private void deleteMessage(String queue, Message message)
    {
        var url = awsConfig.getBaseUrl() + queue;
        sqsClient.deleteMessage(builder -> builder.queueUrl(url).receiptHandle(message.receiptHandle()));

        int remainingMessages = getMessageCount(queue);
        log.info(
            "Deleted message: {}. Approximate {} messages remaining in the queue.", message.messageId(),
            remainingMessages
        );
    }

    public int getMessageCount(String queueName)
    {
        var url = awsConfig.getBaseUrl() + queueName;

        GetQueueAttributesRequest request = GetQueueAttributesRequest.builder()
                                                                     .queueUrl(url)
                                                                     .attributeNames(
                                                                         QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES)
                                                                     .build();

        return Integer.parseInt(
            sqsClient.getQueueAttributes(request)
                     .attributes()
                     .get(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES)
        );
    }

在处理消息的过程中,我的代码最终命中了这个方法:

@Transactional
    public void process(String filename, InputStream stream)
    throws IOException
    {
        byte[] bytes = getBytesFromInputStream(stream, filename);

        List<SurfaceObservation> observations = decoder.beginSynopticDecoders(bytes, filename);
        repository.saveAllAndFlush(observations);
    }

这就是错误发生的地方。

我曾尝试将其强制到平台线程上,但我相信这与 HikariCP 缺乏对虚拟线程的支持有关,即使在当前版本中也是如此。我在我的瞄准镜上观看了 JEP 咖啡馆,我想知道是否有人可以帮助我回答几个问题。

  1. 我做错了什么吗?我正在从异步模式迁移,并且不想再使用它们,所以我很可能不知道我应该做什么。
  2. 我想知道 ExtentLocal 是否可以提供帮助。我认为 Hikari 中的问题可能与他们使用 ThreadLocal 有关。但同样,这也是新东西。
  3. 我必须放弃 Spring jdbc 吗?我会用什么来代替?截至(我很确定)写这篇文章时,R2DBC 仍然没有一个好的 ORM 系统,即使使用 Hibernate 反应式也是如此。

我尝试了平台线程,我尝试了仅用于数据库事务的范围。我花了几个小时尝试了很多东西,现在我什至不确定我尝试了什么。我什至尝试让 chatGPT 帮助我,正如预期的那样,它不能帮助我。

java spring-boot jdbc hikaricp virtual-threads
1个回答
0
投票

我将其标记为已回答,因为在实现 Redis 缓存后我不再遇到该问题,但我确实相信 Hikari 在处理虚拟线程利用、高并发应用程序方面存在问题,并将观察是否有更好的解决方案出现。

© www.soinside.com 2019 - 2024. All rights reserved.