我是Scala的新手,我用下面的代码从CSV(有20条记录)中用for循环一个一个地获取行(行),然后我把这些发送到Kafka。
for (line <- FileStream.getLines) {
val today = Calendar.getInstance.getTime
val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val key = UUID.randomUUID().toString().split("-")(0)
val value = formatter.format(today) + "," + line
val data = new ProducerRecord[String, String](topic, key, value)
println(data.value())
producer.send(data)
}
我需要的是在它读完5行之后,也就是说,当它读到5个循环之后,它应该休眠5秒,然后它应该从它离开的地方(从第6行开始)继续。
我期望的输出是。
2012-08-13T00:00:00.000Z,92.29,92.59,91.74,92.4,2075391.0,MMM
2012-08-14T00:00:00.000Z,92.36,92.5,92.01,92.3,1843476.0,MMM
2012-08-15T00:00:00.000Z,92.0,92.74,91.94,92.54,1983395.0,MMM
2012-08-16T00:00:00.000Z,92.75,93.87,92.21,93.74,3395145.0,MMM
2012-08-17T00:00:00.000Z,93.93,94.3,93.59,94.24,3069513.0,MMM
------5 seconds sleep------
2012-08-20T00:00:00.000Z,94.0,94.17,93.55,93.89,1640008.0,MMM
2012-08-21T00:00:00.000Z,93.98,94.1,92.99,93.21,2302988.0,MMM
2012-08-22T00:00:00.000Z,92.56,93.36,92.43,92.68,2463908.0,MMM
2012-08-23T00:00:00.000Z,92.65,92.68,91.79,91.98,1823757.0,MMM
2012-08-24T00:00:00.000Z,92.03,92.97,91.94,92.83,1945796.0,MMM
那么我们如何才能实现这个目标呢?先谢谢你
你可以先把这个for循环转化为迭代器,把循环中的公共元素去掉。
val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
// be careful because SimpledateFormat is not threads safe
val iter: Iterator[Unit] = Iterator.continually {
val line = FileStream.getLines
val today = Calendar.getInstance.getTime
val key = UUID.randomUUID().toString().split("-")(0)
val value = formatter.format(today) + "," + line
val data = new ProducerRecord[String, String](topic, key, value)
println(data.value())
producer.send(data)
}
现在你有一个无限迭代器 可以消耗和发送消息,让我们用滑动方法让它以5条消息为单位分批运行
val iter5 = iter.sliding(5,5)
这将使我们的迭代器以5条消息为一组,不重复工作。现在要把睡眠,在每个批次之后,我们将附加一个睡眠调用。
val iterWithSleeps: Iterator[Unit] = iter5.flatMap(batch => batch :+ {Thread.sleep(5000})
flatMap
将产生一个扁平化的迭代器,每5条消息,将执行一个5秒的睡眠。
你可以尝试
producer.send(data)
Thread.sleep(5000) // 5000 milliseconds