.process method does not accept a custom ProcessWindowFunction class


I wrote my own ProcessWindowFunction in Scala/Flink for the method below, but for some reason my IDE reports this error:

Type mismatch.
Required: ProcessWindowFunction[(String, String, Int), NotInferredR, String, TimeWindow]
Found: MyProcessWindowFunction

Here is the relevant part of my method:

 def question_seven(
                      commitStream: DataStream[Commit]): DataStream[CommitSummary] = {

    val windowedStream = commitStream
      .map { x =>
        (x.url.split("/")(4) + "/" + x.url.split("/")(5), x.commit.committer.name, x.stats.map(_.total).getOrElse(0))
      }
      .keyBy(_._1)
      .window(TumblingEventTimeWindows.of(Time.days(1)))
      .process(new MyProcessWindowFunction) // the type mismatch is reported here
}

Below is my custom ProcessWindowFunction class:

import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction
import org.apache.flink.util.Collector
import util.Protocol.CommitSummary

import java.lang
import java.text.SimpleDateFormat
import scala.collection.JavaConverters._

class MyProcessWindowFunction extends ProcessWindowFunction[
  (String, String, Int),
  CommitSummary,
  String,
  TimeWindow
] {


  /**
   * Evaluates the window and outputs none or several elements.
   *
   * @param key      The key for which this window is evaluated.
   * @param context  The context in which the window is being evaluated.
   * @param elements The elements in the window being evaluated.
   * @param out      A collector for emitting elements.
   * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
   */
  override def process(key: String, context: ProcessWindowFunction[(String, String, Int), CommitSummary, String, TimeWindow]#Context, iterable: lang.Iterable[(String, String, Int)], collector: Collector[CommitSummary]): Unit = {
    val elements = iterable.asScala
    val simpleDateFormat = new SimpleDateFormat("dd-MM-yyyy")
    val windowStart = context.window.getStart
    val date = simpleDateFormat.format(windowStart)

    val amountOfCommits = elements.size

    val committerCounts = elements.groupBy(_._2).mapValues(_.size)

    val amountOfCommitters = committerCounts.size

    val totalChanges = elements.map(_._3).sum

    val maxCommits = committerCounts.values.max
    val topCommitters = committerCounts.filter(_._2 == maxCommits).keys.toList.sorted.mkString(",")

    val commitSummary = CommitSummary(key, date, amountOfCommits, amountOfCommitters, totalChanges, topCommitters)
    collector.collect(commitSummary)
  }

}

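When I try to change anything in it, I get other errors complaining that I have not implemented the original class correctly. I can provide more information if needed. As far as I know, process should return a DataStream or a windowed stream.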

scala apache-flink flink-streaming
1 Answer

I actually found out why the error occurs. It was caused by importing the wrong ProcessWindowFunction.

I imported it from:

org.apache.flink.streaming.api.functions.windowing
but I should have imported it from:
org.apache.flink.streaming.api.scala.function

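The first is the Java API's function and the second is the Scala API's function, and the two take different parameters. The Scala API's .process expects the Scala variant, whose process method receives a Scala Iterable rather than a java.lang.Iterable.

Java parameters: [(String, String, Int), NotInferredR, String, TimeWindow]
Scala parameters: [(String, String, Int), CommitSummary, String, TimeWindow]

CommitSummary can be any class.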
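As a rough sketch of what the corrected class could look like with the Scala import, assuming the Flink Scala API is on the classpath: the CommitSummary case class below is a hypothetical stand-in for util.Protocol.CommitSummary (its field names are guesses), and the summarize helper is introduced here only to keep the aggregation separate from the Flink plumbing. Neither is taken from the original code.

```scala
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction // Scala variant, not functions.windowing
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import java.text.SimpleDateFormat
import java.util.Date

// Hypothetical stand-in for util.Protocol.CommitSummary; field names are assumed.
case class CommitSummary(
    repo: String,
    date: String,
    amountOfCommits: Int,
    amountOfCommitters: Int,
    totalChanges: Int,
    topCommitter: String)

object MyProcessWindowFunction {
  // Hypothetical helper: the same aggregation as in the question, as a pure function.
  // Assumes `elements` is non-empty (otherwise `max` throws).
  def summarize(key: String, date: String, elements: Iterable[(String, String, Int)]): CommitSummary = {
    // Number of commits per committer name.
    val committerCounts = elements.groupBy(_._2).map { case (name, commits) => name -> commits.size }
    val maxCommits = committerCounts.values.max
    // All committers tied for the highest count, sorted and comma-separated.
    val topCommitters = committerCounts.filter(_._2 == maxCommits).keys.toList.sorted.mkString(",")
    CommitSummary(key, date, elements.size, committerCounts.size, elements.map(_._3).sum, topCommitters)
  }
}

class MyProcessWindowFunction
    extends ProcessWindowFunction[(String, String, Int), CommitSummary, String, TimeWindow] {

  // The Scala variant uses its own inner Context type and a Scala Iterable,
  // so no JavaConverters conversion is needed.
  override def process(
      key: String,
      context: Context,
      elements: Iterable[(String, String, Int)],
      out: Collector[CommitSummary]): Unit = {
    val date = new SimpleDateFormat("dd-MM-yyyy").format(new Date(context.window.getStart))
    out.collect(MyProcessWindowFunction.summarize(key, date, elements))
  }
}
```

With this version, the call .process(new MyProcessWindowFunction) in question_seven should type-check, provided commitStream is the Scala DataStream (for example via import org.apache.flink.streaming.api.scala._) rather than the Java one.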
