我正在像这样的简单数据集上测试NTILE函数:
((id:字符串,值:两倍)A 10B 3C 44天4号F 30C 30D 10A 4高4
针对HIVE运行以下查询(在MapReduce上)
SELECT tmp.id, tmp.sum_val, NTILE(4) OVER (ORDER BY tmp.sum_val) AS quartile FROM (SELECT id, sum(value) AS sum_val FROM testntile GROUP BY id) AS tmp
可以正常使用以下结果:
(id,sum_val,四分位)B 3 1高4 1E 4 2D 14 2A 14 3F 30 3C 34 4
对Hive on Spark(v 1.5)运行相同的查询仍然可以正常工作。
针对Spark SQL 1.5(CDH 5.5.1)运行相同的查询
val result = sqlContext.sql("SELECT tmp.id, tmp.sum_val, NTILE(4) OVER (ORDER BY tmp.sum_val) AS quartile FROM (SELECT id, sum(value) AS sum_val FROM testntile GROUP BY id) AS tmp")
result.collect().foreach(println)
我得到以下错误结果:
[B,3.0,0][E,4.0,0][H,4.0,0][A,14.0,0][D,14.0,0][F,30.0,0][C,34.0,0]
重要:结果不确定,因为返回“有时”正确的值
直接在数据帧上运行相同的算法
val x = sqlContext.sql("select id, sum(value) as sum_val from testntile group by id")
val w = Window.partitionBy("id").orderBy("sum_val")
val resultDF = x.select( x("id"),x("sum_val"), ntile(4).over(w) )
仍然返回错误结果。
我做错什么了吗?有任何想法吗?预先感谢您的回答。
如果使用Window.partitionBy("id").orderBy("sum_val")
,则按id
进行分组,然后应用ntile
功能。因此,以这种方式,每个组都有一个元素,并且ntile
对每个id
应用相同的值。
为了获得第一个结果,您需要删除partitionBy("id")
并仅使用Window.orderBy("somma")
。
这是我修改您的代码的方式:
val w = Window.orderBy("sum_val")
val resultDF = x.orderBy("sum_val").select( x("id"),x("sum_val"), ntile(4).over(w) )
这是resultDF.show()
的打印:
+---+-------+-----+
| id|sum_val|ntile|
+---+-------+-----+
| B| 3| 1|
| E| 4| 1|
| H| 4| 2|
| D| 14| 2|
| A| 14| 3|
| F| 30| 3|
| C| 34| 4|
+---+-------+-----+