是否可以使用带有 quarkus 和 smallrye-reactive-messaging
@Incoming("queue")
注释的相同方法并行处理多个 amqp - 消息?
更准确地说,我有以下课程:
@ApplicationScoped
public class Receiver {
@Incoming("test-queue")
public void process(String input) {
System.out.println("start processing:" + input);
try {
Thread.sleep(10_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end processing:" + input);
}
}
在application.properties中配置:
amqp-host: localhost
amqp-port: 5672
amqp-username: quarkus
amqp-password: quarkus
mp.messaging.incoming.test-queue.connector: smallrye-amqp
mp.messaging.incoming.test-queue.address: test-queue
现在我想通过配置定义有多少消息的并行处理是可能的。例如,在一个 4 核 cpu 上它应该并行运行 4 个。
目前我只能添加 4 个具有不同名称的方法副本以允许这种并行性,但这是不可配置的。
我不确定,但我认为 Reactive Messaging 不支持您的要求。
然而,你可以用另一种方式做你想做的事。我认为这也是使用消息传递的更好的整体模式。
查找具有 CompletionStage 和显式 ack() 的示例。该变体是异步的,因此如果您将它与 Java 现有的并发设施结合起来,您将获得高效的并行处理。
我会将传入的工作发送给执行者,然后在它完成时让执行任务 ack()。
我刚遇到同样的场景,这里是规范打算如何处理并发: 来自 eclipse Microprofile 规范
基本上,而不是有一个像这样的方法的类:
@Incoming("test-queue")
public void process(String input) {}
你有 2 个这样的类:
@ApplicationScoped
public class MessageSubscriberProducer {
@Incoming("test-queue")
public Subscriber<String> createSubscriber() {
return new SubscriberImpl();
}
}
public class SubsciberImpl implements Subscriber<String> {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
this.subscription.request(4); // this tells how many messages to grab right away
}
@Override
public void onNext(String val) {
// do processing
this.subscription.request(1); // grab 1 more
}
}
这具有将处理代码从 vert.x 事件循环线程移动到工作线程池的额外优势。
这个问题有点老,但我会回答以帮助有同样问题的人。
我最近在使用 Quarkus 和反应式编程进行一些测试时遇到了同样的问题。 ChatGPT 帮不了我,所以我必须更好地理解 Vertx,我确实创建了这两个存储库https://github.com/eduardo-villasboas/vertx-pochttps://github.com/eduardo-villasboas/ quarkus-poc.
我正在使用 kotlin,所以我使用了 Vertx 与协程的集成。在 Quarkus 示例 https://github.com/eduardo-villasboas/quarkus-poc 中,您可以看到 Quarkus 快速入门示例 (https://quarkus.io/guides/rabbitmq) 的改编版。此实现将访问 Vertx 协程上下文以使用挂起函数。这并不完全符合您的要求,即运行多个连接但从并发性的角度来看具有相同或更好的效果。这是解决方案的示例。请注意,此方法不存在 ack 问题。我希望能有所帮助,我将在此处使用流而不是协同程序放置另一个示例。
@ApplicationScoped 类 QuoteConsumer(私有 val vertx:Vertx){
private val logger = LoggerFactory.getLogger(this.javaClass)
private val random = Random()
@Channel("quotes")
lateinit var quoteResultEmitter: Emitter<Quote>
@Incoming("requests")
@Blocking
@Throws(
InterruptedException::class
)
fun process(quoteRequest: String) {
logger.info("Starting process [thread: ${threadIdentification()}, message: ${quoteRequest}]")
CoroutineScope(vertx.dispatcher()).launch {
//delay(3000)
logger.info("Heavy process start [thread: ${threadIdentification()}, message: ${quoteRequest}]")
delay(5000)
logger.info("Heavy process finish [thread: ${threadIdentification()}, message: ${quoteRequest}]")
val value = random.nextInt(100)
logger.info("Finishing process [thread: ${threadIdentification()}, message: ${quoteRequest}]")
quoteResultEmitter.send(Quote(quoteRequest, value))
}
}
private fun threadIdentification() =
"Thread.id=${Thread.currentThread().id},Thread.name=${Thread.currentThread().name}"
}