我有一个要求,我使用Spring Batch从SQL DB中读取一堆行(数千),并在将其写入Kafka主题之前调用REST服务来丰富内容。
使用Spring Reactive webClient时,如何限制活动非阻塞服务调用的数量?在使用Spring Batch读取数据后,我是否应该以某种方式在循环中引入Flux?
(我理解delayElements的用法,并且它有不同的用途,因为当一个Get Service Call带来大量数据并且你希望服务器速度变慢时 - 虽然这里我的用例有点不同有许多WebClient调用,并希望限制调用的数量,以避免内存不足问题,但仍然获得非阻塞调用的优势)。
非常有趣的问题。我思考它,并想到了如何做到这一点的几个想法。我将分享我的想法,希望这里有一些想法可能会帮助你进行调查。
不幸的是,我不熟悉Spring Batch。然而,这听起来像rate limiting或经典producer-consumer problem的问题。
因此,我们有一个生产者,它产生了许多消息,消费者无法赶上,而中间的缓冲变得无法忍受。
我看到的问题是,正如您所描述的那样,您的Spring Batch进程不是作为流或管道工作,而是您的反应式Web客户端。
因此,如果我们能够将数据作为流读取,那么随着记录开始进入管道,那些将由反应式Web客户端处理,并且使用反压,我们可以控制来自生产者/数据库的流的流侧。
制片人
因此,我要改变的第一件事是如何从数据库中提取记录。我们需要控制当时从数据库中读取的记录数量,方法是分页我们的数据检索或控制fetch size,然后通过反压控制有多少记录通过反应流水线向下游发送。
因此,请考虑以下(基本)数据库数据检索,包装在Flux
中。
Flux<String> getData(DataSource ds) {
return Flux.create(sink -> {
try {
Connection con = ds.getConnection();
con.setAutoCommit(false);
PreparedStatement stm = con.prepareStatement("SELECT order_number FROM orders WHERE order_date >= '2018-08-12'", ResultSet.TYPE_FORWARD_ONLY);
stm.setFetchSize(1000);
ResultSet rs = stm.executeQuery();
sink.onRequest(batchSize -> {
try {
for (int i = 0; i < batchSize; i++) {
if (!rs.next()) {
//no more data, close resources!
rs.close();
stm.close();
con.close();
sink.complete();
break;
}
sink.next(rs.getString(1));
}
} catch (SQLException e) {
//TODO: close resources here
sink.error(e);
}
});
}
catch (SQLException e) {
//TODO: close resources here
sink.error(e);
}
});
}
在上面的例子中:
batchSize
),然后等待它使用背压请求更多。sink.onCancel
,sink.onDispose
),您可能会考虑做一些事情,因为关闭连接和其他资源是至关重要的。消费者方面
在消费者方面,您注册的订户仅在当时以1000的速度请求消息,并且只有在处理该批次后才会请求更多消息。
getData(source).subscribe(new BaseSubscriber<String>() {
private int messages = 0;
@Override
protected void hookOnSubscribe(Subscription subscription) {
subscription.request(1000);
}
@Override
protected void hookOnNext(String value) {
//make http request
System.out.println(value);
messages++;
if(messages % 1000 == 0) {
//when we're done with a batch
//then we're ready to request for more
upstream().request(1000);
}
}
});
在上面的示例中,订阅开始时,它会请求第一批1000条消息。在onNext
中,我们处理第一批,使用Web客户端发出http请求。
批处理完成后,我们从发布者请求另一批1000,依此类推。
你有它!使用背压可以控制当时有多少打开的HTTP请求。
我的例子非常简陋,需要一些额外的工作才能使它生产就绪,但我相信这有希望提供一些可以适应你的Spring Batch场景的想法。