如何使用 Quarkus 读取 Rabbitmq 流

问题描述 投票:0回答:1

如何使用 Quarkus 从 RabbitMQ 流读取数据。是否可以使用 SmallRye 来实现这一目标?还是必须直接使用rabbit客户端?

我愿意

  1. 从rabbit流(而不是队列)读取数据
  2. 重播兔子流中某个时间点的数据

我还没有找到任何关于使用 SmallRye 包装器的参考资料

我有这个配置

# incoming
mp.messaging.incoming.requestss.connector=smallrye-rabbitmq
mp.messaging.incoming.requestss.exchange.name=quote-requests
mp.messaging.incoming.requestss.queue.name=quote-stream
mp.messaging.incoming.requestss.queue.x-queue-type=stream

但我收到以下错误

Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - consumer prefetch count is not set for 'queue 'quote-stream' in vhost '/'', class-id=60, method-id=20)
        at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
        at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
        at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502)
        at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:1378)
        ... 15 more
rabbitmq quarkus rabbitmq-stream
1个回答
0
投票

RabbitMQ Streams 模型是仅附加的消息日志。每个消费者开始读取日志的指定偏移量。因此,必须将 QoS(也称为预取计数)设置为传入通道。在 SmallRye RabbitMQ 连接器中,

max-outstanding-messages
配置属性控制 QoS 值

mp.messaging.incoming.requestss.connector=smallrye-rabbitmq
mp.messaging.incoming.requestss.excahnge.name=quote-requests
mp.messaging.incoming.requestss.queue.name=quote-stream
mp.messaging.incoming.requestss.queue.x-queue-type=stream

# sets prefetch count / QoS to 100
mp.messaging.incoming.requestss.max-outstanding-messages=100

注意:我还没有找到配置 RabbitMQ Streams 特定参数的可能性(例如

x-stream-offset
x-max-age
x-stream-max-segment-size-bytes
等),因此我假设每个客户端都将使用这些参数的默认值。如果 SmallRye 能做到那就太好了。

© www.soinside.com 2019 - 2024. All rights reserved.