我有一个功能表 Pyspark DF 每天通过管道创建。现在的要求是为每个特征创建基于时间的特征,其中每个 t-1 到 t-30(t=时间)特征捕获前一天的值。
例如
6 月 1 日输入表
我的具有基于时间的特征的特征表如下所示:
次日输入数据 -
6 月 2 日
6 月 3 日
输入数据
有人可以建议我一种方法,如何使用
pyspark 以最有效的方式进行操作。
注意: 如果问题的解释不清楚,请告诉我。我会尝试再次澄清它。
谢谢我还没有开始使用这种方法,但我最初的想法是首先将当前日期表与前一天的表连接到
COL A 上以获得 t-1 到 t-n 特征,然后在 COL A 上使用 groupby 和应用 Pandas_udf 函数 df.groupby('A).apply(custom_udf_function)
。在这个 UDF 中,我很难编写正确的方法。
COL A
from pyspark.sql import functions as F
def get_df_name(date):
# Logic for getting your df names based on date
df = ... # get your initial df
tables = set(table.name for table in session.catalog.listTables())
for i in range(1, 31):
date_of_df = date.today() - timedelta(days=i)
df_name = get_df_name(date_of_df)
if df_name in tables:
right_df = session.table(df_name)
right_df = right_df.withColumnRenamed('Count_n', f'count_n_t-{i}')
df = df.join(right_df, 'COL A', 'left')
else:
df = df.withColumn(f'count_n_t-{i}', F.lit(0))
df = df.fillna(0)