我们正在使用kafka构建一个异常管理工具。将有源连接器 - 它将从物理文件中提取记录。另一方面,将有sink连接(mongodb-sinkconnect),它将从主题中提取记录并将其推送到mongoDb。一切正常。
我们需要在不同的主题中捕获事件(用于审计目的)。诸如的事件,
我在这里有几个问题:1。我们能够通过在SourceTask中实例化KafkaProducer将事件发送到不同的主题,一旦文件被完全处理,我们发送一个事件
public class FileSourceTask extends SourceTask {
private Producer<Key, Event> auditProducer;
public void start(Map<String, String> props) {
auditProducer = new KafkaProducer<Key, Event>(auditProps);
}
public List<SourceRecord> poll() {
List<SourceRecord> results = this.filePoller.poll();
if(results.isEmpty() && eventNotSentForCurrentFile) {
Event event = new Event();
auditProducer.send(
new ProducerRecord<Key, Event>(this.props.get("event.topic"), key, event));
}
// futher processing
}
上述方法是否正确?
请建议一种方法来解决这个问题。
非常感谢。
我想,你可以围绕Kafka-connect ReST API构建一些东西
https://docs.confluent.io/current/connect/restapi.html#get--connectors-(string-name)-status
但是,通过这种方式,您需要将观察者保持在连接器状态,并且一旦完成连接器的所有任务,您就可以采取措施。