我们有一个DSTREAM,如
val ssc = new StreamingContext(sc, Seconds(1))
val kS = KafkaUtils.createDirectStream[String, TMapRecord](
ssc,
PreferConsistent,
Subscribe[String, TMapRecord](topicsSetT, kafkaParamsInT)).
mapPartitions(part => {
part.map(_.value())
}).
mapPartitions(part1 => {
part1.map(c => {
TMsg(1,
c.field1,
c.field2, //And others
c.startTimeSeconds
)
})
})
因此,每个RDD有一些(技术)重点领域,我可以使用dediplicate DSTREAM一堆TMsg
对象。基本上,如果我们有二合一TMSG对象或两次与同一field1
和field2
离散RDDS,他们不到1秒(我们期待在startTimeSeconds
)不同,它是一个副本。
我看了看mapWithState。是的,我可以创造ķ> - V DSTREAM像
val mappedStream = kS.map(m => (m.field1, m.field2) -> m.startTimeSeconds)
因此,我可以使用的功能,但不明白我怎么可以用它来过滤重复。
窗口功能可忍不住了,因为解决方案是写在DStreams我不能使用(结构化流).deduplicate功能。
任何解决方案?谢谢
附:星火版本是2.2
你可以使用mapWithState
。这是一个很好manual how to use Stateful Streaming。你的情况,你可以:
1.设置检查点:
val ssc = new StreamingContext(sc, Seconds(1))
ssc.checkpoint("path/to/persistent/storage")
2.Define更新功能:
def update(key: (String, String),
value: Option[Int],
state: State[Int]): Option[((String, String), Int)] = {
(value, state.getOption()) match {
case (Some(_), Some(_)) => None
case (Some(v), _) =>
# you can update your state in any value you want
# it is just a marker that value not new
state.update(value.get)
Option((key, v))
case (_, _) if state.isTimingOut() => None
}
}
3.Make状态规格:
val stateSpec =
StateSpec
.function(update _)
# it is important to define how long
# you want to check duplication
# in this example check interval is 1 second.
.timeout(Seconds(1))
4.使用它:
ks
# make key->value pairs
.map(m => (m.field1, m.field2) -> m.startTimeSeconds)
.mapWithState(stateSpec)
如果你想利用最后的值来代替,更新功能可能是:
def update(key: (String, String),
value: Option[Int],
state: State[Int]): Option[((String, String), Int)] = {
(value, state.getOption()) match {
case (Some(_), Some(_)) => None
case (Some(v), _) =>
state.update(value.get)
None
case (_, _) if state.isTimingOut() => Option((key, value.get))
}
}