如何避免与 Apache Camel 和 CXF 一起使用的 RedHat Fuse 适配器中的交换超时?

问题描述 投票:0回答:1

我正在使用 Apache Camel 和 CXF 在 Java DSL 中使用 RedHat Fuse 适配器。我正在尝试处理相对较大的传入 SOAP 消息,但遇到了超时问题。然后,发送者会收到 500 内部服务器错误,即使我们这边的处理进展顺利。有时处理时间有点快,然后发送方收到 200,就一切顺利了。

CXF 或 Camel 似乎发生了某种超时(“OUT 消息”花费的时间太长),但对我们来说发生得太快了。问题似乎是我找不到这个 30000ms 的超时设置在哪里以及如何设置,以及如何更改它。

任何关于 CXF 或 Camel 的帮助或指示都将不胜感激,比如这个超时到底是什么,或者哪个属性改变了它。

以下是我收到的错误消息以及我正在使用的路线:

26-09-24 07:52:12.575 [XNIO-1 task-1] WARN  o.a.cxf.phase.PhaseInterceptorChain - Application {http://example.namespace.com/wsdl/Service/V1}MyService#{http://example.namespace.com/wsdl/Service/V1}processRequest has thrown exception, unwinding now
org.apache.cxf.interceptor.Fault: The OUT message was not received within: 30000 millis. Exchange[91F3A9AAC0D6092-0000000000000000]
    at org.apache.camel.component.cxf.jaxws.CxfConsumer$CxfConsumerInvoker.checkFailure(CxfConsumer.java:350)
    at org.apache.camel.component.cxf.jaxws.CxfConsumer$CxfConsumerInvoker.setResponseBack(CxfConsumer.java:322)
    at org.apache.camel.component.cxf.jaxws.CxfConsumer$CxfConsumerInvoker.asyncInvoke(CxfConsumer.java:212)
    at org.apache.camel.component.cxf.jaxws.CxfConsumer$CxfConsumerInvoker.invoke(CxfConsumer.java:161)
    ...
Caused by: org.apache.camel.ExchangeTimedOutException: The OUT message was not received within: 30000 millis. Exchange[91F3A9AAC0D6092-0000000000000000]
    at org.apache.camel.component.cxf.jaxws.CxfConsumer$CxfConsumerInvoker.asyncInvoke(CxfConsumer.java:210)
    ... 105 common frames omitted

这是我正在使用的路线(Java DSL):

package com.example.routes;

import com.example.logging.CustomLogger;
import com.example.beans.TransformData;
import com.example.beans.UUIDGenerator;
import com.example.configuration.ExceptionConnectionErrorResolver;
import com.example.webservices.WebServiceConfig;
import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.builder.ThreadPoolBuilder;
import org.apache.camel.CamelContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.concurrent.ExecutorService;

@Component
public class MyServiceRoute extends RouteBuilder {
    @Autowired
    CustomLogger customLogger;

    @Autowired
    WebServiceConfig webServiceConfig;

    @Autowired
    UUIDGenerator generator;

    @Autowired
    ExceptionConnectionErrorResolver exceptionConnectionErrorResolver;

    @Override
    public void configure() throws Exception {

        CamelContext context = getContext();

        ExecutorService executorService = new ThreadPoolBuilder(context)
                .poolSize(150)
                .maxPoolSize(150)
                .maxQueueSize(-1)
                .build("SplitThreadPool");

        from("cxf:bean:processRequestSoapEndpoint?exchangePattern=InOnly").routeId("myServiceRoute")

        .onException(org.apache.camel.support.processor.validation.SchemaValidationException.class,org.xml.sax.SAXParseException.class,org.apache.camel.ValidationException.class).id("onTransformationException")
                .handled(true)
                .bean(customLogger, "logCode(${exchangeProperty.BreadCrumb}, CODE-992, Validation of message failed. ${exception.message})").id("logCode992")
                .process(exchange -> {
                    Exception exception = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class);
                    exchange.getContext().createProducerTemplate()
                            .sendBody("log:error?showAll=true&multiline=true",
                                    "CODE-992 Validation Error: " + exception.getMessage() + "\nStack Trace:\n" + getStackTrace(exception));
                })
        .end()

        .onException(javax.xml.transform.TransformerException.class).id("OnValidationException")
                .handled(true)
                .bean(customLogger, "logCode(${exchangeProperty.BreadCrumb}, CODE-032)").id("logCode032")
        .end()

        .onException(org.apache.cxf.interceptor.Fault.class,org.apache.camel.http.base.HttpOperationFailedException.class).id("onEndpointException")
                .handled(true)
                .bean(customLogger, "logCode(${exchangeProperty.BreadCrumb},CODE-043)").id("logCode043")
                .bean(customLogger, "logCode(${exchangeProperty.BreadCrumb},CODE-996,Error: ${exception.message})").id("logCode996")
        .end()

        .onException(java.net.ConnectException.class,org.apache.http.conn.HttpHostConnectException.class).id("onConnectionEndpoint")
                .handled(true)
                .maximumRedeliveries(exceptionConnectionErrorResolver.getMaxRedeliveries())
                .redeliveryDelay(exceptionConnectionErrorResolver.getRedeliveryDelay())
                .retryAttemptedLogLevel(LoggingLevel.WARN)
                .bean(customLogger, String.format("logCode(${exchangeProperty.BreadCrumb},CODE-042,%s)",webServiceConfig.processRequestSoapEndpointBackend().getAddress())).id("logCode042")
                .bean(customLogger, "logCode(${exchangeProperty.BreadCrumb},CODE-996,Error: ${exception.message})").id("logCode996")
        .end()

        .onException(java.lang.Exception.class).id("onGeneralException")
                .handled(true)
                .bean(customLogger, "logCode(${exchangeProperty.BreadCrumb},CODE-000)").id("logCode000")
                .bean(customLogger, "logCode(${exchangeProperty.BreadCrumb},CODE-996,Error: ${exception.message})").id("logUnexpectedException")
        .end()

                .bean(customLogger, "logStart").id("logStart")
                .bean("PropertyGenerator").id("setProperties")
                .setProperty("messageHeaderProperty", xpath("/ns:processRequestHeader", XpathNameSpaces.namespaces)).id("setMessageHeaderProperty")
                .setProperty("voorloopRecordProperty", xpath("/ns:processRequestBody/ns:businessData/ns:record", XpathNameSpaces.namespaces)).id("setVoorloopRecordProperty")
                .to("validator:classpath:services/ExampleService/xsd/BM/V1/ExampleMessage.xsd").id("validateExampleMessage")
                .bean(customLogger, "logCode(${exchangeProperty.BreadCrumb}, CODE-092)").id("logCode092")
                .split(xpath("/ns:processRequestBody/ns:businessData/ns:record/ns:person", XpathNameSpaces.namespaces)).parallelProcessing().executorService(executorService).stopOnException().id("splitPerPerson")
                    .bean(TransformData.class).id("setSplittedBody")
                    .process(generator).id("setUUIDProp")
                    .setProperty("timestampProperty").simple("${date:now:yyyy-MM-dd'T'HH:mm:ss.SSS'Z'}").id("setTimestamp")
                    .to("xslt:classpath:/transformations/ExampleTransform.xsl").id("transformExample")
                    .to("validator:classpath:services/ExampleService/xsd/BAS/V1/ExampleValidation.xsd").id("validateExampleValidation")
                    .removeHeaders("*", "BreadCrumb").id("removeHeaders")
                    .to("cxf:bean:processRequestSoapEndpointBackend").id("toSoapEndpoint")
                    .bean(customLogger, String.format("logCode(${exchangeProperty.BreadCrumb}, CODE-041,%s)",webServiceConfig.processRequestSoapEndpointBackend().getAddress())).id("logCode041")
                .bean(customLogger, "logEnd").id("logCodeEnd");
    }

        private String getStackTrace(Exception exception) {
            StringWriter sw = new StringWriter();
            PrintWriter pw = new PrintWriter(sw);
            exception.printStackTrace(pw);
            return sw.toString();
        }
}

我们尝试使用并行处理来加快处理时间,并进行一些小调整以尝试将时间控制在 30 秒以下,但真正的问题是这个超时时间太短了。

问题似乎是我找不到这个 30000ms 的超时设置在哪里以及如何设置,以及如何更改它。我尝试过一些类似

cxf.timeout=60000
的东西,但似乎CXF没有这个属性。

Cxf.ConnectionTimeOut
Cxf.ReceiveTimeout
这样的东西似乎指的是HTTP超时,但我认为这是应用程序中的某种内部超时。

java apache-camel cxf jbossfuse camel-cxf
1个回答
0
投票

原来这是“ContinationTimeout”。

更改了我的路线中使用的 cxf 端点,如下所示:

from("cxf:bean:processRequestSoapEndpoint?continuationTimeout=60000")

另请参阅此处的答案:

Camel CXF:Soap 客户端超时

© www.soinside.com 2019 - 2024. All rights reserved.