Left join two sorted dataframes in PySpark

Problem description · Votes: 0 · Answers: 1

I have two dataframes, both sorted by a certain column, which is also the join key.

Is it possible to merge these two dataframes and get back a sorted dataframe in O(n+m)? I don't mind if it is not done in parallel. I have too much data (4 billion + 20 million rows) to use a simple

events.join(t_status, on='t').sort('t')

apache-spark pyspark apache-spark-sql
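(For reference, the linear-time two-pointer merge being asked about is exactly what Python's standard-library `heapq.merge` implements for ordinary sorted sequences; it cannot replace a distributed join for data of this size, but it shows the O(n+m) merge step in isolation:)

```python
import heapq

# Two sequences already sorted by key; heapq.merge interleaves them
# lazily in a single O(n+m) pass without re-sorting.
a = [1, 3, 5]
b = [2, 4, 6]
merged = list(heapq.merge(a, b))
print(merged)  # [1, 2, 3, 4, 5, 6]
```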
1 Answer

0 votes

You can merge the sorted dataframes in linear time, O(n+m), as follows:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Example dataframes sorted by "key" in ascending order
df1 = spark.createDataFrame([(1, "A"), (3, "C"), (5, "E")], ["key", "value"])
df2 = spark.createDataFrame([(2, "B"), (4, "D"), (6, "F")], ["key", "value"])
rdd1 = df1.rdd
rdd2 = df2.rdd

def merge_sorted_rdds(rdd1, rdd2):
    # Collect both RDDs as lists on the driver. Note: this pulls every
    # row into driver memory, so it is only feasible when the data
    # fits on a single machine.
    iter1 = rdd1.collect()
    iter2 = rdd2.collect()
    
    i, j = 0, 0
    merged_result = []
    
    # Merge the two sorted RDDs
    while i < len(iter1) and j < len(iter2):
        if iter1[i][0] < iter2[j][0]:
            merged_result.append(iter1[i])
            i += 1
        else:
            merged_result.append(iter2[j])
            j += 1
            
    while i < len(iter1):
        merged_result.append(iter1[i])
        i += 1
        
    while j < len(iter2):
        merged_result.append(iter2[j])
        j += 1
        
    return merged_result

merged_data = merge_sorted_rdds(rdd1, rdd2)
merged_df = spark.createDataFrame(merged_data, ["key", "value"])
merged_df.show()

# Output:
# +---+-----+
# |key|value|
# +---+-----+
# |  1|    A|
# |  2|    B|
# |  3|    C|
# |  4|    D|
# |  5|    E|
# |  6|    F|
# +---+-----+
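Note that the snippet above interleaves the two dataframes (a sorted union), whereas the question asks for a left join. A minimal sketch of a two-pointer left merge-join over key-sorted rows, with the same caveat that everything is held in local memory, and assuming keys are unique on the right-hand side (a lookup table), might look like:

```python
def left_merge_join(left_rows, right_rows):
    """Left-join two lists of (key, value) tuples, both sorted by key,
    in O(n + m) time. Assumes unique keys in right_rows."""
    result = []
    j = 0
    for key, lval in left_rows:
        # Advance the right pointer until its key catches up to the left key
        while j < len(right_rows) and right_rows[j][0] < key:
            j += 1
        if j < len(right_rows) and right_rows[j][0] == key:
            result.append((key, lval, right_rows[j][1]))
        else:
            result.append((key, lval, None))  # no match: keep the left row

    return result

# Hypothetical sample data named after the question's dataframes
events = [(1, "e1"), (2, "e2"), (2, "e3"), (4, "e4")]
t_status = [(1, "ok"), (2, "fail"), (3, "ok")]
print(left_merge_join(events, t_status))
# [(1, 'e1', 'ok'), (2, 'e2', 'fail'), (2, 'e3', 'fail'), (4, 'e4', None)]
```

Each pointer only moves forward, so the whole pass is linear in the combined length of the two inputs.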