如何使用相同的@Incoming 方法并行处理多个 AMQP 消息

问题描述 投票:0回答:3

是否可以使用带有 quarkus 和 smallrye-reactive-messaging

@Incoming("queue")
注释的相同方法并行处理多个 amqp - 消息?

更准确地说,我有以下课程:

@ApplicationScoped
public class Receiver {
    @Incoming("test-queue")
    public void process(String input) {
        System.out.println("start processing:" + input);
        try {
            Thread.sleep(10_000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("end processing:" + input);
    }
}

在application.properties中配置:

amqp-host: localhost
amqp-port: 5672
amqp-username: quarkus
amqp-password: quarkus
mp.messaging.incoming.test-queue.connector: smallrye-amqp
mp.messaging.incoming.test-queue.address: test-queue

现在我想通过配置定义有多少消息的并行处理是可能的。例如,在一个 4 核 cpu 上它应该并行运行 4 个。

目前我只能添加 4 个具有不同名称的方法副本以允许这种并行性,但这是不可配置的。

amqp quarkus activemq-artemis smallrye-reactive-messaging
3个回答
0
投票

我不确定,但我认为 Reactive Messaging 不支持您的要求。

然而,你可以用另一种方式做你想做的事。我认为这也是使用消息传递的更好的整体模式。

http://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/2.5/amqp/amqp.html#amqp-inbound

查找具有 CompletionStage 和显式 ack() 的示例。该变体是异步的,因此如果您将它与 Java 现有的并发设施结合起来,您将获得高效的并行处理。

我会将传入的工作发送给执行者,然后在它完成时让执行任务 ack()。


0
投票

我刚遇到同样的场景,这里是规范打算如何处理并发: 来自 eclipse Microprofile 规范

基本上,而不是有一个像这样的方法的类:

@Incoming("test-queue")
public void process(String input) {}

你有 2 个这样的类:

@ApplicationScoped
public class MessageSubscriberProducer {

    @Incoming("test-queue")
    public Subscriber<String> createSubscriber() {
        return new SubscriberImpl();
    }
}

public class SubsciberImpl implements Subscriber<String> {

    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(4);  // this tells how many messages to grab right away
    }

    @Override
    public void onNext(String val) {
        // do processing
        this.subscription.request(1);  // grab 1 more
    }
}

这具有将处理代码从 vert.x 事件循环线程移动到工作线程池的额外优势。


0
投票

这个问题有点老,但我会回答以帮助有同样问题的人。

我最近在使用 Quarkus 和反应式编程进行一些测试时遇到了同样的问题。 ChatGPT 帮不了我,所以我必须更好地理解 Vertx,我确实创建了这两个存储库https://github.com/eduardo-villasboas/vertx-pochttps://github.com/eduardo-villasboas/ quarkus-poc.

我正在使用 kotlin,所以我使用了 Vertx 与协程的集成。在 Quarkus 示例 https://github.com/eduardo-villasboas/quarkus-poc 中,您可以看到 Quarkus 快速入门示例 (https://quarkus.io/guides/rabbitmq) 的改编版。此实现将访问 Vertx 协程上下文以使用挂起函数。这并不完全符合您的要求,即运行多个连接但从并发性的角度来看具有相同或更好的效果。这是解决方案的示例。请注意,此方法不存在 ack 问题。我希望能有所帮助,我将在此处使用流而不是协同程序放置另一个示例。

@ApplicationScoped 类 QuoteConsumer(私有 val vertx:Vertx){

private val logger = LoggerFactory.getLogger(this.javaClass)

private val random = Random()

@Channel("quotes")
lateinit var quoteResultEmitter: Emitter<Quote>

@Incoming("requests")
@Blocking
@Throws(
    InterruptedException::class
)
fun process(quoteRequest: String) {
    logger.info("Starting process [thread: ${threadIdentification()}, message: ${quoteRequest}]")

    CoroutineScope(vertx.dispatcher()).launch {
        //delay(3000)
        logger.info("Heavy process start [thread: ${threadIdentification()}, message: ${quoteRequest}]")
        delay(5000)
        logger.info("Heavy process finish [thread: ${threadIdentification()}, message: ${quoteRequest}]")
        val value = random.nextInt(100)

        logger.info("Finishing process [thread: ${threadIdentification()}, message: ${quoteRequest}]")
        quoteResultEmitter.send(Quote(quoteRequest, value))

    }

}

private fun threadIdentification() =
    "Thread.id=${Thread.currentThread().id},Thread.name=${Thread.currentThread().name}"

}

© www.soinside.com 2019 - 2024. All rights reserved.