我有一个 kafka 流应用程序(用 java springboot 编写),它有 3 个处理器。在第二个处理器中,我想以异步方式调用rest api,当响应到来时,我想将响应转发到下一个处理器。我不希望流线程在执行其余 api 时阻塞其余 api 调用并获取其他消息。有什么办法可以做到这一点吗? 我拥有的处理器示例
public class secondprocessorsupplier implements ProcessorSupplier<IndexedRecord, IndexedRecord> {
public secondprocessorsupplier() {
}
private static final ProcessorName PROCESSOR_NAME = ProcessorName.SECOND_PROCESSOR;
@Override
public Processor<IndexedRecord, IndexedRecord> get() {
return new Secondprocessor();
}
private class Secondprocessor extends Processor<IndexedRecord, IndexedRecord> {
@Override
public void init(ProcessorContext context) {
this.context=context;
}
@Override
public void process(final IndexedRecord keyInput, final IndexedRecord streamMessageInput) {
final HttpClient httpclient= HttpClient.newBuilder()
.connectTimeout(Duration.ofMillis(30000))
.version(getClientVersion(httpClientVersion))
.followRedirects(behavior.getHttpClientRedirect())
.sslContext(sslContext)
.sslParameters(sslParameters)
.build());
httpclient.sendAsync(httprequest, HttpResponse.BodyHandlers.ofByteArray()).thenAccept(
response -> context.forward(keyInput, response)
)
}
@Override
public ProcessorName getProcessorName() {
return PROCESSOR_NAME;
}
}
}
在上面的例子中,在thenAccept执行中上下文中的当前节点为null并且forward不起作用
您可以使用 STREAM_TIME 标点符号来异步处理事件:
init
方法中注册标点符号 initSchedule = context.schedule(Duration.ofMillis(100), PunctuationType.STREAM_TIME, this::processRecords);
process
方法中将记录存储到本地集合,如果您只需要处理一次并且记录数据并不重要。如果数据很关键,寄存器内部存储来保存数据(推荐)。processRecords
中对所有存储的记录进行 synch HTTP 调用并转发数据。再次强调,如果您只想处理记录一次,将其从集合/存储中删除。它是如何工作的 - STREAM_TIME 标点符号仅在记录到达时触发。因此,如果每秒有一条记录,它将每 1 秒 + ~100 毫秒触发一次。如果您每 50 毫秒收到一条记录,标点符号将每 100 毫秒触发一次,平均处理 2 条记录。