如何在 Camel 中永远重试失败的消息?

问题描述 投票:0回答:1

我有一个场景,我的骆驼路由监听 KAFKA 通知消息,将它们转发到基于队列的日志服务,然后转发到 REST 端点。例如,如果发生故障,如果消息可以传递到 REST 端点,则应无限重试该消息,直到端点启动。在这里我希望禁用日志记录。例如,如果再次传递交换,则不应将消息路由到日志记录服务的 to 端点。另外,我还提供了 onException 块的代码。在 onException 块中,仅应在第一次时记录错误消息。我怎样才能实现这个目标?

from(<<kafka endpoint>>)
                .process(exchange -> {

                    //set exchange headers and message properties
                    exchange.setProperty("EndpointURI", exchange.getFromEndpoint().getEndpointUri());
                    String[][] messageProperties = setHeadersAsMessageProperties(exchange);
                
                    //Generate xml message and set it to the body of the exchange
                    String xmlPayload = log.generateLoggingMessage(trackingID, exchange.getIn().getBody(String.class), fromComponentName, fromEndpoint, serviceName, operation, process, "", user, false, "", messageProperties);
                    exchange.getIn().setBody(xmlPayload);

                })
                .removeHeaders("kafka*")
                .removeHeaders("camel*")

                //Send the exchange to the logging queue
                .to(<<jms endpoint>>)
                .process(exchange -> {
                    //set exchange headers and message properties
                    String xmlPayload = log.generateLoggingMessage(exchange.getIn().getHeader("ESB_TrackingId", String.class), exchange.getProperty("InitialBody", String.class), toComponentName, toEndpoint, serviceName, operation, process, "", user, false, "", messageProperties);
                    exchange.getIn().setBody(xmlPayload);

                })
                //Send the exchange to the logging queue
                 .to(<<jms endpoint>>)
                .removeHeaders("cameljms*")
                .removeHeaders("camel*")
                .process(exchange -> {
                    String originalBody = exchange.getProperty("InitialBody", String.class);
                    exchange.getIn().setBody(originalBody);
                })
                .to(<<rest endpoint>>)

    
apache-kafka apache-camel
1个回答
0
投票

您应该创建重新投递策略,并将重试次数设置为-1。

protected RedeliveryPolicyDefinition createRedeliveryPolicy(String maximumRedeliveries,
                                                            String redeliveryDelay) {
    RedeliveryPolicyDefinition redeliveryPolicy = new RedeliveryPolicyDefinition();
    redeliveryPolicy.setMaximumRedeliveries(maximumRedeliveries);
    redeliveryPolicy.setRedeliveryDelay(redeliveryDelay);
    redeliveryPolicy.retryAttemptedLogLevel(LoggingLevel.WARN);
    redeliveryPolicy.logRetryStackTrace(false);
    return redeliveryPolicy;
}

其中参数maximumRedeliveries设置为-1。

一个例子是:

    route
                .onException(Exception.class)
                .handled(true)
                .process(this::logException)
                .onRedelivery(this::logAttemptRedelivery)
     .setRedeliveryPolicyType(
createRedeliveryPolicy(maxRedeliveryRetries, redeliveryDelay)
);
最新问题
© www.soinside.com 2019 - 2025. All rights reserved.