如何在Apache Flink的StreamTableEnvironment中实现timeWindow()?]

问题描述 投票:0回答:1

每个人,我想在StreamTableEnvironment中使用flink时间窗口。

我以前将timeWindow(Time.seconds())函数与来自kafka主题的dataStream一起使用。对于外部问题,我正在将此DataStream转换为DataTable并使用sqlQuery()应用SQL查询。

我想使用SQL进行x次窗口聚合,然后将其发送到另一个kafka主题

数据源:

    val stream = senv
      .addSource(new FlinkKafkaConsumer[String]("flink", new SimpleStringSchema(), properties))

以前的汇总示例:

    val windowCounts = stream.keyBy("x").timeWindow(Time.seconds(5), Time.seconds(5))

当前数据表:

    val tableA = tableEnv.fromDataStream(parsed, 'user, 'product, 'amount)

在这一部分中,应该有一个查询,每X次进行一次汇总

    val result = tableEnv.sqlQuery(
          s"SELECT * FROM $tableA WHERE amount > 2".stripMargin)

或多或少,我的汇总将超过y(PARTITION BY x)谢谢!

每个人,我都想在StreamTableEnvironment中使用flink时间窗口。我以前曾将timeWindow(Time.seconds())函数与来自kafka主题的dataStream一起使用。对于外部...

apache-spark apache-kafka stream apache-flink
1个回答
0
投票

Ververica's training for Flink SQL将帮助您。在Querying Dynamic Tables with SQL的部分中,包括一些仅涵盖此类查询的练习/示例。

© www.soinside.com 2019 - 2024. All rights reserved.