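I tried to set the maximum parallelism of a Flink job using the ExecutionConfig.setMaxParallelism() method, but it does not seem to work.
I also modified the standard WordCount example to run some tests, and setMaxParallelism() appears to have no effect in either a local environment or a standalone cluster.
How does setMaxParallelism() work?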
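Flink provides two settings:
setParallelism(x) sets the parallelism of a job or operator to x, i.e., the number of parallel tasks for an operator.
setMaxParallelism(y) controls the maximum number of tasks to which keyed state can be distributed, i.e., the maximum effective parallelism of an operator. An operator could also have more tasks, but only y of them would have keyed state assigned and be available for processing.
The unit in which keyed state is distributed is called a key group. The documentation explains the concept in more detail.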
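To make the key group idea concrete, here is a minimal, dependency-free sketch of how key groups might be mapped onto parallel subtasks. The class and method names (KeyGroupSketch, keyGroupFor, operatorIndexFor, subtasksWithState) are hypothetical and not part of Flink. The integer arithmetic follows, to my understanding, what Flink does internally, but the hashing is simplified: Flink additionally applies a murmur hash to the key's hashCode.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: not Flink code, names are made up for this example.
final class KeyGroupSketch {

    // Simplified key -> key group mapping. Assumption: plain modulo instead of
    // the murmur hash Flink applies on top of hashCode().
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Index of the parallel subtask that owns the given key group.
    static int operatorIndexFor(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Subtask indexes that own at least one key group, in ascending order.
    static List<Integer> subtasksWithState(int parallelism, int maxParallelism) {
        List<Integer> owners = new ArrayList<>();
        for (int kg = 0; kg < maxParallelism; kg++) {
            int idx = operatorIndexFor(kg, parallelism, maxParallelism);
            if (!owners.contains(idx)) {
                owners.add(idx);
            }
        }
        return owners;
    }

    public static void main(String[] args) {
        // 4 key groups over 2 subtasks: both subtasks own state.
        System.out.println(subtasksWithState(2, 4)); // [0, 1]
        // 4 key groups over 8 subtasks: only 4 of the 8 subtasks own state.
        System.out.println(subtasksWithState(8, 4)); // [0, 2, 4, 6]
    }
}
```

With a max parallelism of 4 there are only 4 key groups, so at most 4 subtasks can ever receive keyed state, regardless of how many subtasks exist.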
I ran a few tests today, this time using the DataStream API instead of the DataSet API, and now I do see an effect from setMaxParallelism().
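// WORDS and Tokenizer come from the standard WordCount example.
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Cap the number of key groups (and thus the usable parallelism) at 4.
    env.getConfig().setMaxParallelism(4); // <-- effect
    DataStream<String> text = env.fromElements(WORDS);
    // Split lines into (word, 1) pairs, key by the word, and sum the counts.
    DataStream<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).keyBy(0).sum(1);
    counts.writeAsCsv("test.dat");
    env.execute("WordCount Example");
}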
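Interestingly, the client reports the following error. The job never sets a parallelism explicitly, so the 8 in the message is presumably the default parallelism picked up from the environment: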
Caused by: org.apache.flink.runtime.JobException: Vertex Flat Map's parallelism (8) is higher than the max parallelism (4). Please lower the parallelism or increase the max parallelism.
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:188)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:830)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:232)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1152)
at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1132)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:294)
at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:157)
... 10 more
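The rule behind this error can be sketched in plain Java. This is not Flink's implementation: ParallelismCheckSketch and validate are hypothetical names, and the message text is copied from the trace above. It only illustrates that an operator's parallelism has to stay at or below the max parallelism, and that there are two ways out: lower the parallelism (for example with setParallelism(4)) or raise the max parallelism.

```java
// Illustration only: a stand-in for the check reported in the stack trace.
final class ParallelismCheckSketch {

    // Throws if the requested parallelism exceeds the configured max parallelism.
    static void validate(String vertexName, int parallelism, int maxParallelism) {
        if (parallelism > maxParallelism) {
            throw new IllegalArgumentException(
                "Vertex " + vertexName + "'s parallelism (" + parallelism
                    + ") is higher than the max parallelism (" + maxParallelism
                    + "). Please lower the parallelism or increase the max parallelism.");
        }
    }

    public static void main(String[] args) {
        // Fix 1: lower the parallelism to the max parallelism.
        validate("Flat Map", 4, 4);
        // Fix 2: keep parallelism 8 and raise the max parallelism instead.
        validate("Flat Map", 8, 128);
        try {
            // The configuration from the test above: parallelism 8, max parallelism 4.
            validate("Flat Map", 8, 4);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```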
Thanks.