Spring-Integration
版本
5.5.18
,我有一个
flow
,它适用于小型输入集,但由于通过
java.lang.StackOverflowError
进行递归调用并使用
channel
结束递归,因此对于较大的集会遇到 filter
。对于修改流程以减少产生
java.lang.StackOverflowError
的可能性,是否有任何建议? (我确实知道为 JVM 请求更大的线程堆栈。)是否有一种方法可以通过 DSL 声明迭代地生成此内容,而不是递归地生成?或者这可能需要一个
MessageHandler
在
Java
Spring-Integration
之外的
Java-DSL
中执行迭代?展示问题的示例流程可能是:
package org.example.filter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.*;
import org.springframework.messaging.Message;
import java.util.function.Consumer;
@Configuration
public class OverflowFlows {
private static final String CURRENT_ITERATION = "currentIteration";
private static final String ACCUMULATOR = "accumulator";
private static final String RECURSION_CHANNEL = "recurseToBodyValue";
@Bean
public IntegrationFlow callRecursiveFlow() {
return IntegrationFlows
.from("createsOverflow")
.enrichHeaders(h -> h.headerFunction(CURRENT_ITERATION, m -> 0))
.enrichHeaders(h -> h.headerFunction(ACCUMULATOR, m -> 0))
.gateway(callRecurseToBodyValueFlow)
.transform("headers.accumulator")
.get();
}
private final IntegrationFlow callRecurseToBodyValueFlow =
flow -> flow.channel(RECURSION_CHANNEL);
@Bean IntegrationFlow recurseToBodyValueFlow() {
return IntegrationFlows
.from(RECURSION_CHANNEL)
.enrichHeaders(h -> h.headerExpression(CURRENT_ITERATION,
"headers.currentIteration + 1",
true))
.filter("headers.currentIteration < payload", DISCARD_RETURNS_CURRENT_MESSAGE)
.enrichHeaders(h -> h.headerExpression(ACCUMULATOR,
"headers.accumulator + headers.currentIteration",
true))
.channel(RECURSION_CHANNEL)
.get();
}
public static final Consumer<FilterEndpointSpec> DISCARD_RETURNS_CURRENT_MESSAGE = discardReturnsCurrentMessage();
public static Consumer<FilterEndpointSpec> discardReturnsCurrentMessage() {
return filterSpec -> filterSpec.discardFlow(IntegrationFlowDefinition::bridge);
}
}
与 MessageChannel
和
MessagingGateway
的声明结合使用:
package org.example.filter;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Component;
@Component
public class OverflowMessageChannels {
@Bean(name = "createsOverflow")
public MessageChannel createsOverflow() {
return MessageChannels.direct().get();
}
}
package org.example.filter;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
@MessagingGateway(name = "overflowGateway", defaultReplyTimeout = "20")
public interface OverflowGateway {
@Gateway(requestChannel = "createsOverflow")
Integer createsOverflow(Integer value);
}
1000
迭代单元测试会导致堆栈溢出。任何有关策略的指示将不胜感激。
import org.example.filter.OverflowGateway;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import java.util.stream.IntStream;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = org.example.filter.ApiConfiguration.class)
public class OverflowTest {
@Autowired
OverflowGateway overflowGateway;
@Test
public void call_with_one_returns_one() {
assertThat(overflowGateway.createsOverflow(1), equalTo(sumOfRange(1)));
}
@Test
public void call_with_20_returns_20() {
assertThat(overflowGateway.createsOverflow(20), equalTo(sumOfRange(20)));
}
@Test
public void call_with_1000_returns_1000() {
// causes java.lang.StackOverflowError
assertThat(overflowGateway.createsOverflow(1000), equalTo(sumOfRange(1000)));
}
private static int sumOfRange(final int topOfRange) {
return IntStream.range(1, topOfRange).sum();
}
}
StackOverflowError
:java 调用堆栈的最大深度是多少?。是的,Spring Integration 通过通道和端点之间的消息传递性质增加了一些开销,但从逻辑上讲,这并不重要:如果您的逻辑在同一线程中递归,那么当添加更多步骤时,您最终会得到
StackOverflowError
那个循环。不过,您可以破坏它,使
RECURSION_CHANNEL
成为
ExecutorChannel
实例。通过这种方式,您可以将该循环的每次迭代推送到单独的线程中,并且调用堆栈变得不递归。
MessageHandler
来执行迭代。这不是我想要的形式,但这是我想要的。测试方法
call_with_1000_returns_1000()
适用于此表格。
package org.example.filter;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.dsl.*;
import org.springframework.stereotype.Component;
@Component
public class OverflowFlows {
public static final String CURRENT_ITERATION = "currentIteration";
public static final String ACCUMULATOR = "accumulator";
private final IteratingMessageHandler iteratingMessageHandler;
public OverflowFlows(final IteratingMessageHandler iteratingMessageHandler) {
this.iteratingMessageHandler = iteratingMessageHandler;
}
@Bean
public IntegrationFlow callRecursiveFlow() {
return IntegrationFlows
.from("createsOverflow")
.enrichHeaders(h -> h.headerFunction(CURRENT_ITERATION, m -> 0))
.enrichHeaders(h -> h.headerFunction(ACCUMULATOR, m -> 0))
.gateway(callIterativeHandler)
.transform("headers.accumulator") // put final head value in body
.get();
}
private final IntegrationFlow callIterativeHandler = flow -> flow.handle(iteratingMessageHandler);
}
package org.example.filter;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import static org.example.filter.OverflowFlows.ACCUMULATOR;
import static org.example.filter.OverflowFlows.CURRENT_ITERATION;
@Component
public class IteratingMessageHandler extends AbstractReplyProducingMessageHandler {
@Override
protected Object handleRequestMessage(Message<?> message) {
do {
final Integer iteration = getIntegerValue(message, CURRENT_ITERATION);
message = setHeader(message, CURRENT_ITERATION, iteration + 1);
if (isCurrentIterationBelowPayload(message)) {
final Integer accumulator = getIntegerValue(message, ACCUMULATOR);
message = setHeader(message, ACCUMULATOR, accumulator + iteration + 1);
}
} while (isCurrentIterationBelowPayload(message));
return message;
}
private static boolean isCurrentIterationBelowPayload(final Message<?> message) {
return getIntegerValue(message, CURRENT_ITERATION) < (Integer) message.getPayload();
}
private static Integer getIntegerValue(final Message<?> message, final String header) {
return (Integer) message.getHeaders().get(header);
}
private static Message<?> setHeader(final Message<?> message, final String header, final Object value) {
return MessageBuilder.fromMessage(message)
.setHeader(header, value)
.build();
}
}