I'm new to PySpark/Databricks. I have a question about concatenating a column of lists by weekly or monthly time periods. Here is the code that builds my input data:
dates = ['2023-04-01', '2023-04-02', '2023-04-03', '2023-04-04', '2023-04-05', '2023-04-06', '2023-04-07', '2023-04-08', '2023-04-09', '2023-04-10', '2023-04-11', '2023-04-12', '2023-04-13', '2023-04-14']
brands = [['bmw', 'vw'], ['chevy', 'buick'], ['nissan', 'lexus', 'email'], ['bmw', 'nissan', 'lexus'], ['bmw', 'vw', 'nissan', 'lexus'], ['bmw', 'vw', 'chevy'], ['chevy', 'bmw', 'buick'], ['bmw', 'vw'], ['chevy', 'nissan'], ['nissan', 'lexus', 'vw'], ['bmw', 'security', 'vw'], ['bmw', 'vw', 'nissan', 'lexus'], ['bmw', 'lexus', 'chevy'], ['chevy', 'bmw', 'buick']]
weights = [[0.99, 0.98], [0.97, 0.96], [0.95, 0.94, 0.93], [0.98, 0.96, 0.95], [0.97, 0.96, 0.95, 0.94], [0.975, 0.964, 0.952, 0.943], [0.98, 0.976, 0.967], [0.99, 0.987, 0.978],
           [0.978, 0.975], [0.972, 0.963], [0.955, 0.942, 0.936], [0.982, 0.961, 0.952], [0.97, 0.96, 0.952, 0.94], [0.975, 0.964, 0.952, 0.943]]
df = spark.createDataFrame(zip(dates, brands, weights), ['date', 'brands', 'weight'])
df.show()
+----------+--------------------+--------------------+
| date| brands| weight|
+----------+--------------------+--------------------+
|2023-04-01| [bmw, vw]| [0.99, 0.98]|
|2023-04-02| [chevy, buick]| [0.97, 0.96]|
|2023-04-03|[nissan, lexus, e...| [0.95, 0.94, 0.93]|
|2023-04-04|[bmw, nissan, lexus]| [0.98, 0.96, 0.95]|
|2023-04-05|[bmw, vw, nissan,...|[0.97, 0.96, 0.95...|
|2023-04-06| [bmw, vw, chevy]|[0.975, 0.964, 0....|
|2023-04-07| [chevy, bmw, buick]|[0.98, 0.976, 0.967]|
|2023-04-08| [bmw, vw]|[0.99, 0.987, 0.978]|
|2023-04-09| [chevy, nissan]| [0.978, 0.975]|
|2023-04-10| [nissan, lexus, vw]| [0.972, 0.963]|
|2023-04-11| [bmw, security, vw]|[0.955, 0.942, 0....|
|2023-04-12|[bmw, vw, nissan,...|[0.982, 0.961, 0....|
|2023-04-13| [bmw, lexus, chevy]|[0.97, 0.96, 0.95...|
|2023-04-14| [chevy, bmw, buick]|[0.975, 0.964, 0....|
+----------+--------------------+--------------------+
from pyspark.sql.functions import to_timestamp, col
df1 = df.withColumn("DateFormatted", to_timestamp(col("date"), "yyyy-MM-dd"))
df1.show()
+----------+--------------------+--------------------+-------------------+
| date| brands| weight| DateFormatted|
+----------+--------------------+--------------------+-------------------+
|2023-04-01| [bmw, vw]| [0.99, 0.98]|2023-04-01 00:00:00|
|2023-04-02| [chevy, buick]| [0.97, 0.96]|2023-04-02 00:00:00|
|2023-04-03|[nissan, lexus, e...| [0.95, 0.94, 0.93]|2023-04-03 00:00:00|
|2023-04-04|[bmw, nissan, lexus]| [0.98, 0.96, 0.95]|2023-04-04 00:00:00|
|2023-04-05|[bmw, vw, nissan,...|[0.97, 0.96, 0.95...|2023-04-05 00:00:00|
|2023-04-06| [bmw, vw, chevy]|[0.975, 0.964, 0....|2023-04-06 00:00:00|
|2023-04-07| [chevy, bmw, buick]|[0.98, 0.976, 0.967]|2023-04-07 00:00:00|
|2023-04-08| [bmw, vw]|[0.99, 0.987, 0.978]|2023-04-08 00:00:00|
|2023-04-09| [chevy, nissan]| [0.978, 0.975]|2023-04-09 00:00:00|
|2023-04-10| [nissan, lexus, vw]| [0.972, 0.963]|2023-04-10 00:00:00|
|2023-04-11| [bmw, security, vw]|[0.955, 0.942, 0....|2023-04-11 00:00:00|
|2023-04-12|[bmw, vw, nissan,...|[0.982, 0.961, 0....|2023-04-12 00:00:00|
|2023-04-13| [bmw, lexus, chevy]|[0.97, 0.96, 0.95...|2023-04-13 00:00:00|
|2023-04-14| [chevy, bmw, buick]|[0.975, 0.964, 0....|2023-04-14 00:00:00|
+----------+--------------------+--------------------+-------------------+
I converted the date column to a timestamp column so I can do any kind of timestamp-based aggregation. What I want now is one row per week (or per month), with the brands and weight lists for that period concatenated into a single list.
Here is what I tried, and the problems I ran into. First, with window(timeColumn, windowDuration, slideDuration, startTime):
df2 = df1.groupBy(window(col("DateFormatted"), "1 week", "1 week", "64 hours")).agg(concat("brands") as "brands_concat").select("window.start", "window.end", "DateFormatted", "brands_concat").show()
SyntaxError: invalid syntax
File "<command-4423978228267630>", line 2
df2 = df1.groupBy(window(col("DateFormatted"), "1 week", "1 week", "64 hours")).agg(concat("brands") as "brands_concat").select("window.start", "window.end", "DateFormatted", "brands_concat").show()
^
SyntaxError: invalid syntax
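The immediate SyntaxError is the Scala-style `as "brands_concat"` alias, which isn't valid Python; PySpark spells it `.alias("brands_concat")`. A sketch with the syntax fixed (it also swaps `concat` for `collect_list` plus `flatten`, since `concat` is not an aggregate function, and drops `DateFormatted` from the select, because an ungrouped column can't survive the groupBy):

from pyspark.sql.functions import window, col, collect_list, flatten

# group into weekly windows, then merge the per-day brand lists into one list
df2 = (df1
    .groupBy(window(col("DateFormatted"), "1 week", "1 week", "64 hours"))
    .agg(flatten(collect_list("brands")).alias("brands_concat"))
    .select("window.start", "window.end", "brands_concat"))
df2.show()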
Another attempt:
import pyspark.sql.functions as f
df.groupBy("date").agg(f.concat_ws(",", f.collect_list("brands")).alias("brands")).show()
AnalysisException: cannot resolve 'concat_ws(',', collect_list(`brands`))' due to data type mismatch: argument 2 requires (array<string> or string) type, however, 'collect_list(`brands`)' is of array<array<string>> type.;
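The mismatch comes from applying `collect_list` to an array column: the result is `array<array<string>>`, while `concat_ws` expects strings or `array<string>`. Flattening the nested result first lines the types up; roughly, keeping the per-date grouping of the attempt above:

import pyspark.sql.functions as f

# flatten array<array<string>> down to array<string> before joining with commas
df.groupBy("date").agg(
    f.concat_ws(",", f.flatten(f.collect_list("brands"))).alias("brands")
).show()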
So concat_ws only joins strings (or an array of strings), not nested lists, unless they are flattened first; maybe some kind of UDF could also do it. Next I tried array_join, but it doesn't work on grouped data:
from pyspark.sql.functions import array_join, col, date_sub, next_day
df2.withColumn("week_strt_day",date_sub(next_day(col("DateFormatted"),"sunday"),7)).groupBy("week_strt_day").apply(array_join("brands", ",").alias("brands")).orderBy("week_strt_day").show()
ValueError: Invalid udf: the udf argument must be a pandas_udf of type GROUPED_MAP.
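`.apply` on grouped data only accepts a pandas GROUPED_MAP UDF; a plain aggregation goes through `.agg`, so no UDF is needed here. Keeping the same Sunday-based week key (and starting from df1, since the df2 assignment above failed), the fixed version would look roughly like:

from pyspark.sql.functions import col, collect_list, date_sub, flatten, next_day

# key each row by the Sunday starting its week, then aggregate normally
(df1
    .withColumn("week_strt_day", date_sub(next_day(col("DateFormatted"), "sunday"), 7))
    .groupBy("week_strt_day")
    .agg(flatten(collect_list("brands")).alias("brands"))
    .orderBy("week_strt_day")
    .show())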
from pyspark.sql import functions as F

# bucket rows into fixed one-week windows, keyed by the window start
df1 = df1.groupBy(F.window(F.col("DateFormatted"), "1 weeks").getItem("start").alias("week"))
df1 = df1.agg(
    F.collect_list(F.col("brands")).alias("brands"),
    F.collect_list(F.col("weight")).alias("weight"),
)
# collect_list over array columns gives array<array<...>>; flatten to one list
df1 = df1.select(
    "week",
    F.flatten(F.col("brands")).alias("brands"),
    F.flatten(F.col("weight")).alias("weight"),
)
df1.show()
+-------------------+--------------------+--------------------+
| week| brands| weight|
+-------------------+--------------------+--------------------+
|2023-03-30 02:00:00|[bmw, vw, chevy, ...|[0.99, 0.98, 0.97...|
|2023-04-06 02:00:00|[chevy, bmw, buic...|[0.98, 0.976, 0.9...|
|2023-04-13 02:00:00| [chevy, bmw, buick]|[0.975, 0.964, 0....|
+-------------------+--------------------+--------------------+
Note: my results are shifted by 2 hours because I'm in France, hence UTC+2.
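If the 2-hour shift is unwanted: the window boundaries follow the session time zone, which can be pinned before aggregating, e.g.:

# time windows are computed in the session time zone; pin it to UTC
spark.conf.set("spark.sql.session.timeZone", "UTC")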
For a monthly aggregation you would want to swap "1 weeks" for "1 month", but Spark's time windows only accept fixed-length durations (weeks and smaller), so true calendar months need a different grouping key; see the sketch below.
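A sketch of a calendar-month variant, grouping on date_trunc instead of window (this assumes a frame that still has the raw DateFormatted column, i.e. before the reassignments of df1 above):

from pyspark.sql import functions as F

# date_trunc("month", ts) maps every timestamp to the first instant of its month
df_month = (df1
    .groupBy(F.date_trunc("month", F.col("DateFormatted")).alias("month"))
    .agg(
        F.flatten(F.collect_list("brands")).alias("brands"),
        F.flatten(F.collect_list("weight")).alias("weight"),
    ))
df_month.show()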
Let me know what you think of this solution:
import pyspark.sql.functions as f

df = spark.createDataFrame([
('2023-04-01', ['bmw', 'vw'], [0.99, 0.98]),
('2023-04-02', ['bmw', 'chevy'], [0.96, 0.95]),
('2023-04-03', ['bmw', 'nissan', 'lexus'], [0.98, 0.96, 0.95]),
('2023-04-04', ['chevy', 'bmw', 'buick'], [0.98, 0.976, 0.967]),
('2023-04-05', ['nissan', 'lexus', 'vw'], [0.972, 0.963, 0.98]),
('2023-05-06', ['bmw', 'vw'], [0.99, 0.98]),
('2023-05-07', ['bmw', 'chevy'], [0.96, 0.95]),
], ['Date', 'Brands', 'Weights'])
df_grouped_weekly = (
df
.withColumn('DateFormatted', f.to_timestamp(f.col('Date'), 'yyyy-MM-dd'))
.withColumn('WeekOfYear', f.concat_ws('-', f.weekofyear(f.col('DateFormatted')), f.year(f.col('DateFormatted'))))
.groupBy('WeekOfYear')
.agg(
f.collect_list(f.col('Brands')).alias('GroupedBrands'),
f.collect_list(f.col('Weights')).alias('GroupedWeights'),
)
.withColumn('GroupedBrands', f.flatten(f.col('GroupedBrands')))
.withColumn('GroupedWeights', f.flatten(f.col('GroupedWeights')))
)
df_grouped_monthly = (
df
.withColumn('DateFormatted', f.to_timestamp(f.col('Date'), 'yyyy-MM-dd'))
.withColumn('Month', f.concat_ws('-', f.month(f.col('DateFormatted')), f.year(f.col('DateFormatted'))))
.groupBy('Month')
.agg(
f.collect_list(f.col('Brands')).alias('GroupedBrands'),
f.collect_list(f.col('Weights')).alias('GroupedWeights'),
)
.withColumn('GroupedBrands', f.flatten(f.col('GroupedBrands')))
.withColumn('GroupedWeights', f.flatten(f.col('GroupedWeights')))
)
df_grouped_weekly.show(truncate = False)
df_grouped_monthly.show(truncate = False)
The output is:
+----------+----------------------------------------------------------+----------------------------------------------------------+
|WeekOfYear|GroupedBrands |GroupedWeights |
+----------+----------------------------------------------------------+----------------------------------------------------------+
|18-2023 |[bmw, vw, bmw, chevy] |[0.99, 0.98, 0.96, 0.95] |
|14-2023 |[bmw, nissan, lexus, chevy, bmw, buick, nissan, lexus, vw]|[0.98, 0.96, 0.95, 0.98, 0.976, 0.967, 0.972, 0.963, 0.98]|
|13-2023 |[bmw, vw, bmw, chevy] |[0.99, 0.98, 0.96, 0.95] |
+----------+----------------------------------------------------------+----------------------------------------------------------+
+------+-------------------------------------------------------------------------------+----------------------------------------------------------------------------------+
|Month |GroupedBrands |GroupedWeights |
+------+-------------------------------------------------------------------------------+----------------------------------------------------------------------------------+
|4-2023|[bmw, vw, bmw, chevy, bmw, nissan, lexus, chevy, bmw, buick, nissan, lexus, vw]|[0.99, 0.98, 0.96, 0.95, 0.98, 0.96, 0.95, 0.98, 0.976, 0.967, 0.972, 0.963, 0.98]|
|5-2023|[bmw, vw, bmw, chevy] |[0.99, 0.98, 0.96, 0.95] |
+------+-------------------------------------------------------------------------------+----------------------------------------------------------------------------------+
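One caveat on the string keys: they sort lexically ("14-2023" before "5-2023"), and weekofyear is ISO-based, so a date like 2023-01-01 reports week 52 while year still says 2023, producing a misleading "52-2023" bucket. If chronology matters, a date_trunc timestamp key avoids both issues; a sketch:

# a sortable timestamp key instead of the 'week-year' string
df_grouped_weekly2 = (
    df
    .withColumn('DateFormatted', f.to_timestamp(f.col('Date'), 'yyyy-MM-dd'))
    .groupBy(f.date_trunc('week', f.col('DateFormatted')).alias('WeekStart'))
    .agg(
        f.flatten(f.collect_list('Brands')).alias('GroupedBrands'),
        f.flatten(f.collect_list('Weights')).alias('GroupedWeights'),
    )
    .orderBy('WeekStart')
)
df_grouped_weekly2.show(truncate=False)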