We have a Quarkus application that uses the smallrye-amqp connector. I have the following code for publishing messages to a topic.
    @ApplicationScoped
    public class InsightLifecycleObserver {

      public static final String INSIGHTS_DELETED_TOPIC = "insightscentral/insight/deleted";

      @Inject
      @Channel(INSIGHTS_DELETED_TOPIC)
      @OnOverflow(value = Strategy.NONE)
      Emitter<byte[]> insightsEmitter;

      void onInsightRemoved(@Observes(during = AFTER_SUCCESS) InsightDeletedEvent event) {
        final var tenantId = tenantResolver.getTenantId();
        try {
          var insightLifeCycleEvent = convertToLifeCycleEvent(event);
          var eventEnvelope =
              EventEnvelope.builder()
                  .withId(UUID.randomUUID())
                  .withData(
                      PojoCloudEventData.wrap(insightLifeCycleEvent, objectMapper::writeValueAsBytes))
                  .build();
          // send the event to the Insights topic
          insightsEmitter
              .send(eventEnvelope.serialize())
              .whenComplete(
                  (result, throwable) -> {
                    if (throwable != null) {
                      LOGGER.error("Failed to emit InsightDeleted event", throwable);
                      metricsService.incrementErrorMessageCounter(
                          MessageType.INSIGHT_DELETED_EVENT, tenantId);
                    } else {
                      LOGGER.debug(
                          "Published InsightDeleted event for insight with id: {} and sri: {}",
                          event.insight().id(),
                          event.insight().sri());
                      metricsService.incrementMessageCounter(
                          MessageType.INSIGHT_DELETED_EVENT, tenantId);
                    }
                  });
        } catch (Exception e) {
          LOGGER.error("Failed to publish InsightDeleted event", e);
          metricsService.incrementErrorMessageCounter(MessageType.INSIGHT_DELETED_EVENT, tenantId);
        }
      }
    }
When this hook is invoked, the following exception is thrown:
io.vertx.core.impl.NoStackTraceThrowable: message rejected (REJECTED): Error{condition=amqp:not-allowed, description='SMF AD ack response error', info={solace.response_code=400, solace.response_text=Queue Not Found}}
What am I doing wrong?
Instead of the AMQP connector, you can use the Solace Quarkus extension, which provides a managed client and a messaging connector for Solace PubSub+: https://solacelabs.github.io/solace-quarkus/
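
As a rough sketch of what switching connectors could look like, the application.properties fragment below routes an outgoing channel through the Solace connector and publishes to the topic from the question. The channel name insights-deleted, the host, the VPN name, and the credentials are placeholders I made up for illustration. The property names are given as I recall them from the extension's documentation, so please check them against the link above before relying on them.

```properties
# Broker connection (all values are placeholders, not taken from the question)
quarkus.solace.host=tcp://localhost:55555
quarkus.solace.vpn=default
quarkus.solace.authentication.basic.username=my-user
quarkus.solace.authentication.basic.password=my-password

# Outgoing channel "insights-deleted" (hypothetical name) handled by the Solace connector
mp.messaging.outgoing.insights-deleted.connector=quarkus-solace
# Publish to the topic used in the question
mp.messaging.outgoing.insights-deleted.producer.topic=insightscentral/insight/deleted
```

With a configuration along these lines, the emitter would be injected with @Channel("insights-deleted") rather than with the topic string itself, so the topic name lives in configuration and the channel name stays a plain identifier.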