我有一个MongoDB集合。我想使用Vert.x将此集合的一个子集(基于某些查询)流式传输到Kafka主题。
到目前为止,我已经为KafkaWriteStream
创建了一个Vert.x verticle,它似乎与虚拟硬编码字符串一起使用。
不幸的是,我不确定如何从MongoDB获取文件流,以后可以使用专用的Verticle流式传输到Kafka。
我怎么处理这个?有没有人有任何相关的链接或信息?
Vert.x Mongo Client模块允许从Mongo查询中获取ReadStream
。
当你拥有它和KafkaWriteStream
时,你可以开始从Mongo读取数据并将其发送给Kafka。
但请注意背压:如果Mongo加载数据太快,请不要压倒Kafka客户端。你的算法应该是这样的:
readstream.handler(data -> {
transformedData = transform(data); // your own transformations
writestream.write(transformedData);
if (writestream.writeQueueFull()) {
readstream.pause();
writestream.drainHandler(done -> {
readstream.resume();
});
}
});