我有一个
IntegrationFlow
从 JMS messageDrivenChannelAdapter
接收消息,我想对其进行一些性能测试。我想在测试中创建一个 Jms 源,以便我可以控制它生成的消息的数量和内容。最好的方法是什么?
我的集成流程:
@Bean
public IntegrationFlow artemisIntegrationFlow(
@Qualifier("jmsConnectionFactory") final ConnectionFactory connFactory,
@Qualifier(TaskExecutorConfig.EXECUTOR_MESSAGE_HANDLER) TaskExecutor taskExecutor,
final KpiMongoService kpiMongoService) {
return IntegrationFlow
.from(Jms.messageDrivenChannelAdapter(connFactory)
.id(MESSAGE_CHANNEL_ID)
.destination(JMS_QUEUE_NAME))
.transform(Transformers.fromJson(ArtemisKpisMessageDto.class))
.aggregate(aggregatorSpec -> aggregatorSpec
.correlationStrategy(msg -> GROUP_MESSAGE_STATIC_CORRELATION_KEY)
.releaseStrategy(getReleaseStrategy())
.groupTimeout(GROUP_MESSAGE_BUFFERING_MAX_MILLIS)
.sendPartialResultOnExpiry(true)
.expireGroupsUponCompletion(true))
.handle(handlerToMongo(kpiMongoService))
.nullChannel();
}
目前,当我运行此代码时,最多需要 15/20 秒才能保存大约 1800 Kpi 的组(这些组的平均大小约为每 Kpi 160 字节,
handle()
只是在 MongoDB 中执行 saveAll() 操作)我觉得很慢。
你的问题不清楚。
您绝对可以使用
EmbeddedActiveMQ
代理:https://activemq.apache.org/components/artemis/documentation/latest/embedding-activemq.html
然后在单元测试中创建一个
JmsTemplate
,以在该 JMS_QUEUE_NAME
中生成所需数量的数据。
不过,您可能会在
aggregate()
中发现问题。因为不清楚您的 getReleaseStrategy()
到底是什么,因为您的消息没有以任何方式分组。