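I am trying to run a streaming top-n query with Flink SQL, but I can't get the "optimized version" outlined in the Flink docs to work. The setup is as follows:
I have a Kafka topic where each record contains a tuple (GUID, reached score, maximum possible score). Think of each GUID as a student taking an assessment, and of each tuple as the points that student scored.
What I want to get is a list of the five GUIDs with the highest score measured as a percentage (i.e. ordered by SUM(reached_score) / SUM(max_score)).
I start by summing up the scores and grouping them by GUID: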
```java
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);

Table scores = tableEnv.fromDataStream(/* stream from kafka */, "guid, reached_score, max_score");
tableEnv.registerTable("scores", scores);

Table aggregatedScores = tableEnv.sqlQuery(
    "SELECT " +
    "  guid, " +
    "  SUM(reached_score) as reached_score, " +
    "  SUM(max_score) as max_score, " +
    "  SUM(reached_score) / CAST(SUM(max_score) AS DOUBLE) as score " +
    "FROM scores " +
    "GROUP BY guid");
tableEnv.registerTable("agg_scores", aggregatedScores);
```
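A plain-Java sketch of what the aggregation computes per GUID. The input values are hypothetical, chosen so the totals match the output further below (97/200 = 0.485, 229/400 = 0.5725). It also shows what Java's integer division gives without converting to double first, which is why casting before dividing is the safe choice; whether Flink SQL itself truncates an INT / INT division is not verified here.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ScoreAggregation {
    // One (guid, reached_score, max_score) record from the topic.
    record Score(String guid, int reachedScore, int maxScore) {}

    // Running totals per GUID, i.e. SUM(reached_score) and SUM(max_score).
    static final class Totals {
        long reached;
        long max;

        // Corresponds to SUM(reached_score) / CAST(SUM(max_score) AS DOUBLE).
        double score() {
            return reached / (double) max;
        }

        // The same division without the cast: Java integer division truncates.
        long integerScore() {
            return reached / max;
        }
    }

    // Group by guid and sum both columns, keeping the first-seen order of GUIDs.
    static Map<String, Totals> aggregate(List<Score> scores) {
        Map<String, Totals> totals = new LinkedHashMap<>();
        for (Score s : scores) {
            Totals t = totals.computeIfAbsent(s.guid(), g -> new Totals());
            t.reached += s.reachedScore();
            t.max += s.maxScore();
        }
        return totals;
    }

    public static void main(String[] args) {
        // Hypothetical input: "b" receives two partial results that add up to 229/400.
        List<Score> input = List.of(
                new Score("a", 97, 200),
                new Score("b", 67, 200),
                new Score("b", 162, 200));
        aggregate(input).forEach((guid, t) -> System.out.println(
                guid + ": " + t.reached + "/" + t.max + " = " + t.score()
                        + " (without cast: " + t.integerScore() + ")"));
    }
}
```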
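The resulting table contains the unsorted list of aggregated scores. I then tried feeding it into the Top-N query used in the Flink documentation: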
```java
Table topN = tableEnv.sqlQuery(
    "SELECT guid, reached_score, max_score, score, row_num " +
    "FROM (" +
    "  SELECT *," +
    "    ROW_NUMBER() OVER (ORDER BY score DESC) as row_num" +
    "  FROM agg_scores) " +
    "WHERE row_num <= 5");
tableEnv.toRetractStream(topN, Row.class).print();
```
Running this query works as expected, and it produces multiple updates whenever the order of the elements changes:
```
// add first entry
6> (true,63992935-9684-4285-8c2b-1fd57b51b48f,97,200,0.485,1)
// add a second entry with lower score below the first one
7> (true,d7847f58-a4d9-40f8-a38d-161821b48481,67,200,0.335,2)
// update the second entry with a much higher score
8> (false,d7847f58-a4d9-40f8-a38d-161821b48481,67,200,0.335,2)
1> (true,d7847f58-a4d9-40f8-a38d-161821b48481,229,400,0.5725,1)
3> (true,63992935-9684-4285-8c2b-1fd57b51b48f,97,200,0.485,2)
2> (false,63992935-9684-4285-8c2b-1fd57b51b48f,97,200,0.485,1)
```
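To make the retract messages above easier to follow, here is a plain-Java sketch that recomputes the top n with row numbers before and after the update and lists which rows must be retracted or added. It is an illustration under simplifying assumptions (the whole result is recomputed in memory on every update, and a row counts as unchanged only if all columns, row_num included, are equal), not Flink's rank operator. GUIDs are shortened to their first block.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class RankedTopNDiff {
    // Aggregated totals for one GUID.
    record Totals(long reached, long max) {
        double score() {
            return reached / (double) max;
        }
    }

    // One output row of the first query, including row_num.
    record RankedRow(String guid, long reached, long max, double score, int rowNum) {}

    // One retract-stream message: add == true inserts the row, add == false retracts it.
    record Change(boolean add, RankedRow row) {}

    // ORDER BY score DESC, then number the first n rows starting at 1.
    static List<RankedRow> topN(Map<String, Totals> totals, int n) {
        List<String> guids = new ArrayList<>(totals.keySet());
        guids.sort(Comparator.comparingDouble((String g) -> totals.get(g).score()).reversed());
        List<RankedRow> rows = new ArrayList<>();
        for (int i = 0; i < Math.min(n, guids.size()); i++) {
            Totals t = totals.get(guids.get(i));
            rows.add(new RankedRow(guids.get(i), t.reached(), t.max(), t.score(), i + 1));
        }
        return rows;
    }

    // A row that is not identical (row_num included) in both results is retracted or added.
    static List<Change> diff(List<RankedRow> before, List<RankedRow> after) {
        Set<RankedRow> oldRows = new HashSet<>(before);
        Set<RankedRow> newRows = new HashSet<>(after);
        List<Change> changes = new ArrayList<>();
        for (RankedRow r : before) {
            if (!newRows.contains(r)) changes.add(new Change(false, r));
        }
        for (RankedRow r : after) {
            if (!oldRows.contains(r)) changes.add(new Change(true, r));
        }
        return changes;
    }

    public static void main(String[] args) {
        // Totals taken from the output above.
        Map<String, Totals> state = new HashMap<>();
        state.put("63992935", new Totals(97, 200));
        state.put("d7847f58", new Totals(67, 200));
        List<RankedRow> before = topN(state, 5);

        state.put("d7847f58", new Totals(229, 400)); // the update that changes the order
        List<RankedRow> after = topN(state, 5);

        diff(before, after).forEach(System.out::println);
    }
}
```

Because both rows change their row_num, all four rows differ and the sketch yields four changes, matching the last four messages above. Their printed order most likely differs because the N> prefix is the index of the print sink's parallel subtask, and lines from different subtasks are not guaranteed to appear in emission order.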
I then followed the advice in the documentation and removed row_num from the projection:
```java
Table topN = tableEnv.sqlQuery(
    "SELECT guid, reached_score, max_score, score " +
    "FROM (" +
    "  SELECT *," +
    "    ROW_NUMBER() OVER (ORDER BY score DESC) as row_num" +
    "  FROM agg_scores) " +
    "WHERE row_num <= 5");
```
Running it on a similar dataset:
```
// add first entry
4> (true,63992935-9684-4285-8c2b-1fd57b51b48f,112,200,0.56)
// add a second entry with lower score below the first one
5> (true,d7847f58-a4d9-40f8-a38d-161821b48481,76,200,0.38)
// update the second entry with a much higher score
7> (true,d7847f58-a4d9-40f8-a38d-161821b48481,354,400,0.885)
1> (true,63992935-9684-4285-8c2b-1fd57b51b48f,112,200,0.56) <-- ???
8> (false,63992935-9684-4285-8c2b-1fd57b51b48f,112,200,0.56) <-- ???
6> (false,d7847f58-a4d9-40f8-a38d-161821b48481,76,200,0.38)
```
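For comparison, a plain-Java sketch of the behaviour expected from the optimization: once row_num is no longer part of the output, a row only has to be retracted or re-sent when its own values change or when it enters or leaves the top n. This illustrates the expected semantics under simplifying assumptions (the whole result is recomputed in memory on every update); it is not how Flink implements the rank operator.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class UnrankedTopNDiff {
    // One output row of the second query: no row_num column.
    record Row(String guid, long reached, long max) {
        double score() {
            return reached / (double) max;
        }
    }

    // One retract-stream message: add == true inserts the row, add == false retracts it.
    record Change(boolean add, Row row) {}

    // ORDER BY score DESC, keep the first n; positions are used but not stored in the row.
    static List<Row> topN(Map<String, Row> current, int n) {
        List<Row> rows = new ArrayList<>(current.values());
        rows.sort(Comparator.comparingDouble(Row::score).reversed());
        return new ArrayList<>(rows.subList(0, Math.min(n, rows.size())));
    }

    // Only rows that entered, left, or changed their values produce messages.
    static List<Change> diff(List<Row> before, List<Row> after) {
        Set<Row> oldRows = new HashSet<>(before);
        Set<Row> newRows = new HashSet<>(after);
        List<Change> changes = new ArrayList<>();
        for (Row r : before) {
            if (!newRows.contains(r)) changes.add(new Change(false, r));
        }
        for (Row r : after) {
            if (!oldRows.contains(r)) changes.add(new Change(true, r));
        }
        return changes;
    }

    public static void main(String[] args) {
        // Values taken from the second run above; GUIDs shortened to their first block.
        Map<String, Row> state = new HashMap<>();
        state.put("63992935", new Row("63992935", 112, 200));
        state.put("d7847f58", new Row("d7847f58", 76, 200));
        List<Row> before = topN(state, 5);

        state.put("d7847f58", new Row("d7847f58", 354, 400));
        List<Row> after = topN(state, 5);

        diff(before, after).forEach(System.out::println);
    }
}
```

Applied to the second dataset, the sketch produces only two messages (retract the old d7847f58 row, add the new one) and never touches 63992935, whereas the actual output above also retracts and re-adds 63992935.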
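What I don't understand is why the first entry (63992935-9684-4285-8c2b-1fd57b51b48f) is removed and added again, or touched at all, even though its values did not change. Both the retraction and the re-insertion are obviously related to the change in sort order, but isn't that exactly what the optimized top-n query (described further down in the documentation) is supposed to solve?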
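I have looked into this and can reproduce it in my local environment as well. I also did some investigation, and the reason is:
"We don't do this optimization in some cases, and your case seems to be one of them."
However, according to the user documentation, I think it is a fair request for this optimization to cover your scenario too. To me this looks like a bug: we claim to perform an optimization, but it is not applied.
I have created an issue, https://issues.apache.org/jira/browse/FLINK-15497, to track this, and hopefully we can fix it in the upcoming 1.9.2 and 1.10.0 releases.
Thanks for reporting this.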