How to publish messages to a topic on a Solace broker using AMQP


We have a Quarkus application that uses the smallrye-amqp connector. I have the following code for publishing messages to a topic.

@ApplicationScoped
public class InsightLifecycleObserver {

    public static final String INSIGHTS_DELETED_TOPIC = "insightscentral/insight/deleted";

    @Inject
    @Channel(INSIGHTS_DELETED_TOPIC)
    @OnOverflow(value = Strategy.NONE)
    Emitter<byte[]> insightsEmitter;

    void onInsightRemoved(@Observes(during = AFTER_SUCCESS) InsightDeletedEvent event) {
        final var tenantId = tenantResolver.getTenantId();
        try {
          var insightLifeCycleEvent = convertToLifeCycleEvent(event);
          var eventEnvelope =
              EventEnvelope.builder()
                  .withId(UUID.randomUUID())
                  .withData(
                      PojoCloudEventData.wrap(insightLifeCycleEvent, objectMapper::writeValueAsBytes))
                  .build();

          // send the event to the Insights topic
          insightsEmitter
              .send(eventEnvelope.serialize())
              .whenComplete(
                  (result, throwable) -> {

                    if (throwable != null) {
                      LOGGER.error("Failed to emit InsightDeleted event", throwable);
                      metricsService.incrementErrorMessageCounter(
                          MessageType.INSIGHT_DELETED_EVENT, tenantId);
                    } else {
                      LOGGER.debug(
                          "Published InsightDeleted event for insight with id: {} and sri: {}",
                          event.insight().id(),
                          event.insight().sri());
                      metricsService.incrementMessageCounter(
                          MessageType.INSIGHT_DELETED_EVENT, tenantId);
                    }
                  });
        } catch (Exception e) {
          LOGGER.error("Failed to publish InsightDeleted event", e);
          metricsService.incrementErrorMessageCounter(MessageType.INSIGHT_DELETED_EVENT, tenantId);
        }
    }
}

When this hook is invoked, the following exception is thrown:

io.vertx.core.impl.NoStackTraceThrowable: message rejected (REJECTED): Error{condition=amqp:not-allowed, description='SMF AD ack response error', info={solace.response_code=400, solace.response_text=Queue Not Found}}

What am I doing wrong?

quarkus amqp smallrye-reactive-messaging
1 Answer

Instead of the AMQP connector, you can use the Solace Quarkus extension, which provides a managed client and a messaging connector for Solace PubSub+: https://solacelabs.github.io/solace-quarkus/
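As a rough sketch of what that switch might look like in application.properties: the channel name insights-deleted, the host, the VPN and the credentials below are all hypothetical placeholders, and the property names are written as I recall them from the extension's documentation, so please check them against the linked docs before relying on them.

```properties
# Assumed broker connection settings for the Solace Quarkus extension (placeholder values).
quarkus.solace.host=tcp://localhost:55555
quarkus.solace.vpn=default
quarkus.solace.authentication.basic.username=my-user
quarkus.solace.authentication.basic.password=my-password

# Hypothetical channel "insights-deleted", bound to the Solace connector
# instead of smallrye-amqp and pointed at the topic from the question.
mp.messaging.outgoing.insights-deleted.connector=quarkus-solace
mp.messaging.outgoing.insights-deleted.producer.topic=insightscentral/insight/deleted
```

With a setup along these lines, the emitter in the question would be injected with @Channel("insights-deleted") rather than using the topic string as the channel name, and the destination topic would come from the configuration.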
