"Lock wait timeout exceeded" when trying to implement the outbox pattern with Spring Integration and MySQL


I am trying to implement the outbox pattern using Spring Integration. I have configured the following beans:

@Configuration
public class SpringIntegrationTestApplicationConfiguration {
    private static final Logger LOGGER = LoggerFactory.getLogger(SpringIntegrationTestApplicationConfiguration.class);

    public static final String CONCURRENT_METADATA_STORE_PREFIX = "_spring_integration_";

    @MessagingGateway
    public interface EmailGateway {
        @Gateway(requestChannel = "mailbox")
        void sendEmail(String mailBody,
                       @Header String target);
    }

    @Bean
    public JdbcChannelMessageStore messageStore(DataSource dataSource) {
        JdbcChannelMessageStore jdbcChannelMessageStore = new JdbcChannelMessageStore(dataSource);
        jdbcChannelMessageStore.setTablePrefix(CONCURRENT_METADATA_STORE_PREFIX);
        jdbcChannelMessageStore.setChannelMessageStoreQueryProvider(
                new MySqlChannelMessageStoreQueryProvider());
        return jdbcChannelMessageStore;
    }

    @Bean
    ConcurrentMetadataStore concurrentMetadataStore(DataSource dataSource) {
        JdbcMetadataStore jdbcMetadataStore = new JdbcMetadataStore(dataSource);
        jdbcMetadataStore.setTablePrefix(CONCURRENT_METADATA_STORE_PREFIX);
        return jdbcMetadataStore;
    }


    @Bean
    MessageHandler sendEmailMessageHandler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                String target = (String) message.getHeaders().get("target");
                LOGGER.info("not sending email with body: {} {}", message, target);
                throw new RuntimeException("");
            }
        };
    }

    @Bean
    QueueChannel mailboxChannel(JdbcChannelMessageStore jdbcChannelMessageStore) {
        return MessageChannels.queue(jdbcChannelMessageStore, "mailbox").getObject();
    }

    @Bean
    public IntegrationFlow buildFlow(ChannelMessageStore channelMessageStore,
                                     MessageHandler sendEmailMessageHandler) {
        return IntegrationFlow.from("mailbox")
                              .routeToRecipients(routes -> {
                                  routes
                                          .transactional()
                                          .recipientFlow(flow -> flow
                                                  .channel(channels -> channels.queue(channelMessageStore, "outbox"))
                                                  .handle(sendEmailMessageHandler, e -> e.poller(poller -> poller.fixedDelay(1000).transactional()))
                                          );
                              }).get();
    }

}

I also have an ApplicationRunner to test-drive this:

@Component
public class Runner implements ApplicationRunner {

    private static final Logger LOGGER = LoggerFactory.getLogger(Runner.class);

    private final SpringIntegrationTestApplicationConfiguration.EmailGateway emailGateway;

    public Runner(SpringIntegrationTestApplicationConfiguration.EmailGateway emailGateway) {
        this.emailGateway = emailGateway;
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        LOGGER.info("Sending 1");
        emailGateway.sendEmail("This is my body", "target");
        LOGGER.info("Sending 2");
        emailGateway.sendEmail("This is my body2", "target2");
    }
}

I have set up MySQL using docker-compose:

version: "3.9"
services:
  db:
    image: mysql:8-oracle
    environment:
      MYSQL_ROOT_PASSWORD: 'root'
      MYSQL_ALLOW_EMPTY_PASSWORD: 1
      MYSQL_ROOT_HOST: "%"
      MYSQL_DATABASE: 'sidb'
    ports:
      - "3306:3306"
    healthcheck:
      test: [ "CMD", "mysqladmin" ,"ping", "-h", "localhost" ]
      timeout: 10s
      interval: 5s
      retries: 10

and configured the application to use that database via application.properties:

spring.datasource.url=jdbc:mysql://localhost:3306/sidb
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

The necessary tables are created through Flyway (see https://github.com/wimdeblauwe/spring-integration-test/blob/main/src/main/resources/db/migration/V1__init.sql).

The full source code is available at https://github.com/wimdeblauwe/spring-integration-test/tree/main

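The goal is to simulate sending 2 emails at startup, with the handling of the email sending failing. I expect Spring Integration to keep retrying the send every second, and the database to contain entries for those emails. That way, if I stop the program and start it again, the retries simply continue.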
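
To make the expected behavior concrete, here is a minimal in-memory sketch of one transactional poll cycle: a message is taken from the outbox, handed to the handler, and put back if the handler throws. The class `OutboxPollSketch` and its methods are hypothetical names used only for illustration. This models the idea and is not how Spring Integration or `JdbcChannelMessageStore` is actually implemented.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

/** Hypothetical in-memory stand-in for the JDBC-backed "outbox" queue. */
final class OutboxPollSketch {

    private final Deque<String> outbox = new ArrayDeque<>();

    /** Stores a message, as the gateway call would do via the message store. */
    void enqueue(String message) {
        outbox.addLast(message);
    }

    /** Number of messages still waiting to be delivered. */
    int size() {
        return outbox.size();
    }

    /**
     * Runs one poll cycle. The head message is removed and passed to the handler.
     * If the handler throws, the removal is undone (the "rollback"), so the same
     * message is seen again on the next poll.
     *
     * @return true if a message was delivered, false if the queue was empty
     *         or the handler failed
     */
    boolean pollOnce(Consumer<String> handler) {
        String message = outbox.pollFirst();
        if (message == null) {
            return false; // nothing to do in this cycle
        }
        try {
            handler.accept(message);
            return true; // "commit": the message stays removed
        } catch (RuntimeException e) {
            outbox.addFirst(message); // "rollback": restore the message at the head
            return false;
        }
    }
}
```

Calling `pollOnce` with a handler that always throws leaves every message in the queue, which mirrors the retry behavior expected from the real flow.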

When I run this now, about 90% of the time I get this exception:

org.springframework.dao.CannotAcquireLockException: PreparedStatementCallback; SQL [INSERT into _spring_integration_CHANNEL_MESSAGE(
    MESSAGE_ID,
    GROUP_KEY,
    REGION,
    CREATED_DATE,
    MESSAGE_PRIORITY,
    MESSAGE_BYTES)
values (?, ?, ?, ?, ?, ?)
]; Lock wait timeout exceeded; try restarting transaction
    at org.springframework.jdbc.support.SQLExceptionSubclassTranslator.doTranslate(SQLExceptionSubclassTranslator.java:78) ~[spring-jdbc-6.1.1.jar:6.1.1]
    at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:107) ~[spring-jdbc-6.1.1.jar:6.1.1]

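One thing I noticed is that if I change the poller from:

poller.fixedDelay(1000).transactional()

to:

poller.fixedDelay(1000,1000).transactional()

then the problem no longer seems to occur, probably because the "emails" have already been inserted into the database by the time the poller starts. But this is probably only a workaround rather than a real fix, since I guess this locking problem could show up at any time in a real application?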

java mysql spring-boot spring-integration
1 answer

The fix for your project is as follows:

.handle(sendEmailMessageHandler, e ->
        e.poller(poller ->
                poller.fixedDelay(1000)
                      .transactional(
                              new TransactionInterceptorBuilder()
                                      .isolation(Isolation.READ_COMMITTED)
                                      .build())))

That is, if you cannot somehow set transaction-isolation = READ-COMMITTED for your database itself.
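
If neither the poller nor the server configuration can be changed, one further option may be to request READ COMMITTED at the JDBC level. The following is a rough sketch under that assumption: `ReadCommittedDataSources.readCommitted` is a hypothetical helper (not part of Spring or the MySQL driver) that wraps any `DataSource` in a dynamic proxy and calls the standard `Connection.setTransactionIsolation` on each connection it hands out. Note that this affects every user of the wrapped `DataSource`, not only the poller.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import javax.sql.DataSource;

/** Hypothetical helper, shown for illustration only. */
final class ReadCommittedDataSources {

    private ReadCommittedDataSources() {
    }

    /**
     * Returns a DataSource proxy that delegates every call to the target and
     * switches each connection it returns to READ COMMITTED.
     */
    static DataSource readCommitted(DataSource target) {
        return (DataSource) Proxy.newProxyInstance(
                ReadCommittedDataSources.class.getClassLoader(),
                new Class<?>[] { DataSource.class },
                (proxy, method, args) -> {
                    Object result;
                    try {
                        result = method.invoke(target, args);
                    } catch (InvocationTargetException e) {
                        // Rethrow the original exception (e.g. SQLException) unchanged.
                        throw e.getCause();
                    }
                    // Both getConnection() overloads return a Connection.
                    if ("getConnection".equals(method.getName()) && result instanceof Connection) {
                        ((Connection) result)
                                .setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
                    }
                    return result;
                });
    }
}
```

The per-poller `TransactionInterceptorBuilder` setting shown above is narrower in scope and is usually the less surprising choice when only the outbox flow needs the lower isolation level.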
