使用这些参数:
canada {
hosts = ["dd.weather.gc.ca"]
username = "anonymous"
password = "anonymous"
port = 5671
exchange = "xpublic"
queue = "q_anonymous_gsk"
routingKey = "v02.post.observations.swob-ml.#"
requestedHeartbeat = 300
ssl = true
}
我可以使用NewMotion/Akka连接到加拿大的气象服务,但是当我尝试op-rabbit时,我得到:
ACCESS_REFUSED - access to exchange 'xpublic' in vhost '/' refused for user 'anonymous'
[INFO] [foo-akka.actor.default-dispatcher-7] [akka://foo/user/$a/connection] akka://foo/user/$a/connection connected to amqp://anonymous@{dd.weather.gc.ca:5671}:5671//
[INFO] [foo-op-rabbit.default-channel-dispatcher-6] [akka://foo/user/$a/connection/$a] akka://foo/user/$a/connection/$a connected
[INFO] [foo-akka.actor.default-dispatcher-4] [akka://foo/user/$a/connection/confirmed-publisher-channel] akka://foo/user/$a/connection/confirmed-publisher-channel connected
[INFO] [foo-akka.actor.default-dispatcher-4] [akka://foo/user/$a/connection/$b] akka://foo/user/$a/connection/$b connected
[ERROR] [foo-akka.actor.default-dispatcher-3] [akka://foo/user/$a/subscription-q_anonymous_gsk-1] Connection related error while trying to re-bind a consumer to q_anonymous_gsk. Waiting in anticipating of a new channel.
...
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=403, reply-text=ACCESS_REFUSED - access to exchange 'xpublic' in vhost '/' refused for user 'anonymous', class-id=40, method-id=10)
以下在NewMotion / Akka中的作品:
val inQueue = "q_anonymous_gsk"
val inExchange = "xpublic"
val canadaQueue = canadaChannel.queueDeclare(inQueue, false, true, false, null).getQueue
canadaChannel.queueBind(canadaQueue, inExchange, inQueue)
val consumer = new DefaultConsumer(canadaChannel) {
override def handleDelivery(consumerTag: String, envelope: Envelope, properties: BasicProperties, body: Array[Byte]) {
val s = fromBytes(body)
if (republishElsewhere) {
// ...
}
}
}
canadaChannel.basicConsume(canadaQueue, true, consumer)
但是使用op-rabbit
这样:
val inQueue = "q_anonymous_gsk"
val inExchange = "xpublic"
val inRoutingKey = "v02.post.observations.swob-ml.#""
val rabbitCanada: ActorRef = actorSystem.actorOf(Props(classOf[RabbitControl], connParamsCanada))
def runSubscription(): SubscriptionRef = Subscription.run(rabbitCanada) {
channel(qos = 3) {
consume(topic(queue(inQueue), List(inRoutingKey))) {
(body(as[String]) & routingKey) { (msg, key) =>
ack
}
}
}
}
我在本文顶部附近看到ACCESS_REFUSED
错误。为什么?如果要使用op-rabbit
,该如何解决?