我正在 Azure synapse 工作,并逐渐习惯使用 pyspark。我想在 df 中的行之间创建配对逻辑,但我无法让它工作。我有一个 ID 列和一个序列号。例如:
身份证 | 序列号 |
---|---|
100 | 3609 |
100 | 3610 |
100 | 3616 |
100 | 3617 |
100 | 3622 |
100 | 3623 |
100 | 3634 |
100 | 3642 |
100 | 3643 |
这就是代码应该输出的内容:
身份证 | 序列号 | 配对ID |
---|---|---|
100 | 3609 | 1 |
100 | 3610 | 1 |
100 | 3616 | 2 |
100 | 3617 | 2 |
100 | 3622 | 3 |
100 | 3623 | 3 |
100 | 3634 | 空 |
100 | 3642 | 4 |
100 | 3643 | 4 |
与 3634 的线路不应配对,因为序列号之间的差异应该是 1。
我的Python逻辑似乎可以工作,但是我无法利用spark的处理能力。有人可以帮我在 pyspark 中创建逻辑吗?
# window specification
windowSpec = Window.orderBy("seqNum")
# Add prev and next sequence numbers
df = df.withColumn("prev_seq", lag("seqNum").over(windowSpec))
df = df.withColumn("next_seq", lead("seqNum").over(windowSpec))
# Add flags to indicate proximity
df = df.withColumn("diff_prev", col("ID") - col("prev_seq"))
df = df.withColumn("diff_next", col("next_seq") - col("seqNum"))
#make PairID
df = df.withColumn("PairID", lit(None).cast("int"))
# Assign PairID based on proximity logic
pair_id = 1
rows = df.collect() # Collect rows for iterative processing
paired_indices = set() # Track already paired rows
result = []
for i, row in enumerate(rows):
if i in paired_indices:
continue # Skip already paired rows
current = row["seqNum"]
prev_diff = row["diff_prev"]
next_diff = row["diff_next"]
# Pair with the row above if diff_prev == 1 and it is not already paired
if prev_diff == 1 and (i - 1) not in paired_indices:
result.append((current, pair_id, rows[i - 1]["seqNum"]))
result.append((rows[i - 1]["seqNum"], pair_id, current))
paired_indices.update([i, i - 1])
pair_id += 1
# Pair with the row below if diff_next == 1 and it is not already paired
elif next_diff == 1 and (i + 1) not in paired_indices:
result.append((current, pair_id, rows[i + 1]["seqNum"]))
result.append((rows[i + 1]["seqNum"], pair_id, current))
paired_indices.update([i, i + 1])
pair_id += 1
else:
result.append((current, None, None))
# to DataFrame
result_df = spark.createDataFrame(result, ["seqNum", "PairID", "Closest"])
如果您只需要 2 个数字的配对 ID,您可以使用以下代码。
但是在这里你不会得到配对 ID 的正确顺序,但它们会与一个数字配对。
df.withColumn("prev_seq", lag("seqNum").over(windowSpec))\
.withColumn("next_seq", lead("seqNum").over(windowSpec))\
.withColumn("pair_with_prev", when((col("seqNum") - col("prev_seq")) == 1, lit(1)).otherwise(lit(0)))\
.withColumn("pair_with_next", when((col("next_seq") - col("seqNum")) == 1, lit(1)).otherwise(lit(0)))\
.withColumn("cond1",when((col("pair_with_prev") == 0) & (col("pair_with_next") == 1), randn()).otherwise(lit(0)))\
.withColumn("cond2",when((col("pair_with_prev") == 1) & (col("pair_with_next") == 0), lag("cond1").over(windowSpec)).otherwise(lit(0)))\
.withColumn("all_cond",col("cond1")+col("cond2"))\
.withColumn("res",dense_rank().over(Window.partitionBy("ID").orderBy("all_cond")))\
.withColumn("res",when(col("all_cond") == 0, lit(None)).otherwise(col("res")))\
.select("ID","seqNum","res")\
.display()
在这里我检查前一个和下一个配对的条件。
如果它没有前一对并且有下一对,则找到新对,我生成随机值。
如果有前一对而不是下一个,那么那就是该对的末尾,我分配了前一对随机值。
接下来,我在上述两个条件下生成了密集排名,并将没有对的 id 设置为空。
输出:
身份证 | 序列号 | 资源 |
---|---|---|
100 | 3634 | 空 |
100 | 3616 | 2 |
100 | 3617 | 2 |
100 | 3609 | 3 |
100 | 3610 | 3 |
100 | 3622 | 4 |
100 | 3623 | 4 |
100 | 3642 | 5 |
100 | 3643 | 5 |