Alpakka AMQP:如何检测声明异常?

问题描述 投票:0回答:1

我有一个带有声明的AMQP源和AMQP接收器:

List<Declaration> declarations = new ArrayList<Declaration>() {{
      add(QueueDeclaration.create(sourceExchangeName));
      add(BindingDeclaration.create(sourceExchangeName, sourceExchangeName).withRoutingKey(sourceRoutingKey));
    }};

amqpSource = AmqpSource
    .committableSource(
        NamedQueueSourceSettings.create(connectionProvider, sourceExchangeName)
            .withDeclarations(declarations),
        bufferSize);

AmqpWriteSettings amqpWriteSettings = AmqpWriteSettings.create(connectionProvider)
    .withExchange("DEST_XCHANGE")
    .withRoutingKey("ROUTE123")
    .withDeclaration(ExchangeDeclaration.create(destinationExchangeName,
        BuiltinExchangeType.DIRECT.getType()));

amqpSink = AmqpSink.create(amqpWriteSettings);

然后我有一个流程..

amqpSource.map(doSomething).async().map(doSomethingElse).async().to(amqpSink)

现在,在我启动该应用程序之后,邮件发送到源队列的消息没有被占用。后来我发现这是由于声明期间发生的错误所致。 (即,当我在“源”和“接收器”设置中删除.withDeclarations(..)时,它工作正常。

所以我的问题:

  1. 如何检测AMQP源和接收器是否正常运行?
  2. 如何忽略声明异常?
  3. 如果发生任何异常,我怎么知道并使系统失败?
akka akka-stream akka-http akka-cluster alpakka
1个回答
0
投票

为了回答1和3,AmqpSink实现了您必须保留的CompletionStage<Done>,并处理(在其中注册了一些回调函数)以观察流的失败和完成。在docs样本中,我们阻止了在生产代码(https://doc.akka.io/docs/alpakka/current/amqp.html#with-sink)中不好的完成阶段,这可能是因为该样本包含在Alpakka测试之一中。请改用通常的CompletionStage回调/转换方法(例如,请参见this introduction)。

CompletionStage将在发生错误,流实现/启动或元素处理期间失败,或者在源到达末尾并且每个元素都已通过您的接收器流之后完成。这意味着对于启动流,如果不是很快就会失败,它将运行。

对于问题2,不确定是否可以忽略声明异常,可能是那些总是失败的连接。

© www.soinside.com 2019 - 2024. All rights reserved.