我正在使用 PySpark,需要连接两个基于城市的数据集和场地名称的模糊匹配条件。第一个数据集包含有关体育场馆的信息,包括唯一的venue_id,而我定期收到的第二个数据集仅包含场馆名称和城市,没有venue_id。
我想加入这些数据集,使用模糊逻辑将传入数据中的venue_name与现有数据集匹配(因为名称并不总是写成相同的),然后拉取相应的venue_id。
现有数据集(df_stadium_information):
场地名称 | 城市 | venue_id |
---|---|---|
Sree Kanteerava 体育场 | 班加罗尔 | 1 |
Sree Kanteerava 体育场 | 高知 | 2 |
伊甸花园 | 加尔各答 | 3 |
纳伦德拉·莫迪体育场 | 艾哈迈达巴德 | 4 |
传入数据(df_new_stadium_data):
场地名称 | 城市 |
---|---|
斯里坎提拉瓦室内体育场 | 班加罗尔 |
伊甸花园 | 加尔各答 |
所需输出:
场地名称 | 城市 | venue_id |
---|---|---|
斯里坎提拉瓦室内体育场 | 班加罗尔 | 1 |
伊甸花园 | 加尔各答 | 空 |
如果venue_name 存在模糊匹配且city 存在精确匹配,我希望输出显示 df_stadium_information 中的venue_id。如果没有模糊匹配,则venue_id应该为空。
您想要的示例不正确,因为加尔各答有一场 Eden Gardens 比赛,场地 ID 为 3。
首先我们创建虚拟表。
%sql
CREATE OR REPLACE TABLE ref_tbl (
venue_name STRING,
city STRING,
venue_id INT
);
INSERT INTO ref_tbl VALUES
('Sree Kanteerava Stadium','Bengaluru',1),
('Sree Kanteerava Stads','Bengaluru',2),
('Sree Kanteerava Stadium','Kochi',3),
('Eden Gardens','Kolkata',4),
('Narendra Modi Stadium','Ahmedabad',5),
('Sree Kanteerava Stadium','Bangalore',6);
CREATE OR REPLACE TABLE IN_TABLE (
venue_name STRING,
city STRING
);
INSERT INTO IN_TABLE VALUES
('Sri Kanteerava Indoor Stadium','Kolkata'),
('Eden Garden','Kolkata'),
('Sri Kanteerava Stadium','Bengaluru');
安装名为rapidfuzz的python包
pip install rapidfuzz
然后将其注册为spark udf函数
from rapidfuzz import fuzz
from pyspark.sql.functions import udf
import pyspark.sql.functions as F
from pyspark.sql.types import StringType,IntegerType,DoubleType
def fuzzyrapid(s1, s2):
return (fuzz.token_sort_ratio(s1,s2))
spark.udf.register("fuzztest", fuzzyrapid,DoubleType())
选择场地名称和城市的不同组合。
%sql
-- Select distinct cities from the input file/table
CREATE OR REPLACE TEMPORARY VIEW v_distinct_input_names
AS
SELECT DISTINCT venue_name as input_name,city FROM IN_TABLE;
SELECT * FROM v_distinct_input_names;
进行交叉连接,以便获得同一城市的所有组合
%sql
-- cross join the distinct cities with the lookup table
CREATE OR REPLACE TEMPORARY VIEW v_crossjoin
AS
SELECT A.venue_name as ref_name, A.city as city, Input_name FROM ref_tbl A CROSS JOIN v_distinct_input_names B where A.city = B.city;
现在执行自定义 Spark udf 来获取相似度分数,并执行窗口函数以获得单个组合的最佳分数
%sql
CREATE OR REPLACE TEMPORARY VIEW vw_fuzzy1 AS
SELECT ref_name, Input_name, city, fuzztest(ref_name,Input_name) AS similarity_score FROM v_crossjoin;
CREATE OR REPLACE TEMPORARY VIEW v_fuzzy2 AS
SELECT ref_name, Input_name, city, similarity_score
FROM
(
SELECT ROW_NUMBER() OVER (PARTITION BY input_name ORDER BY similarity_score DESC) as RowNum, *
FROM vw_fuzzy1
)
WHERE RowNum = 1;
现在您可以进行连接来获取场地 ID。您需要为您的用例定义分数阈值。就我而言,我使用了 70。
%sql
SELECT input_name, B.venue_name, A.city,
case
when similarity_score > 70
then venue_id
else
null
end as venue_id
from v_fuzzy2 A left join ref_tbl B on A.ref_name = B.venue_name AND A.city=B.city;