使用Scala通过Flink timeWindow累积字数

问题描述 投票:0回答:1

我有twitter流API,我从那里检索推文。我还列出了我要考虑的所需单词。

我想做的是始终将最准确的值存储到我的Cassandra数据库中,该值对应于该单词在一天中使用了多少次。

我当时考虑使用窗口函数每5秒合并一次结果,然后将此合并值写入数据库。

我不知道这是否是最好的方法。如果这是最好的方法,我尝试按照文档做一个简单的示例,但它不会每5秒对单词进行分组。


    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val counts =
      env.fromElements("foo bar test test baz foo", "yes no no yes", "hi hello hi hello")
      .flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .filter(word => Words.listOfWords.contains(word) || Words.listOfWords2.contains(word))
      .map { (_, 1) }
      .keyBy(0)
      .timeWindow(Time.seconds(5)).sum( 1)

    counts.print()
    env.execute("test-code")

  }
twitter stream apache-flink
1个回答
1
投票

嗯,当前它不起作用,因为您是根据元素创建DataStream的,但这并不是窗口化的最佳方法,因为您实际上没有5秒钟的运行时间来创建多个窗口,因此所有消息都将转到同一窗口。但是,如果您要在实际的Twitter API上运行此程序,则通常应将项目正确分组到窗口中。

© www.soinside.com 2019 - 2024. All rights reserved.