I wrote my own ProcessWindowFunction in Scala/Flink for the method below, but for some reason the IDE gives me this error:
Type mismatch.
Required: ProcessWindowFunction[(String, String, Int), NotInferredR, String, TimeWindow]
Found: MyProcessWindowFunction
Here is the relevant part of the method:
def question_seven(commitStream: DataStream[Commit]): DataStream[CommitSummary] = {
  val windowedStream = commitStream
    .map { x =>
      val parts = x.url.split("/") // split the URL once instead of twice
      (parts(4) + "/" + parts(5), x.commit.committer.name, x.stats.map(_.total).getOrElse(0))
    }
    .keyBy(_._1)
    .window(TumblingEventTimeWindows.of(Time.days(1)))
    .process(new MyProcessWindowFunction) //here is the error occurring
  // return the stream; ending the block with a val would make the method return Unit
  windowedStream
}
And here is my custom process window function class:
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction
import org.apache.flink.util.Collector
import util.Protocol.CommitSummary
import java.lang
import java.text.SimpleDateFormat
import scala.collection.JavaConverters._

class MyProcessWindowFunction extends ProcessWindowFunction[
    (String, String, Int),
    CommitSummary,
    String,
    TimeWindow
  ] {

  /**
   * Evaluates the window and outputs none or several elements.
   *
   * @param key       The key for which this window is evaluated.
   * @param context   The context in which the window is being evaluated.
   * @param iterable  The elements in the window being evaluated.
   * @param collector A collector for emitting elements.
   * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
   */
  override def process(
      key: String,
      context: ProcessWindowFunction[(String, String, Int), CommitSummary, String, TimeWindow]#Context,
      iterable: lang.Iterable[(String, String, Int)],
      collector: Collector[CommitSummary]): Unit = {
    val elements = iterable.asScala
    val simpleDateFormat = new SimpleDateFormat("dd-MM-yyyy")
    val windowStart = context.window.getStart
    val date = simpleDateFormat.format(windowStart)
    val amountOfCommits = elements.size
    val committerCounts = elements.groupBy(_._2).mapValues(_.size)
    val amountOfCommitters = committerCounts.size
    val totalChanges = elements.map(_._3).sum
    val maxCommits = committerCounts.values.max
    val topCommitters = committerCounts.filter(_._2 == maxCommits).keys.toList.sorted.mkString(",")
    val commitSummary = CommitSummary(key, date, amountOfCommits, amountOfCommitters, totalChanges, topCommitters)
    collector.collect(commitSummary)
  }
}
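As an aside, the per-window aggregation in process does not depend on Flink at all, so a minimal sketch like the following could pull it out into a plain Scala function and check it in isolation. It assumes a stand-in CommitSummary case class with the field types implied by the constructor call above (the real util.Protocol.CommitSummary is not shown), and the names WindowSummary and summarize are hypothetical. It also formats the date in UTC, on the assumption that the date should match the default TumblingEventTimeWindows alignment (to the epoch, i.e. UTC midnight); the class above uses the JVM default time zone instead.

```scala
import java.text.SimpleDateFormat
import java.util.{Date, TimeZone}

// Hypothetical stand-in for util.Protocol.CommitSummary; field types are assumed.
final case class CommitSummary(
    repo: String,
    date: String,
    amountOfCommits: Int,
    amountOfCommitters: Int,
    totalChanges: Int,
    topCommitters: String)

object WindowSummary {
  // Same computation as the process method, but on a plain Scala Iterable of
  // (repo, committerName, changes) tuples plus the window start in epoch millis.
  def summarize(key: String, windowStart: Long, elements: Iterable[(String, String, Int)]): CommitSummary = {
    val fmt = new SimpleDateFormat("dd-MM-yyyy")
    fmt.setTimeZone(TimeZone.getTimeZone("UTC")) // assumption: report window dates in UTC
    val date = fmt.format(new Date(windowStart))

    // Number of commits per committer name.
    val committerCounts = elements.groupBy(_._2).map { case (name, xs) => name -> xs.size }
    // Guard against an empty input so max does not throw.
    val maxCommits = if (committerCounts.isEmpty) 0 else committerCounts.values.max
    // All committers tied for the most commits, sorted and comma-separated.
    val topCommitters = committerCounts
      .collect { case (name, n) if n == maxCommits => name }
      .toList
      .sorted
      .mkString(",")

    CommitSummary(key, date, elements.size, committerCounts.size, elements.map(_._3).sum, topCommitters)
  }
}
```

Keeping the logic in a plain function like this lets the Flink class shrink to computing the window start and calling collect, whichever ProcessWindowFunction it ends up extending.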
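When I try to change something in it, I get other errors complaining that I have not implemented the original class correctly. I can provide more information if needed. As far as I know, process should return a DataStream or a WindowedStream.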
I actually found the cause of the error: I had imported the wrong ProcessWindowFunction.
I imported it from:
org.apache.flink.streaming.api.functions.windowing
but I should have imported it from: org.apache.flink.streaming.api.scala.function
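The first is the Java API class and the second is the Scala API class, and their process methods take different parameter types: the Java version receives a java.lang.Iterable, while the Scala version receives a Scala Iterable and its own inner Context. The process method of the Scala DataStream API expects the Scala class, so when it was given my class extending the Java one, the compiler could not infer the output type, hence Required: ProcessWindowFunction[(String, String, Int), NotInferredR, String, TimeWindow]. With the correct import, the type parameters are [(String, String, Int), CommitSummary, String, TimeWindow].
CommitSummary can be any class.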
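For reference, a sketch of the class rewritten against the Scala API might look like the following. Besides the import, the main change is the process signature: the Scala ProcessWindowFunction passes its own inner Context and a Scala Iterable, so the JavaConverters import and the asScala call go away (this likely explains the "not implemented correctly" errors after switching imports while keeping the Java-style signature). CommitSummary is again a hypothetical stand-in with assumed field types, and the block assumes flink-streaming-scala is on the classpath.

```scala
import java.text.SimpleDateFormat
import java.util.Date

import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Hypothetical stand-in for util.Protocol.CommitSummary; field types are assumed.
final case class CommitSummary(
    repo: String,
    date: String,
    amountOfCommits: Int,
    amountOfCommitters: Int,
    totalChanges: Int,
    topCommitters: String)

// Extends the Scala API ProcessWindowFunction, which is the type that
// WindowedStream.process in the Scala DataStream API expects.
class MyProcessWindowFunction
    extends ProcessWindowFunction[(String, String, Int), CommitSummary, String, TimeWindow] {

  override def process(
      key: String,
      context: Context,                          // inner Context of the Scala class
      elements: Iterable[(String, String, Int)], // already a Scala Iterable
      out: Collector[CommitSummary]): Unit = {
    val date = new SimpleDateFormat("dd-MM-yyyy").format(new Date(context.window.getStart))
    val committerCounts = elements.groupBy(_._2).map { case (name, xs) => name -> xs.size }
    // Time windows here only fire once they contain an element, so max is not called on an empty collection.
    val maxCommits = committerCounts.values.max
    val topCommitters = committerCounts.filter(_._2 == maxCommits).keys.toList.sorted.mkString(",")
    out.collect(
      CommitSummary(key, date, elements.size, committerCounts.size, elements.map(_._3).sum, topCommitters))
  }
}
```

With a class like this, the unchanged .process(new MyProcessWindowFunction) call in question_seven should infer R = CommitSummary. The Scala API's process also needs an implicit TypeInformation for the output type, which import org.apache.flink.streaming.api.scala._ normally provides.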