基于城市的场馆名称模糊连接

问题描述 投票:0回答:1

我正在使用 PySpark,需要连接两个基于城市的数据集和场地名称的模糊匹配条件。第一个数据集包含有关体育场馆的信息,包括唯一的venue_id,而我定期收到的第二个数据集仅包含场馆名称和城市,没有venue_id。

我想加入这些数据集,使用模糊逻辑将传入数据中的venue_name与现有数据集匹配(因为名称并不总是写成相同的),然后拉取相应的venue_id。

现有数据集(df_stadium_information):

场地名称 城市 venue_id
Sree Kanteerava 体育场 班加罗尔 1
Sree Kanteerava 体育场 高知 2
伊甸花园 加尔各答 3
纳伦德拉·莫迪体育场 艾哈迈达巴德 4

传入数据(df_new_stadium_data):

场地名称 城市
斯里坎提拉瓦室内体育场 班加罗尔
伊甸花园 加尔各答

所需输出:

场地名称 城市 venue_id
斯里坎提拉瓦室内体育场 班加罗尔 1
伊甸花园 加尔各答

如果venue_name 存在模糊匹配且city 存在精确匹配,我希望输出显示 df_stadium_information 中的venue_id。如果没有模糊匹配,则venue_id应该为空。

python sql pyspark apache-spark-sql
1个回答
0
投票

您想要的示例不正确,因为加尔各答有一场 Eden Gardens 比赛,场地 ID 为 3。

首先我们创建虚拟表。

%sql
CREATE OR REPLACE TABLE ref_tbl (
    venue_name STRING,
    city STRING,
    venue_id INT
  );

INSERT INTO ref_tbl VALUES
('Sree Kanteerava Stadium','Bengaluru',1),
('Sree Kanteerava Stads','Bengaluru',2),
('Sree Kanteerava Stadium','Kochi',3),
('Eden Gardens','Kolkata',4),
('Narendra Modi Stadium','Ahmedabad',5),
('Sree Kanteerava Stadium','Bangalore',6);

CREATE OR REPLACE TABLE IN_TABLE (
    venue_name STRING,
    city STRING
  );

INSERT INTO IN_TABLE VALUES
('Sri Kanteerava Indoor Stadium','Kolkata'),
('Eden Garden','Kolkata'),
('Sri Kanteerava Stadium','Bengaluru');

安装名为rapidfuzz的python包

pip install rapidfuzz

然后将其注册为spark udf函数

from rapidfuzz import fuzz
from pyspark.sql.functions import udf
import pyspark.sql.functions as F
from pyspark.sql.types import StringType,IntegerType,DoubleType
    
def fuzzyrapid(s1, s2):
    return (fuzz.token_sort_ratio(s1,s2))

spark.udf.register("fuzztest",  fuzzyrapid,DoubleType())

选择场地名称和城市的不同组合。

%sql
-- Select distinct cities from the input file/table
CREATE OR REPLACE TEMPORARY VIEW v_distinct_input_names 
AS 
SELECT DISTINCT venue_name as input_name,city FROM IN_TABLE;

SELECT * FROM v_distinct_input_names;

进行交叉连接,以便获得同一城市的所有组合

%sql
-- cross join the distinct cities with the lookup table 
CREATE OR REPLACE TEMPORARY VIEW v_crossjoin 
AS 
SELECT A.venue_name as ref_name,  A.city as city, Input_name FROM ref_tbl A CROSS JOIN v_distinct_input_names B where A.city = B.city;

现在执行自定义 Spark udf 来获取相似度分数,并执行窗口函数以获得单个组合的最佳分数

%sql
CREATE OR REPLACE TEMPORARY VIEW vw_fuzzy1 AS
SELECT ref_name, Input_name, city, fuzztest(ref_name,Input_name) AS similarity_score FROM v_crossjoin;

CREATE OR REPLACE TEMPORARY VIEW v_fuzzy2 AS 
 SELECT ref_name, Input_name, city, similarity_score
 FROM
 (
     SELECT ROW_NUMBER() OVER (PARTITION BY input_name ORDER BY similarity_score DESC) as RowNum, *
     FROM vw_fuzzy1
 ) 
 WHERE RowNum = 1;

现在您可以进行连接来获取场地 ID。您需要为您的用例定义分数阈值。就我而言,我使用了 70。

%sql
SELECT input_name, B.venue_name, A.city,
case
when similarity_score > 70
  then venue_id
else
  null
end as venue_id
from v_fuzzy2 A left join ref_tbl B on A.ref_name = B.venue_name AND A.city=B.city;

你的结果将是 enter image description here

© www.soinside.com 2019 - 2024. All rights reserved.