Apache Flink.ProcessWindowFunction KeyBy() ProcessWindowFunction KeyBy()的多个值。

问题描述 投票:0回答:1

我试图在DataStream中使用WindowFunction,我的目标是有一个像下面这样的Query。

SELECT  *,
    count(id) OVER(PARTITION BY country) AS c_country,
    count(id) OVER(PARTITION BY city) AS c_city,
    count(id) OVER(PARTITION BY city) AS c_addrs
FROM fm
ORDER BY country

帮助我完成了国家字段的汇总,但我需要在同一时间窗口中进行两个字段的汇总。我不知道是否可以将两个或更多键放在 keyBy( ) 对于这种情况

val parsed = stream2.map(x=> {
      val arr = x.split(",")
      (arr(0).toInt, arr(1), arr(2))
    })


    parsed
    .keyBy(x => x._2) 
      .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
      .process(new ProcessWindowFunction[
        (Int, String, String), (Int, String, String, Int), String, TimeWindow   
      ]() {
        override def process(key: String, context: Context,
                             elements: Iterable[(Int, String, String)],
                             out: Collector[(Int, String, String, Int)]): Unit = {  
          val lst = elements.toList
          lst.foreach(x => out.collect((x._1, x._2, x._3, lst.size)))
      }
      }).print().setParallelism(1)

这对于第一次汇总来说是很好的,但是我缺少了同一时间窗口中城市字段的第二次汇总。

输入数据。

10,"SPAIN","BARCELONA","C1"
20,"SPAIN","BARCELONA","C2"
30,"SPAIN","MADRID","C3"
30,"SPAIN","MADRID","C3"
80,"SPAIN","MADRID","C4"
90,"SPAIN","VALENCIA","C5"
40,"ITALY","ROMA","C6"
41,"ITALY","ROMA","C7"
42,"ITALY","VENECIA","C8"
50,"FRANCE","PARIS","C9"
60,"FRANCE","PARIS","C9"
70,"FRANCE","MARSELLA","C10"

预期输出

(10,"SPAIN","BARCELONA",6,2,1)
(20,"SPAIN","BARCELONA",6,2,1)
(30,"SPAIN","MADRID",6,3,2)
(30,"SPAIN","MADRID",6,3,2)
(80,"SPAIN","MADRID",6,3,1)
(90,"SPAIN","VALENCIA",6,1,1)
(50,"FRANCE","PARIS",3,2,1)
(60,"FRANCE","PARIS",3,2,1)
(70,"FRANCE","MARSELLA",3,1,1)
(40,"ITALY","ROMA",3,2,2)
(41,"ITALY","ROMA",3,2,2)
(42,"ITALY","VENECIA",3,1,1)

---------------- 更新2

我目前想做3列的聚合。如果我使用的选项是连锁KeyBy()输出,但这可能会变得非常长和复杂,而且不是很可读。除此之外,我还放了一个Time.seconds(1)的时间窗口,因为如果没有这个窗口,上面的KeyBy()输出就会作为单个事件。

我的兴趣是,如果我可以使这些聚合在一个单一的过程函数。

我有那么长的代码...

parsed
    .keyBy(_.country) // key by product id.
      .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
      .process(new ProcessWindowFunction[
        AlarmasIn, AlarmasOut, String, TimeWindow
      ]() {
        override def process(key: String, context: Context,
                             elements: Iterable[AlarmasIn],
                             out: Collector[AlarmasOut]): Unit = {
          val lst = elements.toList
          lst.foreach(x => out.collect(AlarmasOut(x.id, x.country, x.city,x.address, lst.size,0,0)))
      }
      })
      .keyBy( _.city).window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
        .process(new ProcessWindowFunction[
          AlarmasOut, AlarmasOut, String, TimeWindow
        ]() {
          override def process(key: String,
                               context: Context,
                               elements: Iterable[AlarmasOut],
                               out: Collector[AlarmasOut]): Unit = {
            val lst = elements.toList
            lst.foreach(x => out.collect(AlarmasOut(x.id, x.country, x.city,x.address,x.c_country,lst.size,x.c_addr)))
          }
        })
      .keyBy( _.address).window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
      .process(new ProcessWindowFunction[
        AlarmasOut, AlarmasOut, String, TimeWindow
      ]() {
        override def process(key: String,
                             context: Context,
                             elements: Iterable[AlarmasOut],
                             out: Collector[AlarmasOut]): Unit = {
          val lst = elements.toList
          lst.foreach(x => out.collect(AlarmasOut(x.id, x.country, x.city,x.address,x.c_country,x.c_city,lst.size)))
        }
      })
      .print()

/// CASE CLASS
 case class AlarmasIn(
                      id: Int,
                      country: String,
                      city: String,
                      address: String
                    )

  case class AlarmasOut(
                       id: Int,
                       country: String,
                       city: String,
                       address: String,
                       c_country: Int,
                       c_city: Int,
                       c_addr: Int
                     )

scala stream apache-flink flink-streaming
1个回答
1
投票

作为 city 的一个子类。country,您可以通过 city 维度,然后再通过 country 的维度,如果一个维度不是另一个dim的子维度,你可以把这两个dim连在一起,然后生成一个新的key,然后在process func中实现聚合逻辑。

val parsed = stream2.map(x=> {
      val arr = x.split(",")
      (arr(0).toInt, arr(1), arr(2))
    })


    parsed
    .keyBy(x => x._3) 
      .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
      .process(new ProcessWindowFunction[
        (Int, String, String), (Int, String, String, Int), String, TimeWindow   
      ]() {
        override def process(key: String, context: Context,
                             elements: Iterable[(Int, String, String)],
                             out: Collector[(Int, String, String, Int)]): Unit = {  
          val lst = elements.toList
          lst.foreach(x => out.collect((x._1, x._2, x._3, lst.size)))
      }
      })
      .keyBy(x => x._2)
      .process(new ProcessWindowFunction[
        (Int, String, String), (Int, String, String, Int), String, TimeWindow   
      ]() {
        override def process(key: String, context: Context,
                             elements: Iterable[(Int, String, String)],
                             out: Collector[(Int, String, String, Int)]): Unit = {  
          val cnt = 0
          for(e:elements){
             cnt += e._4
          }

          lst.foreach(x => out.collect((x._1, x._2, x._3, cnt)))
      }
      }).print().setParallelism(1)

如果一个维度不是另一个维度的子维度,你可以将这两个维度连接起来,然后生成一个新的key,然后自己在process func中实现聚合逻辑。

keyBy(x=>x._2+x._3)

更新

我认为不可能在一个过程函数中计算结果,因为你试图用不同的键进行统计。唯一的方法是将全局并行度设置为1(即使使用了全局并行度,所有的输入数据也会流向一个下游任务),一步到位。keyby func)或将输入数据广播给所有下游任务。

由于你的计算实际上有一些共同的过程逻辑,最好做一些抽象。

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object CountJob {

  @throws[Exception]
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val transactions: DataStream[Record] = env
      .addSource(new SourceFunction[Record] {
        override def run(sourceContext: SourceFunction.SourceContext[Record]): Unit = {
          while (true) {
            sourceContext.collect(Record(1, "a", "b", "c", 1, 1, 1))
            Thread.sleep(1000)
          }
        }

        override def cancel(): Unit = {

        }
      })
      .name("generate source")

    transactions.keyBy(_.addr)
      .timeWindow(Time.seconds(1))
      .process(new CustomCountProc("ADDR"))
      .keyBy(_.city)
      .timeWindow(Time.seconds(1))
      .process(new CustomCountProc("CITY"))
      .keyBy(_.country)
      .timeWindow(Time.seconds(1))
      .process(new CustomCountProc("COUNTRY"))
      .print()


    env.execute("Count Job")
  }
}

// a common operator to process different aggregation
class CustomCountProc(aggrType: String) extends ProcessWindowFunction[Record, Record, String, TimeWindow] {

  override def process(key: String, context: Context, elements: Iterable[Record], out: Collector[Record]): Unit = {

    for (e <- elements) {
      if ("ADDR".equals(aggrType)) {
        out.collect(Record(-1, e.country, e.city, key, e.country_cnt, e.city_cnt, elements.size))
      }
      else if ("CITY".equals(aggrType)) {
        out.collect(Record(-1, e.country, key, e.country, e.country_cnt, elements.size, e.addr_cnt))
      }
      else if ("COUNTRY".equals(aggrType)) {
        out.collect(Record(-1, key, e.city, e.addr, elements.size, e.city_cnt, e.addr_cnt))
      }
    }

  }
}

case class Record(
                   id: Int,
                   country: String,
                   city: String,
                   addr: String,
                   country_cnt: Int,
                   city_cnt: Int,
                   addr_cnt: Int
                 ) {
}

另外,我不确定输出是否真的符合你的期望。由于你没有实现一个有状态的过程函数,我认为你是想计算每一批数据的聚合结果,而每一批数据都包含了一秒钟的时间窗口中摄取的数据。输出不会一直累积,每批数据都会从零开始。

通过使用 timeWindow 功能,您还需要注意到 TimeCharacteristic 默认情况下是处理时间。

输出也可能会延迟,因为使用3个相应的 window 函数。假设第一个过程func在一秒内完成了聚合,并将结果转发给下游。由于第二个过程func也有一个 timewindow 1秒的时间,它不会发出任何结果,直到收到上游的下一批输出。

看看别人有没有更好的办法来解决你的问题。

© www.soinside.com 2019 - 2024. All rights reserved.