我有一个场景,我的骆驼路由监听 KAFKA 通知消息,将它们转发到基于队列的日志服务,然后转发到 REST 端点。例如,如果发生故障,如果消息可以传递到 REST 端点,则应无限重试该消息,直到端点启动。在这里我希望禁用日志记录。例如,如果再次传递交换,则不应将消息路由到日志记录服务的 to 端点。另外,我还提供了 onException 块的代码。在 onException 块中,仅应在第一次时记录错误消息。我怎样才能实现这个目标?
from(<<kafka endpoint>>)
.process(exchange -> {
//set exchange headers and message properties
exchange.setProperty("EndpointURI", exchange.getFromEndpoint().getEndpointUri());
String[][] messageProperties = setHeadersAsMessageProperties(exchange);
//Generate xml message and set it to the body of the exchange
String xmlPayload = log.generateLoggingMessage(trackingID, exchange.getIn().getBody(String.class), fromComponentName, fromEndpoint, serviceName, operation, process, "", user, false, "", messageProperties);
exchange.getIn().setBody(xmlPayload);
})
.removeHeaders("kafka*")
.removeHeaders("camel*")
//Send the exchange to the logging queue
.to(<<jms endpoint>>)
.process(exchange -> {
//set exchange headers and message properties
String xmlPayload = log.generateLoggingMessage(exchange.getIn().getHeader("ESB_TrackingId", String.class), exchange.getProperty("InitialBody", String.class), toComponentName, toEndpoint, serviceName, operation, process, "", user, false, "", messageProperties);
exchange.getIn().setBody(xmlPayload);
})
//Send the exchange to the logging queue
.to(<<jms endpoint>>)
.removeHeaders("cameljms*")
.removeHeaders("camel*")
.process(exchange -> {
String originalBody = exchange.getProperty("InitialBody", String.class);
exchange.getIn().setBody(originalBody);
})
.to(<<rest endpoint>>)
您应该创建重新投递策略,并将重试次数设置为-1。
protected RedeliveryPolicyDefinition createRedeliveryPolicy(String maximumRedeliveries,
String redeliveryDelay) {
RedeliveryPolicyDefinition redeliveryPolicy = new RedeliveryPolicyDefinition();
redeliveryPolicy.setMaximumRedeliveries(maximumRedeliveries);
redeliveryPolicy.setRedeliveryDelay(redeliveryDelay);
redeliveryPolicy.retryAttemptedLogLevel(LoggingLevel.WARN);
redeliveryPolicy.logRetryStackTrace(false);
return redeliveryPolicy;
}
其中参数maximumRedeliveries设置为-1。
一个例子是:
route
.onException(Exception.class)
.handled(true)
.process(this::logException)
.onRedelivery(this::logAttemptRedelivery)
.setRedeliveryPolicyType(
createRedeliveryPolicy(maxRedeliveryRetries, redeliveryDelay)
);