如何下沉到文件的示例 Flink 1.20

问题描述 投票:0回答:1

我正在尝试获取一个示例,说明如何将其沉入一个已弃用的文件中。 flink 文档没有帮助,因为我得到的所有内容都已弃用。

 public static void main( String[] args )
{
    try 
    {
        // Set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Define the input data path

        DataStream<String> text = env.fromElements("Nathan", "Noah", "Olivia", "Emily", "Nathaniel");

        DataStream < String > filtered = text.filter(new FilterFunction < String > ()
            {
                public boolean filter(String value) {
                return value.startsWith("N");
            }
        });
        DataStream < Tuple2 < String, Integer >> tokenized = filtered.map(new Tokenizer());

        DataStream < Tuple2 < String, Integer >> counts = tokenized
            .keyBy(value -> value.f0)
            .sum(1);

        DataStream<String> result = counts.map(new MapFunction<Tuple2<String, Integer>, String>() {
            @Override
            public String map(Tuple2<String, Integer> value) {
                return value.f0 + ": " + value.f1;
            }
        });

        //result.print();

        result.writeAsText("/Users/T/Repos/FlinkPOC/src/main/java/com/poc/flink/Output.txt");

        
    
        // Execute the Flink job
        env.execute("Word Count Example with datastreams");
    } 
    catch (Exception e) {
         // Print stack trace to diagnose errors
         e.printStackTrace();
    }
}

在此代码片段中, result.writeAsText 和 env.fromElements 已弃用。我使用的是 Flink 1.20.0

java apache-flink
1个回答
0
投票

fromElements 的替代品是 fromData ——有关详细信息,请参阅 docs,但是

env.fromData("Nathan", "Noah", "Olivia", "Emily", "Nathaniel");

应该完成工作。

对于接收器,您需要使用FileSink。这不是 writeAsText 的直接替代品。 writeAsText 不是一个生产就绪的接收器——它只适合小规模测试——我们不想将它带入 Flink 2.0。请参阅上面链接的文档作为示例,但您需要执行以下操作:

final FileSink<String> sink = FileSink
    .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
    .withRollingPolicy(
        DefaultRollingPolicy.builder()
            .withRolloverInterval(Duration.ofMinutes(15))
            .withInactivityInterval(Duration.ofMinutes(5))
            .withMaxPartSize(MemorySize.ofMebiBytes(1024))
            .build())
    .build();

result.sinkTo(sink);

但是,您需要调整格式、展期策略等以满足您的需求。

© www.soinside.com 2019 - 2024. All rights reserved.