如何在特定条件下连接 2 个 DataFrame?

问题描述 投票:0回答:1

我有那 2 个数据框:

df1:
+---+----------+----------+
|id |id_special|date_1    |
+---+----------+----------+
|1  |101       |2024-11-01|
|2  |102       |2024-11-03|
|3  |103       |2024-11-04|
|4  |null      |2024-11-05|
+---+----------+----------+
df2:
+----------+----------+------+
|id_special|date_2    |type  |
+----------+----------+------+
|101       |2024-10-30|Type_1|
|101       |2024-10-31|Type_2|
|101       |2024-11-01|Type_3|
|102       |2024-11-03|Type_4|
+----------+----------+------+

我的目标是在

df2_type
中创建一个名为
df1
的新列。 为此,我需要在
df1
df2
之间进行特殊连接。 以下是创建列的规则
df2_type

  1. 如果 df1.id_special 为 null,则将 df1.df2_type 设置为“Unknown”。
  2. 如果 df1.id_special 不在 df2.id_client 中,则将 df1.df2_type 设置为“Unknown”。
  3. 如果 df1.id_special 位于 df2.id_client 中:
    1. 获取df2.date_2所在的记录< df1.date_1 and is the closest to df1.date_1
    2. 从记录中,使用df2.type设置df2_type。

所以,从珍贵的DataFrames来看,这就是我期待的结果:

+---+----------+----------+--------+
|id |id_special|date_1    |df2_type|
+---+----------+----------+--------+
|1  |101       |2024-11-01|Type_2  |
|2  |102       |2024-11-03|Unknown |
|3  |103       |2024-11-04|Unknown |
|4  |null      |2024-11-05|Unknown |
+---+----------+----------+--------+

我尝试在我的 2 个 DataFrame 之间进行连接,但我始终无法正确连接它。 这是我的代码:

from awsglue.context import GlueContext
from datetime import date
from pyspark.context import SparkContext
from pyspark.sql.functions import lit
from pyspark.sql.types import DateType, IntegerType, StringType, StructField, StructType

glueContext = GlueContext(SparkContext.getOrCreate())

data1 = [
    (1, 101, date.fromisoformat("2024-11-01")),
    (2, 102, date.fromisoformat("2024-11-03")),
    (3, 103, date.fromisoformat("2024-11-04")),
    (4, None, date.fromisoformat("2024-11-05")),
]

data2 = [
    (101, date.fromisoformat("2024-10-30"), "Type_1"),
    (101, date.fromisoformat("2024-10-31"), "Type_2"),
    (101, date.fromisoformat("2024-11-01"), "Type_3"),
    (102, date.fromisoformat("2024-11-03"), "Type_4"),
]

schema1 = StructType([
   StructField("id", IntegerType(), True), # Unique key
   StructField("id_special", IntegerType(), True),
   StructField("date_1", DateType(), True),
])

schema2 = StructType([
   StructField("id_special", IntegerType(), True),
   StructField("date_2", DateType(), True),
   StructField("type", StringType(), True),
])


df1 = spark.createDataFrame(data1, schema1)
df2 = spark.createDataFrame(data2, schema2)


# Step 1 - Add df2_type columns
df1 = df1.withColumn("df2_type", lit(None))


# The final DataFrame need to be like this
# +---+----------+----------+--------+
# |id |id_special|date_1    |df2_type|
# +---+----------+----------+--------+
# |1  |101       |2024-11-01|Type_2  |
# |2  |102       |2024-11-03|Unknown |
# |3  |103       |2024-11-04|Unknown |
# |4  |null      |2024-11-05|Unknown |
# +---+----------+----------+--------+
python pyspark
1个回答
0
投票

第 1 步:按

df2
id_special
进行分组,并将所有行收集到结构列表中。

df2grouped = df2.groupBy('id_special') \
    .agg(F.collect_list(F.struct('date_2', 'type')).alias('data'))

第 2 步:在

df1
df2grouped
之间运行左外连接。从
df2grouped
开始对数组进行排序,过滤掉不匹配的日期,然后从剩余数组的第一个条目中取出
type

df1.join(df2grouped, on = 'id_special', how = 'left') \
    .withColumn('data', F.expr('array_sort(data, (l,r) -> datediff(r.date_2, l.date_2))')) \
    .withColumn('data', F.expr('filter(data, x -> x.date_2 < date_1)')) \
    .withColumn('df2_type', F.expr('data[0].type')) \
    .drop('data') \
    .na.fill('Unknown')
© www.soinside.com 2019 - 2024. All rights reserved.