I'm trying to find an example of how to sink a stream to a file without using deprecated APIs. The Flink documentation doesn't help, because everything I find there is deprecated.
public static void main( String[] args )
{
    try
    {
        // Set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Define the input data
        DataStream<String> text = env.fromElements("Nathan", "Noah", "Olivia", "Emily", "Nathaniel");

        DataStream<String> filtered = text.filter(new FilterFunction<String>()
        {
            @Override
            public boolean filter(String value) {
                return value.startsWith("N");
            }
        });

        DataStream<Tuple2<String, Integer>> tokenized = filtered.map(new Tokenizer());

        DataStream<Tuple2<String, Integer>> counts = tokenized
            .keyBy(value -> value.f0)
            .sum(1);

        DataStream<String> result = counts.map(new MapFunction<Tuple2<String, Integer>, String>() {
            @Override
            public String map(Tuple2<String, Integer> value) {
                return value.f0 + ": " + value.f1;
            }
        });

        //result.print();
        result.writeAsText("/Users/T/Repos/FlinkPOC/src/main/java/com/poc/flink/Output.txt");

        // Execute the Flink job
        env.execute("Word Count Example with datastreams");
    }
    catch (Exception e) {
        // Print stack trace to diagnose errors
        e.printStackTrace();
    }
}
In this snippet, result.writeAsText and env.fromElements are deprecated. I'm using Flink 1.20.0.
The replacement for fromElements is fromData (see the docs for more detail), but
env.fromData("Nathan", "Noah", "Olivia", "Emily", "Nathaniel");
should do the job.
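In context, with the rest of your pipeline unchanged, the source line from your snippet would simply become:

DataStream<String> text = env.fromData("Nathan", "Noah", "Olivia", "Emily", "Nathaniel");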
For the sink, you need to use FileSink. It is not a drop-in replacement for writeAsText. writeAsText is not a production-ready sink (it is only suitable for small-scale testing), and we don't want to carry it forward into Flink 2.0. See the documentation linked above for an example, but you will need something along these lines:
final FileSink<String> sink = FileSink
    .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
    .withRollingPolicy(
        DefaultRollingPolicy.builder()
            .withRolloverInterval(Duration.ofMinutes(15))
            .withInactivityInterval(Duration.ofMinutes(5))
            .withMaxPartSize(MemorySize.ofMebiBytes(1024))
            .build())
    .build();
result.sinkTo(sink);
You will, however, need to adjust the format, rolling policy, and so on to suit your needs.
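For reference, here is a minimal end-to-end sketch of the non-deprecated version on Flink 1.20. The output path and checkpoint interval are placeholders, and the filter/word-count steps from your code are omitted for brevity:

import java.time.Duration;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

public class FileSinkExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // In streaming mode the FileSink only finalizes part files on checkpoints,
        // so checkpointing needs to be enabled (the interval is a placeholder).
        env.enableCheckpointing(10_000);

        // fromData replaces the deprecated fromElements
        DataStream<String> text = env.fromData("Nathan", "Noah", "Olivia", "Emily", "Nathaniel");

        // Row-encoded FileSink writing plain strings; the path is a placeholder
        final FileSink<String> sink = FileSink
            .forRowFormat(new Path("/tmp/flink-output"), new SimpleStringEncoder<String>("UTF-8"))
            .withRollingPolicy(
                DefaultRollingPolicy.builder()
                    .withRolloverInterval(Duration.ofMinutes(15))
                    .withInactivityInterval(Duration.ofMinutes(5))
                    .withMaxPartSize(MemorySize.ofMebiBytes(1024))
                    .build())
            .build();

        // sinkTo replaces the deprecated writeAsText; in your code this would be
        // result.sinkTo(sink) after the word-count steps
        text.sinkTo(sink);

        env.execute("FileSink example");
    }
}

FileSink comes from the flink-connector-files artifact, so you may need to add that dependency if it isn't already on your classpath.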