I am trying to implement the outbox pattern with Spring Integration. I have configured the following beans:
@Configuration
public class SpringIntegrationTestApplicationConfiguration {

    private static final Logger LOGGER = LoggerFactory.getLogger(SpringIntegrationTestApplicationConfiguration.class);
    public static final String CONCURRENT_METADATA_STORE_PREFIX = "_spring_integration_";

    @MessagingGateway
    public interface EmailGateway {
        @Gateway(requestChannel = "mailbox")
        void sendEmail(String mailBody,
                       @Header String target);
    }

    @Bean
    public JdbcChannelMessageStore messageStore(DataSource dataSource) {
        JdbcChannelMessageStore jdbcChannelMessageStore = new JdbcChannelMessageStore(dataSource);
        jdbcChannelMessageStore.setTablePrefix(CONCURRENT_METADATA_STORE_PREFIX);
        jdbcChannelMessageStore.setChannelMessageStoreQueryProvider(
                new MySqlChannelMessageStoreQueryProvider());
        return jdbcChannelMessageStore;
    }

    @Bean
    ConcurrentMetadataStore concurrentMetadataStore(DataSource dataSource) {
        JdbcMetadataStore jdbcMetadataStore = new JdbcMetadataStore(dataSource);
        jdbcMetadataStore.setTablePrefix(CONCURRENT_METADATA_STORE_PREFIX);
        return jdbcMetadataStore;
    }

    @Bean
    MessageHandler sendEmailMessageHandler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                String target = (String) message.getHeaders().get("target");
                LOGGER.info("not sending email with body: {} {}", message, target);
                // Always fail, to simulate an email that cannot be sent
                throw new RuntimeException("");
            }
        };
    }

    @Bean
    QueueChannel mailboxChannel(JdbcChannelMessageStore jdbcChannelMessageStore) {
        return MessageChannels.queue(jdbcChannelMessageStore, "mailbox").getObject();
    }

    @Bean
    public IntegrationFlow buildFlow(ChannelMessageStore channelMessageStore,
                                     MessageHandler sendEmailMessageHandler) {
        return IntegrationFlow.from("mailbox")
                .routeToRecipients(routes -> {
                    routes
                            .transactional()
                            .recipientFlow(flow -> flow
                                    .channel(channels -> channels.queue(channelMessageStore, "outbox"))
                                    .handle(sendEmailMessageHandler, e -> e.poller(poller -> poller.fixedDelay(1000).transactional()))
                            );
                }).get();
    }
}
I also have an ApplicationRunner to drive the test:
@Component
public class Runner implements ApplicationRunner {

    private static final Logger LOGGER = LoggerFactory.getLogger(Runner.class);
    private final SpringIntegrationTestApplicationConfiguration.EmailGateway emailGateway;

    public Runner(SpringIntegrationTestApplicationConfiguration.EmailGateway emailGateway) {
        this.emailGateway = emailGateway;
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        LOGGER.info("Sending 1");
        emailGateway.sendEmail("This is my body", "target");
        LOGGER.info("Sending 2");
        emailGateway.sendEmail("This is my body2", "target2");
    }
}
I have set up MySQL using docker-compose:
version: "3.9"
services:
  db:
    image: mysql:8-oracle
    environment:
      MYSQL_ROOT_PASSWORD: 'root'
      MYSQL_ALLOW_EMPTY_PASSWORD: 1
      MYSQL_ROOT_HOST: "%"
      MYSQL_DATABASE: 'sidb'
    ports:
      - "3306:3306"
    healthcheck:
      test: [ "CMD", "mysqladmin", "ping", "-h", "localhost" ]
      timeout: 10s
      interval: 5s
      retries: 10
and configured the application to use that database via application.properties:
spring.datasource.url=jdbc:mysql://localhost:3306/sidb
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
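The necessary tables are created by Flyway (see https://github.com/wimdeblauwe/spring-integration-test/blob/main/src/main/resources/db/migration/V1__init.sql).
The full source code is available at https://github.com/wimdeblauwe/spring-integration-test/tree/main
The goal is to simulate sending 2 emails at startup, where the handler that sends each email fails. I expect Spring Integration to retry sending the emails every second, and the database to contain entries for those emails. So if I stop the program and start it again, the retries should simply continue.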
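To make that expectation concrete, here is a minimal plain-Java model of what I mean by "transactional polling". It is only a sketch and not Spring Integration code: the class TransactionalPollSketch and its methods are hypothetical names, and an in-memory deque stands in for the CHANNEL_MESSAGE table.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

/** Hypothetical plain-Java model of a transactional poller; not Spring Integration API. */
final class TransactionalPollSketch {

    // Stands in for the persistent message table backing the "outbox" queue.
    private final Deque<String> store = new ArrayDeque<>();

    /** Adds a message to the tail of the store (what the gateway does on send). */
    void send(String message) {
        store.addLast(message);
    }

    /** Number of messages still waiting to be delivered. */
    int size() {
        return store.size();
    }

    /**
     * One poll cycle: take the oldest message and pass it to the handler.
     * If the handler throws, the simulated transaction rolls back and the
     * message returns to the head of the store, so the next cycle retries it.
     * Returns true only when a message was handled successfully.
     */
    boolean pollOnce(Consumer<String> handler) {
        String message = store.pollFirst();
        if (message == null) {
            return false; // nothing to deliver
        }
        try {
            handler.accept(message);
            return true; // "commit": the message is gone for good
        } catch (RuntimeException e) {
            store.addFirst(message); // "rollback": keep the message for the next attempt
            return false;
        }
    }
}
```

With a handler that always throws, calling pollOnce repeatedly (once per second in the real flow) leaves both messages in the store, which is the behavior I would like to see reflected in the database.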
When I run this now, about 90% of the time I get this exception:
org.springframework.dao.CannotAcquireLockException: PreparedStatementCallback; SQL [INSERT into _spring_integration_CHANNEL_MESSAGE(
MESSAGE_ID,
GROUP_KEY,
REGION,
CREATED_DATE,
MESSAGE_PRIORITY,
MESSAGE_BYTES)
values (?, ?, ?, ?, ?, ?)
]; Lock wait timeout exceeded; try restarting transaction
at org.springframework.jdbc.support.SQLExceptionSubclassTranslator.doTranslate(SQLExceptionSubclassTranslator.java:78) ~[spring-jdbc-6.1.1.jar:6.1.1]
at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:107) ~[spring-jdbc-6.1.1.jar:6.1.1]
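One thing I noticed is that if I change the poller from
poller.fixedDelay(1000).transactional()
to
poller.fixedDelay(1000,1000).transactional()
the problem no longer seems to occur, probably because the "emails" have already been inserted into the database by the time the poller starts. But this is likely only a workaround rather than a real fix, since I assume this locking problem could show up at any time in a real application?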
The fix for your project looks like this:
.handle(sendEmailMessageHandler, e ->
        e.poller(poller ->
                poller.fixedDelay(1000)
                        .transactional(
                                new TransactionInterceptorBuilder()
                                        .isolation(Isolation.READ_COMMITTED)
                                        .build())))
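Use this if you cannot, for some reason, set transaction-isolation = READ-COMMITTED for your database as a whole. MySQL's InnoDB defaults to REPEATABLE READ, under which locking reads and deletes take gap locks that can block concurrent inserts; READ COMMITTED largely disables gap locking, which is most likely why the lock wait timeout goes away.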
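For comparison, the same isolation level can also be requested per connection through plain JDBC. The sketch below is only an illustration under assumptions: ReadCommittedSketch, useReadCommitted and fakeConnection are hypothetical names, and the fake connection is an in-memory proxy so the example runs without a database. Only Connection.setTransactionIsolation and the Connection.TRANSACTION_* constants are real JDBC API.

```java
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;

/** Hypothetical helper; not part of Spring Integration or the MySQL driver. */
final class ReadCommittedSketch {

    /** Switches a JDBC connection to READ COMMITTED unless it already uses that level. */
    static Connection useReadCommitted(Connection connection) throws SQLException {
        if (connection.getTransactionIsolation() != Connection.TRANSACTION_READ_COMMITTED) {
            connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
        }
        return connection;
    }

    /** In-memory stand-in for a real connection; it only tracks the isolation level. */
    static Connection fakeConnection(int initialIsolation) {
        int[] level = {initialIsolation};
        return (Connection) Proxy.newProxyInstance(
                ReadCommittedSketch.class.getClassLoader(),
                new Class<?>[] {Connection.class},
                (proxy, method, args) -> {
                    switch (method.getName()) {
                        case "getTransactionIsolation":
                            return level[0];
                        case "setTransactionIsolation":
                            level[0] = (Integer) args[0];
                            return null;
                        default:
                            // Anything else is outside the scope of this sketch.
                            throw new UnsupportedOperationException(method.getName());
                    }
                });
    }
}
```

In a real application the connection would come from the configured DataSource rather than from fakeConnection; setting the isolation on the poller's transaction, as in the fix above, keeps the change local to the flow that needs it.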