I have these 2 DataFrames:
df1:
+---+----------+----------+
|id |id_special|date_1 |
+---+----------+----------+
|1 |101 |2024-11-01|
|2 |102 |2024-11-03|
|3 |103 |2024-11-04|
|4 |null |2024-11-05|
+---+----------+----------+
df2:
+----------+----------+------+
|id_special|date_2 |type |
+----------+----------+------+
|101 |2024-10-30|Type_1|
|101 |2024-10-31|Type_2|
|101 |2024-11-01|Type_3|
|102 |2024-11-03|Type_4|
+----------+----------+------+
My goal is to create a new column named df2_type in df1. To do this, I need a special join between df1 and df2.

Here is the rule for the df2_type column: for each row of df1, df2_type must be the type of the df2 row with the same id_special whose date_2 is the latest date strictly before date_1; if no such row exists, the value must be Unknown.

So, from the previous DataFrames, this is the result I expect:
+---+----------+----------+--------+
|id |id_special|date_1 |df2_type|
+---+----------+----------+--------+
|1 |101 |2024-11-01|Type_2 |
|2 |102 |2024-11-03|Unknown |
|3 |103 |2024-11-04|Unknown |
|4 |null |2024-11-05|Unknown |
+---+----------+----------+--------+
I tried a join between my 2 DataFrames, but I never manage to get it right. Here is my code:
from awsglue.context import GlueContext
from datetime import date
from pyspark.context import SparkContext
from pyspark.sql.functions import lit
from pyspark.sql.types import DateType, IntegerType, StringType, StructField, StructType
glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session  # SparkSession used by createDataFrame below
data1 = [
(1, 101, date.fromisoformat("2024-11-01")),
(2, 102, date.fromisoformat("2024-11-03")),
(3, 103, date.fromisoformat("2024-11-04")),
(4, None, date.fromisoformat("2024-11-05")),
]
data2 = [
(101, date.fromisoformat("2024-10-30"), "Type_1"),
(101, date.fromisoformat("2024-10-31"), "Type_2"),
(101, date.fromisoformat("2024-11-01"), "Type_3"),
(102, date.fromisoformat("2024-11-03"), "Type_4"),
]
schema1 = StructType([
StructField("id", IntegerType(), True), # Unique key
StructField("id_special", IntegerType(), True),
StructField("date_1", DateType(), True),
])
schema2 = StructType([
StructField("id_special", IntegerType(), True),
StructField("date_2", DateType(), True),
StructField("type", StringType(), True),
])
df1 = spark.createDataFrame(data1, schema1)
df2 = spark.createDataFrame(data2, schema2)
# Step 1 - Add df2_type columns
df1 = df1.withColumn("df2_type", lit(None))
# The final DataFrame need to be like this
# +---+----------+----------+--------+
# |id |id_special|date_1 |df2_type|
# +---+----------+----------+--------+
# |1 |101 |2024-11-01|Type_2 |
# |2 |102 |2024-11-03|Unknown |
# |3 |103 |2024-11-04|Unknown |
# |4 |null |2024-11-05|Unknown |
# +---+----------+----------+--------+
Step 1: group df2 by id_special and collect all rows into a list of structs.
from pyspark.sql import functions as F

df2grouped = df2.groupBy('id_special') \
    .agg(F.collect_list(F.struct('date_2', 'type')).alias('data'))
Step 2: run a left outer join between df1 and df2grouped. Sort the array coming from df2grouped by date_2 in descending order, filter out the dates that do not match, then take type from the first entry of the remaining array.
result = (
    df1.join(df2grouped, on='id_special', how='left')
    # sort the collected structs so the newest date_2 comes first
    .withColumn('data', F.expr('array_sort(data, (l, r) -> datediff(r.date_2, l.date_2))'))
    # keep only entries strictly before date_1
    .withColumn('data', F.expr('filter(data, x -> x.date_2 < date_1)'))
    # the first remaining entry holds the latest date_2 before date_1 (null if none)
    .withColumn('df2_type', F.expr('data[0].type'))
    .drop('data')
    # null df2_type (no match, or null id_special) becomes Unknown
    .na.fill('Unknown')
)
result.show()
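As a sanity check, the per-row rule this pipeline implements can be mirrored in plain Python, without Spark. The helper name `lookup_type` is only an illustration, using the sample rows of df2 from the question:

```python
from datetime import date

# Sample rows mirroring df2: (id_special, date_2, type)
df2_rows = [
    (101, date(2024, 10, 30), "Type_1"),
    (101, date(2024, 10, 31), "Type_2"),
    (101, date(2024, 11, 1), "Type_3"),
    (102, date(2024, 11, 3), "Type_4"),
]

def lookup_type(id_special, date_1):
    """Type of the row with matching id_special whose date_2 is the
    latest date strictly before date_1; 'Unknown' if there is none."""
    candidates = [
        (d2, t) for (i, d2, t) in df2_rows
        if i == id_special and d2 < date_1
    ]
    return max(candidates)[1] if candidates else "Unknown"

print(lookup_type(101, date(2024, 11, 1)))   # Type_2
print(lookup_type(102, date(2024, 11, 3)))   # Unknown (date_2 not strictly before date_1)
print(lookup_type(None, date(2024, 11, 5)))  # Unknown (no matching id_special)
```

This reproduces the expected output table row by row, including the strict `<` comparison that makes id=2 map to Unknown rather than Type_4.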