如何使用Spark识别数据集中的属性级别重复项

问题描述 投票:0回答:1

下表具有两行相同,但是数据捕获已更改了firstname和secondname列。但是,这些行是重复的,并且输出数据框/数据集应该只有一行。我们如何使用Spark过滤大型数据集的重复项?任何指针都将非常有用。

database apache-spark bigdata filtering data-analysis
1个回答
0
投票

%python

创建示例数据框:

from pyspark.sql import functions as F
from pyspark.sql.window import Window
list=[[1,'ABC','XYZ',10000],
      [2,'XYZ','ABC',10000],
      [3,'Me','You',25000],
      [4,'Am','I',35000],
      [5,'You','Me',25000],
      [6,'I','Am',35000]]
df= spark.createDataFrame(list,['EmployeeID','FirstName','SecondName','Salary'])
df.show()

+----------+---------+----------+------+
|EmployeeID|FirstName|SecondName|Salary|
+----------+---------+----------+------+
|         1|      ABC|       XYZ| 10000|
|         2|      XYZ|       ABC| 10000|
|         3|       Me|       You| 25000|
|         4|       Am|         I| 35000|
|         5|      You|        Me| 25000|
|         6|        I|        Am| 35000|
+----------+---------+----------+------+

使用spark内置函数(将在spark1.5及更高版本中使用:]

-concat字符串中的first_name,last_name,salip与分隔符',然后在该分隔符上拆分以创建数组,sort_array以相同的方式对其进行排序,然后创建窗口以获取重复行的row_number,然后对row_number进行过滤1仅获得第一个。如果有相似的名称属于同一分区,也可以使用“薪金”。

-当窗口被分割时,将以分布式方式工作

w=Window().partitionBy("concat").orderBy("concat")
df.withColumn("concat", F.sort_array(F.split(F.concat_ws(',',F.col("FirstName"),F.col("SecondName"),F.col("Salary").cast("string")),',')))\
.withColumn("rownum", F.row_number().over(w)).filter(F.col("rownum")==1).drop("concat","rownum").show()



+----------+---------+----------+------+
|EmployeeID|FirstName|SecondName|Salary|
+----------+---------+----------+------+
|         3|       Me|       You| 25000|
|         4|       Am|         I| 35000|
|         1|      ABC|       XYZ| 10000|
+----------+---------+----------+------+
© www.soinside.com 2019 - 2024. All rights reserved.