在 Pyspark 中创建基于时间的功能

问题描述 投票:0回答:1

我有一个功能表 Pyspark DF 每天通过管道创建。现在的要求是为每个特征创建基于时间的特征,其中每个 t-1 到 t-30(t=时间)特征捕获前一天的值。

例如

6 月 1 日输入表

A 校Count_n'A'10'B'12
因此将开始日期视为 6 月 1 日

我的具有基于时间的特征的特征表如下所示:

A 校Count_ncount_n_t-1count_n_t-2count_n_t-ncount_n_t-30'A'100000'B'120000
由于这是开始日期,我已将所有功能初始化为 0。

次日输入数据 -

6 月 2 日

A 校Count_n'A'17'B'15
现在具有基于时间的特征的特征表将类似于 6 月 2 日:

A 校Count_ncount_n_t-1count_n_t-2count_n_t-ncount_n_t-30'A'1710000'B'1512000
同样适用于

6 月 3 日

输入数据

A 校Count_n'A'21'B'35
功能表:

A 校Count_ncount_n_t-1count_n_t-2count_n_t-ncount_n_t-30'A'21171000'B'35151200
如果我们观察到我们正在根据前一天的值根据其各自的组(COL A)转移特征。同样,不同的领域也会有类似的t-n特征。但 t-1 到 t-30 是我们正在创建的一组不变的特征。

有人可以建议我一种方法,如何使用

pyspark 以最有效的方式进行操作。

注意: 如果问题的解释不清楚,请告诉我。我会尝试再次澄清它。

谢谢

我还没有开始使用这种方法,但我最初的想法是首先将当前日期表与前一天的表连接到

COL A 上以获得 t-1 到 t-n 特征,然后在 COL A 上使用 groupby 和应用 Pandas_udf 函数 df.groupby('A).apply(custom_udf_function)

在这个 UDF 中,我很难编写正确的方法。

python-3.x pyspark feature-engineering pandas-udf
1个回答
0
投票
您的问题似乎不需要任何聚合。如果我理解正确并且您每天都有单独的数据框并且它们被命名/您知道如何获取它们,那么您只需要在

COL A

 上进行内部联接

from pyspark.sql import functions as F def get_df_name(date): # Logic for getting your df names based on date df = ... # get your initial df tables = set(table.name for table in session.catalog.listTables()) for i in range(1, 31): date_of_df = date.today() - timedelta(days=i) df_name = get_df_name(date_of_df) if df_name in tables: right_df = session.table(df_name) right_df = right_df.withColumnRenamed('Count_n', f'count_n_t-{i}') df = df.join(right_df, 'COL A', 'left') else: df = df.withColumn(f'count_n_t-{i}', F.lit(0)) df = df.fillna(0)
    
© www.soinside.com 2019 - 2024. All rights reserved.