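I am running a simple example to test event-time windows. With processing time I get output, but with EventTime nothing is output. Please help me understand what I am doing wrong.
I create a SlidingWindow of 10 seconds that slides every 5 seconds. At the end of each window, the job should emit the number of messages received during that window.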
Input:
a,1513695853 (generated at 13th second, received at 13th second)
a,1513695853 (generated at 13th second, received at 13th second)
a,1513695856 (generated at 16th second, received at 19th second)
a,1513695859 (generated at 19th second, received at 19th second)
The second field is the event timestamp in epoch seconds, corresponding to the 13th, 13th, 16th, and 19th second of the minute.
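To see which windows these four records should land in, here is a pure-Java sketch of sliding-window assignment with a 10 s size, a 5 s slide and zero offset. It is not Flink code: `SlidingWindowSketch` and `assignWindows` are hypothetical names. It assumes the timestamps are epoch seconds and converts them to milliseconds, the unit Flink uses for event-time timestamps.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not Flink's API: lists the sliding windows [start, end)
// that contain a timestamp, assuming offset 0 and non-negative millisecond timestamps.
class SlidingWindowSketch {
    static List<long[]> assignWindows(long timestampMs, long sizeMs, long slideMs) {
        List<long[]> windows = new ArrayList<>();
        // Start of the latest window containing the timestamp (aligned to the slide).
        long lastStart = timestampMs - (timestampMs % slideMs);
        // Step back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            windows.add(new long[] {start, start + sizeMs});
        }
        return windows;
    }

    public static void main(String[] args) {
        long[] inputSeconds = {1513695853L, 1513695853L, 1513695856L, 1513695859L};
        for (long seconds : inputSeconds) {
            long ms = seconds * 1000; // assumed epoch seconds -> epoch milliseconds
            StringBuilder line = new StringBuilder(seconds + " ->");
            for (long[] w : assignWindows(ms, 10_000L, 5_000L)) {
                // Print window bounds back in seconds for readability.
                line.append(" [").append(w[0] / 1000).append(", ").append(w[1] / 1000).append(")");
            }
            System.out.println(line);
        }
    }
}
```

Under these assumptions, [1513695845, 1513695855) holds the two records at second 13, [1513695850, 1513695860) holds all four, and [1513695855, 1513695865) holds the records at seconds 16 and 19, so an event-time job would be expected to report counts of 2, 4 and 2 once those windows fire.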
If I use a processing-time window, the output is:
(a,1)
(a,3)
(a,2)
However, when I use event time, nothing is printed. Please help me understand what is wrong.
package org.apache.flink.window.training;
import java.io.InputStream;
import java.util.Properties;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import com.fasterxml.jackson.databind.ObjectMapper;
public class SocketStream {
    private static Properties properties = new Properties();
    public static void main(String args[]) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        InputStream inputStream =
                SocketStream.class.getClassLoader().getResourceAsStream("local-kafka-server.properties");
        properties.load(inputStream);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>("test-topic", new SimpleStringSchema(), properties);
        DataStream<Element> socketStockStream =
                env.addSource(consumer).map(new MapFunction<String, Element>() {
                    @Override
                    public Element map(String value) throws Exception {
                        String split[] = value.split(",");
                        Element element = new Element(split[0], Long.parseLong(split[1]));
                        return element;
                    }
                }).assignTimestampsAndWatermarks(new TimestampExtractor());
        socketStockStream.map(new MapFunction<Element, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(Element value) throws Exception {
                return new Tuple2<String, Integer>(value.getId(), 1);
            }
        }).keyBy(0).timeWindow(Time.seconds(10), Time.seconds(5))
                .sum(1)
                .print();
        env.execute();
    }
    public static class TimestampExtractor implements AssignerWithPunctuatedWatermarks<Element> {
        private static final long serialVersionUID = 1L;
        @Override
        public long extractTimestamp(Element element, long previousElementTimestamp) {
            return element.getTimestamp();
        }
        @Override
        public Watermark checkAndGetNextWatermark(Element lastElement, long extractedTimestamp) {
            // TODO Auto-generated method stub
            return null;
        }
    }
}
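The question does not show the `Element` class. A hypothetical minimal version, consistent only with how the code above uses it (`new Element(String, long)`, `getId()`, `getTimestamp()`), might look like this; the field names are assumptions. It is written package-private so it compiles alongside other classes in one file; in a real project you would declare it `public` in its own `Element.java` so Flink can treat it as a POJO.

```java
// Hypothetical reconstruction of the Element class the question omits.
class Element {
    private String id;
    private long timestamp; // as parsed from the input, i.e. epoch seconds in the question

    // No-arg constructor, which Flink's POJO rules require.
    public Element() {}

    public Element(String id, long timestamp) {
        this.id = id;
        this.timestamp = timestamp;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }

    @Override
    public String toString() {
        return "Element(" + id + ", " + timestamp + ")";
    }
}
```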
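Event-time processing requires correctly generated timestamps and watermarks. The TimestampExtractor in your code never generates a watermark: checkAndGetNextWatermark always returns null, so the event-time clock never advances and no window is ever triggered.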
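A minimal sketch of the fix, assuming the second field is epoch seconds: in TimestampExtractor, return `element.getTimestamp() * 1000` from `extractTimestamp` (Flink event-time timestamps are epoch milliseconds) and `new Watermark(extractedTimestamp)` from `checkAndGetNextWatermark` instead of null. To illustrate why that matters without a Flink cluster, the pure-Java model below shows windows firing only as the watermark advances. `WatermarkSimulation` and `run` are hypothetical names, and the model is a simplification with a single key and no allowed lateness, not Flink's actual window operator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Pure-Java model (not Flink code) of event-time sliding windows (10 s size, 5 s slide)
// that count elements and fire only when the watermark reaches the window's last millisecond.
class WatermarkSimulation {
    static final long SIZE_MS = 10_000L;
    static final long SLIDE_MS = 5_000L;

    // Returns "windowStartSeconds=count" for each window that fires, in firing order.
    // emitWatermarks == false models checkAndGetNextWatermark always returning null;
    // emitWatermarks == true models returning new Watermark(extractedTimestamp).
    static List<String> run(long[] timestampsSec, boolean emitWatermarks) {
        TreeMap<Long, Integer> pending = new TreeMap<>(); // window start (ms) -> count
        List<String> fired = new ArrayList<>();
        long watermark = Long.MIN_VALUE;
        for (long sec : timestampsSec) {
            long ts = sec * 1000; // assumed epoch seconds -> milliseconds
            long lastStart = ts - (ts % SLIDE_MS);
            for (long start = lastStart; start > ts - SIZE_MS; start -= SLIDE_MS) {
                // Skip windows the watermark has already passed (late data is dropped).
                if (start + SIZE_MS - 1 > watermark) {
                    pending.merge(start, 1, Integer::sum);
                }
            }
            if (emitWatermarks) {
                watermark = Math.max(watermark, ts); // watermarks never move backwards
            }
            // Fire every window whose last millisecond is at or before the watermark.
            while (!pending.isEmpty() && pending.firstKey() + SIZE_MS - 1 <= watermark) {
                Map.Entry<Long, Integer> w = pending.pollFirstEntry();
                fired.add(w.getKey() / 1000 + "=" + w.getValue());
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        long[] input = {1513695853L, 1513695853L, 1513695856L, 1513695859L};
        System.out.println("null watermarks: " + run(input, false));
        System.out.println("watermark = timestamp: " + run(input, true));
    }
}
```

With the four input records, the null-watermark run fires nothing, matching the empty output in the question. With watermarks, only the window starting at 1513695845 fires (count 2), once the record at second 16 advances the watermark; the windows starting at 1513695850 and 1513695855 stay pending until a later record arrives (appending 1513695866, for example, fires them with counts 4 and 2). Using the element's own timestamp as the watermark assumes records arrive in timestamp order; for out-of-order data you would subtract a bounded delay.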