使用flink将hdfs中的kafka数据存储为拼花格式?

问题描述 投票:0回答:1

[使用flink将hdfs中的kafka数据存储为拼花格式,我正在尝试不起作用的fink文档。

我找不到任何适当的文档来将其存储为拼花文件

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final ParameterTool params = ParameterTool.fromArgs(args);
        env.getConfig().setGlobalJobParameters(params);

        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "localhost:9092");

        DataStream<String> kafkaData = env.addSource(new FlinkKafkaConsumer011("test", new SimpleStringSchema(), prop));
        DataStream<Tuple2<String, Integer>> counts = kafkaData.filter(new FilterFunction<String>() {
            public boolean filter(String value) throws Exception {
                return true;
            }
        }).map(new Tokenizer()) 
                .keyBy(0).sum(1); 
                counts.writeAsText("");
        env.execute("Streaming WordCount");

如果有人可以帮助您提供适当的文档或代码,请联系我们

apache-kafka apache-flink flink-streaming
1个回答
0
投票

写在documentation of StreamingFileSink上:

重要:使用StreamingFileSink时需要启用检查点。零件文件只能在成功的检查点上完成。如果禁用检查点功能,则零件文件将永远处于documentation of StreamingFileSinkin-progress状态,并且下游系统无法安全地读取它们。

要启用,只需使用

pending

env.enableCheckpointing(1000); ,您有很多选择。


这里是完整的tweak

example
© www.soinside.com 2019 - 2024. All rights reserved.