我正在使用 Apache Camel 和 CXF 在 Java DSL 中使用 RedHat Fuse 适配器。我正在尝试处理相对较大的传入 SOAP 消息,但遇到了超时问题。然后,发送者会收到 500 内部服务器错误,即使我们这边的处理进展顺利。有时处理时间有点快,然后发送方收到 200,就一切顺利了。
CXF 或 Camel 似乎发生了某种超时(“OUT 消息”花费的时间太长),但对我们来说发生得太快了。问题似乎是我找不到这个 30000ms 的超时设置在哪里以及如何设置,以及如何更改它。
任何关于 CXF 或 Camel 的帮助或指示都将不胜感激,比如这个超时到底是什么,或者哪个属性改变了它。
以下是我收到的错误消息以及我正在使用的路线:
26-09-24 07:52:12.575 [XNIO-1 task-1] WARN o.a.cxf.phase.PhaseInterceptorChain - Application {http://example.namespace.com/wsdl/Service/V1}MyService#{http://example.namespace.com/wsdl/Service/V1}processRequest has thrown exception, unwinding now
org.apache.cxf.interceptor.Fault: The OUT message was not received within: 30000 millis. Exchange[91F3A9AAC0D6092-0000000000000000]
at org.apache.camel.component.cxf.jaxws.CxfConsumer$CxfConsumerInvoker.checkFailure(CxfConsumer.java:350)
at org.apache.camel.component.cxf.jaxws.CxfConsumer$CxfConsumerInvoker.setResponseBack(CxfConsumer.java:322)
at org.apache.camel.component.cxf.jaxws.CxfConsumer$CxfConsumerInvoker.asyncInvoke(CxfConsumer.java:212)
at org.apache.camel.component.cxf.jaxws.CxfConsumer$CxfConsumerInvoker.invoke(CxfConsumer.java:161)
...
Caused by: org.apache.camel.ExchangeTimedOutException: The OUT message was not received within: 30000 millis. Exchange[91F3A9AAC0D6092-0000000000000000]
at org.apache.camel.component.cxf.jaxws.CxfConsumer$CxfConsumerInvoker.asyncInvoke(CxfConsumer.java:210)
... 105 common frames omitted
这是我正在使用的路线(Java DSL):
package com.example.routes;
import com.example.logging.CustomLogger;
import com.example.beans.TransformData;
import com.example.beans.UUIDGenerator;
import com.example.configuration.ExceptionConnectionErrorResolver;
import com.example.webservices.WebServiceConfig;
import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.builder.ThreadPoolBuilder;
import org.apache.camel.CamelContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.concurrent.ExecutorService;
@Component
public class MyServiceRoute extends RouteBuilder {
@Autowired
CustomLogger customLogger;
@Autowired
WebServiceConfig webServiceConfig;
@Autowired
UUIDGenerator generator;
@Autowired
ExceptionConnectionErrorResolver exceptionConnectionErrorResolver;
@Override
public void configure() throws Exception {
CamelContext context = getContext();
ExecutorService executorService = new ThreadPoolBuilder(context)
.poolSize(150)
.maxPoolSize(150)
.maxQueueSize(-1)
.build("SplitThreadPool");
from("cxf:bean:processRequestSoapEndpoint?exchangePattern=InOnly").routeId("myServiceRoute")
.onException(org.apache.camel.support.processor.validation.SchemaValidationException.class,org.xml.sax.SAXParseException.class,org.apache.camel.ValidationException.class).id("onTransformationException")
.handled(true)
.bean(customLogger, "logCode(${exchangeProperty.BreadCrumb}, CODE-992, Validation of message failed. ${exception.message})").id("logCode992")
.process(exchange -> {
Exception exception = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class);
exchange.getContext().createProducerTemplate()
.sendBody("log:error?showAll=true&multiline=true",
"CODE-992 Validation Error: " + exception.getMessage() + "\nStack Trace:\n" + getStackTrace(exception));
})
.end()
.onException(javax.xml.transform.TransformerException.class).id("OnValidationException")
.handled(true)
.bean(customLogger, "logCode(${exchangeProperty.BreadCrumb}, CODE-032)").id("logCode032")
.end()
.onException(org.apache.cxf.interceptor.Fault.class,org.apache.camel.http.base.HttpOperationFailedException.class).id("onEndpointException")
.handled(true)
.bean(customLogger, "logCode(${exchangeProperty.BreadCrumb},CODE-043)").id("logCode043")
.bean(customLogger, "logCode(${exchangeProperty.BreadCrumb},CODE-996,Error: ${exception.message})").id("logCode996")
.end()
.onException(java.net.ConnectException.class,org.apache.http.conn.HttpHostConnectException.class).id("onConnectionEndpoint")
.handled(true)
.maximumRedeliveries(exceptionConnectionErrorResolver.getMaxRedeliveries())
.redeliveryDelay(exceptionConnectionErrorResolver.getRedeliveryDelay())
.retryAttemptedLogLevel(LoggingLevel.WARN)
.bean(customLogger, String.format("logCode(${exchangeProperty.BreadCrumb},CODE-042,%s)",webServiceConfig.processRequestSoapEndpointBackend().getAddress())).id("logCode042")
.bean(customLogger, "logCode(${exchangeProperty.BreadCrumb},CODE-996,Error: ${exception.message})").id("logCode996")
.end()
.onException(java.lang.Exception.class).id("onGeneralException")
.handled(true)
.bean(customLogger, "logCode(${exchangeProperty.BreadCrumb},CODE-000)").id("logCode000")
.bean(customLogger, "logCode(${exchangeProperty.BreadCrumb},CODE-996,Error: ${exception.message})").id("logUnexpectedException")
.end()
.bean(customLogger, "logStart").id("logStart")
.bean("PropertyGenerator").id("setProperties")
.setProperty("messageHeaderProperty", xpath("/ns:processRequestHeader", XpathNameSpaces.namespaces)).id("setMessageHeaderProperty")
.setProperty("voorloopRecordProperty", xpath("/ns:processRequestBody/ns:businessData/ns:record", XpathNameSpaces.namespaces)).id("setVoorloopRecordProperty")
.to("validator:classpath:services/ExampleService/xsd/BM/V1/ExampleMessage.xsd").id("validateExampleMessage")
.bean(customLogger, "logCode(${exchangeProperty.BreadCrumb}, CODE-092)").id("logCode092")
.split(xpath("/ns:processRequestBody/ns:businessData/ns:record/ns:person", XpathNameSpaces.namespaces)).parallelProcessing().executorService(executorService).stopOnException().id("splitPerPerson")
.bean(TransformData.class).id("setSplittedBody")
.process(generator).id("setUUIDProp")
.setProperty("timestampProperty").simple("${date:now:yyyy-MM-dd'T'HH:mm:ss.SSS'Z'}").id("setTimestamp")
.to("xslt:classpath:/transformations/ExampleTransform.xsl").id("transformExample")
.to("validator:classpath:services/ExampleService/xsd/BAS/V1/ExampleValidation.xsd").id("validateExampleValidation")
.removeHeaders("*", "BreadCrumb").id("removeHeaders")
.to("cxf:bean:processRequestSoapEndpointBackend").id("toSoapEndpoint")
.bean(customLogger, String.format("logCode(${exchangeProperty.BreadCrumb}, CODE-041,%s)",webServiceConfig.processRequestSoapEndpointBackend().getAddress())).id("logCode041")
.bean(customLogger, "logEnd").id("logCodeEnd");
}
private String getStackTrace(Exception exception) {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
exception.printStackTrace(pw);
return sw.toString();
}
}
我们尝试使用并行处理来加快处理时间,并进行一些小调整以尝试将时间控制在 30 秒以下,但真正的问题是这个超时时间太短了。
问题似乎是我找不到这个 30000ms 的超时设置在哪里以及如何设置,以及如何更改它。我尝试过一些类似
cxf.timeout=60000
的东西,但似乎CXF没有这个属性。
像
Cxf.ConnectionTimeOut
和Cxf.ReceiveTimeout
这样的东西似乎指的是HTTP超时,但我认为这是应用程序中的某种内部超时。
原来这是“ContinationTimeout”。
更改了我的路线中使用的 cxf 端点,如下所示:
from("cxf:bean:processRequestSoapEndpoint?continuationTimeout=60000")
另请参阅此处的答案: