(Py) Spark - Group by user over a time window

Question (0 votes, 2 answers)

I'm processing a large number of log files and would like to move the job to Spark, but I can't figure out how to aggregate events over an event-based time window the way I easily can in Pandas.

This is exactly what I want to do:

For a log file of users who have experienced a certain event (mocked up below), I want to look back 7 days from that event and return aggregates over all of the other columns.

Here it is in Pandas. Any ideas on how to port this to PySpark?

import pandas as pd
df = pd.DataFrame({
    'user_id': [1, 1, 1, 2, 2, 2],
    'event': [0, 1, 0, 0, 0, 1],
    'other': [12, 20, 16, 84, 11, 15],
    'event_date': ['2015-01-01 00:02:43', '2015-01-04 00:02:03',
                   '2015-01-10 00:12:26', '2015-01-01 00:02:43',
                   '2015-01-06 00:02:43', '2015-01-12 18:10:09']
})
df['event_date'] = pd.to_datetime(df['event_date'])
df

which gives:

    event  event_date           other  user_id
0   0      2015-01-01 00:02:43  12     1
1   1      2015-01-04 00:02:03  20     1
2   0      2015-01-10 00:12:26  16     1
3   0      2015-01-01 00:02:43  84     2
4   0      2015-01-06 00:02:43  11     2
5   1      2015-01-12 18:10:09  15     2

I'd like to group this DataFrame by user_id and then exclude from the aggregation any row that is more than 7 days older than that group's "event" row.

In Pandas, that looks like this:

def f(x):
    # Find the event row
    win = x.event == 1

    # Get the date when event == 1
    event_date = list(x[win]['event_date'])[0]

    # Construct the window
    min_date = event_date - pd.DateOffset(days=7)

    # Restrict x to this specific date window
    x = x[(x.event_date > min_date) & (x.event_date <= event_date)]

    # Aggregate other
    x['other'] = x.other.sum()

    return x[win]


df.groupby(by='user_id').apply(f).reset_index(drop=True)

which gives the desired output (one row per user, where event_date corresponds to event == 1):

    event   event_date          other   user_id
0   1       2015-01-04 00:02:03 32      1
1   1       2015-01-12 18:10:09 26      2

Does anyone know where to start to get this result in Spark?
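(For anyone who wants to try the answers below: a minimal sketch of building the same sample data as a Spark DataFrame, assuming a local SparkSession, current PySpark, and the same column names as the Pandas example.)

from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp

spark = SparkSession.builder.appName("log-window-example").getOrCreate()

# Same rows as the Pandas example above
data = [
    (1, 0, 12, "2015-01-01 00:02:43"),
    (1, 1, 20, "2015-01-04 00:02:03"),
    (1, 0, 16, "2015-01-10 00:12:26"),
    (2, 0, 84, "2015-01-01 00:02:43"),
    (2, 0, 11, "2015-01-06 00:02:43"),
    (2, 1, 15, "2015-01-12 18:10:09"),
]
df = spark.createDataFrame(data, ["user_id", "event", "other", "event_date"])

# Parse the string timestamps into a proper timestamp column
df = df.withColumn("event_date", to_timestamp("event_date"))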

python apache-spark apache-spark-sql pyspark
2 Answers

3 votes

It's fairly SQL-ish, but you can do something like this:

from pyspark.sql.functions import sum, col, udf
from pyspark.sql.types import BooleanType

# With raw SQL you can use datediff but it looks like it is not
# available as a function yet
def less_than_n_days(n):                                                       
    return udf(lambda dt1, dt2: 0 <= (dt1 - dt2).days < n, BooleanType())

# Select only events
events = df.where(df.event == 1).select(
        df.event_date.alias("evd"), df.user_id.alias("uid"))

(events
    .join(df, (events.uid == df.user_id) & (events.evd >= df.event_date))
    .where(less_than_n_days(7)(col("evd"), col("event_date")))
    .groupBy("evd", "user_id") 
    .agg(sum("other").alias("other"))
    .withColumnRenamed("evd", "event_date"))

Unfortunately, we cannot include less_than_n_days in the join clause, because a udf can only access columns from a single table. Since that limitation doesn't apply to the built-in datediff, you may prefer raw SQL like this:

df.registerTempTable("df")
events.registerTempTable("events")

sqlContext.sql("""
    SELECT evd AS event_date, user_id, SUM(other) AS other
    FROM df JOIN events ON
        df.user_id = events.uid AND
        datediff(evd, event_date) BETWEEN 0 AND 6
    GROUP BY evd, user_id""")
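As a side note, in current PySpark releases datediff is available in pyspark.sql.functions, so the same join-plus-filter can stay in the DataFrame API without the UDF. A rough sketch, reusing the events and df DataFrames defined above:

from pyspark.sql.functions import col, datediff, sum as sum_

# Same join as above, but the 7-day filter uses the built-in datediff
(events
    .join(df, (events.uid == df.user_id) & (events.evd >= df.event_date))
    .where(datediff(col("evd"), col("event_date")) < 7)
    .groupBy("evd", "user_id")
    .agg(sum_("other").alias("other"))
    .withColumnRenamed("evd", "event_date"))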

0 votes

A similar version using the rangeBetween() function:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, max, sum, when, to_timestamp
from pyspark.sql.window import Window

# Create a SparkSession
spark = SparkSession.builder.appName("Log Aggregation").getOrCreate()

# Read the DataFrame (replace with your actual data source)
df = spark.read.format("csv").option("header", "true").load("your_log_file.csv")

# Convert event_date to timestamp
df = df.withColumn("event_date", to_timestamp(col("event_date")))

# Identify event rows
df = df.withColumn("is_event", when(col("event") == 1, 1).otherwise(0))

# Calculate the latest event timestamp for each user
window_spec = Window.partitionBy("user_id").orderBy(col("event_date").desc())
df = df.withColumn("latest_event_date", max("event_date").over(window_spec))

# Create a 7-day look-back window; rangeBetween needs a numeric ordering
# column, so order by the event timestamp cast to epoch seconds
window_spec = (Window.partitionBy("user_id")
               .orderBy(col("event_date").cast("long"))
               .rangeBetween(-7 * 24 * 60 * 60, 0))

# Aggregate data within the window
df = df.withColumn("other_sum", sum("other").over(window_spec))
df = df.filter(col("is_event") == 1)

# Select the desired columns
df = df.select("user_id", "event_date", "other_sum")

# Show the results
df.show()
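Run against the sample data from the question (instead of the CSV read above), this should produce other_sum = 32 for user 1 (event on 2015-01-04) and 26 for user 2 (event on 2015-01-12), matching the Pandas output.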