我有twitter流API,我从那里检索推文。我还列出了我要考虑的所需单词。
我想做的是始终将最准确的值存储到我的Cassandra数据库中,该值对应于该单词在一天中使用了多少次。
我当时考虑使用窗口函数每5秒合并一次结果,然后将此合并值写入数据库。
我不知道这是否是最好的方法。如果这是最好的方法,我尝试按照文档做一个简单的示例,但它不会每5秒对单词进行分组。
val env = StreamExecutionEnvironment.getExecutionEnvironment
val counts =
env.fromElements("foo bar test test baz foo", "yes no no yes", "hi hello hi hello")
.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.filter(word => Words.listOfWords.contains(word) || Words.listOfWords2.contains(word))
.map { (_, 1) }
.keyBy(0)
.timeWindow(Time.seconds(5)).sum( 1)
counts.print()
env.execute("test-code")
}
嗯,当前它不起作用,因为您是根据元素创建DataStream
的,但这并不是窗口化的最佳方法,因为您实际上没有5秒钟的运行时间来创建多个窗口,因此所有消息都将转到同一窗口。但是,如果您要在实际的Twitter API上运行此程序,则通常应将项目正确分组到窗口中。