如何把for循环放在一定时间间隔的睡眠中(scala)

问题描述 投票:0回答:1

我是Scala的新手,我用下面的代码从CSV(有20条记录)中用for循环一个一个地获取行(行),然后我把这些发送到Kafka。

for (line <- FileStream.getLines) {

          val today = Calendar.getInstance.getTime

          val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

          val key = UUID.randomUUID().toString().split("-")(0)

          val value = formatter.format(today) + "," + line


          val data = new ProducerRecord[String, String](topic, key, value)


          println(data.value())

          producer.send(data)

        }

我需要的是在它读完5行之后,也就是说,当它读到5个循环之后,它应该休眠5秒,然后它应该从它离开的地方(从第6行开始)继续。

我期望的输出是。

2012-08-13T00:00:00.000Z,92.29,92.59,91.74,92.4,2075391.0,MMM
2012-08-14T00:00:00.000Z,92.36,92.5,92.01,92.3,1843476.0,MMM
2012-08-15T00:00:00.000Z,92.0,92.74,91.94,92.54,1983395.0,MMM
2012-08-16T00:00:00.000Z,92.75,93.87,92.21,93.74,3395145.0,MMM
2012-08-17T00:00:00.000Z,93.93,94.3,93.59,94.24,3069513.0,MMM 

              ------5 seconds sleep------


2012-08-20T00:00:00.000Z,94.0,94.17,93.55,93.89,1640008.0,MMM
    2012-08-21T00:00:00.000Z,93.98,94.1,92.99,93.21,2302988.0,MMM
    2012-08-22T00:00:00.000Z,92.56,93.36,92.43,92.68,2463908.0,MMM
    2012-08-23T00:00:00.000Z,92.65,92.68,91.79,91.98,1823757.0,MMM
    2012-08-24T00:00:00.000Z,92.03,92.97,91.94,92.83,1945796.0,MMM

那么我们如何才能实现这个目标呢?先谢谢你

scala for-loop thread-sleep
1个回答
1
投票

你可以先把这个for循环转化为迭代器,把循环中的公共元素去掉。

val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
// be careful because SimpledateFormat is not threads safe

val iter: Iterator[Unit] = Iterator.continually {
  val line = FileStream.getLines
  val today = Calendar.getInstance.getTime
  val key = UUID.randomUUID().toString().split("-")(0)
  val value = formatter.format(today) + "," + line
  val data = new ProducerRecord[String, String](topic, key, value)
  println(data.value())
  producer.send(data)
}

现在你有一个无限迭代器 可以消耗和发送消息,让我们用滑动方法让它以5条消息为单位分批运行

val iter5 = iter.sliding(5,5)

这将使我们的迭代器以5条消息为一组,不重复工作。现在要把睡眠,在每个批次之后,我们将附加一个睡眠调用。

val iterWithSleeps: Iterator[Unit] = iter5.flatMap(batch => batch :+ {Thread.sleep(5000})

flatMap 将产生一个扁平化的迭代器,每5条消息,将执行一个5秒的睡眠。


0
投票

你可以尝试

producer.send(data)

Thread.sleep(5000) // 5000 milliseconds
© www.soinside.com 2019 - 2024. All rights reserved.