[使用flink将hdfs中的kafka数据存储为拼花格式,我正在尝试不起作用的fink文档。
我找不到任何适当的文档来将其存储为拼花文件
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final ParameterTool params = ParameterTool.fromArgs(args);
env.getConfig().setGlobalJobParameters(params);
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", "localhost:9092");
DataStream<String> kafkaData = env.addSource(new FlinkKafkaConsumer011("test", new SimpleStringSchema(), prop));
DataStream<Tuple2<String, Integer>> counts = kafkaData.filter(new FilterFunction<String>() {
public boolean filter(String value) throws Exception {
return true;
}
}).map(new Tokenizer())
.keyBy(0).sum(1);
counts.writeAsText("");
env.execute("Streaming WordCount");
如果有人可以帮助您提供适当的文档或代码,请联系我们
写在documentation of StreamingFileSink
上:
重要:使用StreamingFileSink时需要启用检查点。零件文件只能在成功的检查点上完成。如果禁用检查点功能,则零件文件将永远处于
documentation of StreamingFileSink
或in-progress
状态,并且下游系统无法安全地读取它们。
要启用,只需使用
pending
env.enableCheckpointing(1000);
,您有很多选择。
这里是完整的tweak
example