Spark 窗口函数 - rangeBetween 日期

问题描述 投票:0回答:3

我有一个带有日期列的 Spark SQL

DataFrame
,我想要获取的是给定日期范围内当前行之前的所有行。例如,我想要获取给定行之前 7 天前的所有行。我发现,我需要使用
Window Function
,例如:

Window \
    .partitionBy('id') \
    .orderBy('start')

我想要

rangeBetween
7 天,但在 Spark 文档中我找不到任何相关内容。 Spark 甚至提供这样的选项吗?现在我只是获取前面的所有行:

.rowsBetween(-sys.maxsize, 0)

但想要实现类似的目标:

.rangeBetween("7 days", 0)
apache-spark date pyspark apache-spark-sql window-functions
3个回答
103
投票

火花 >= 2.3

自 Spark 2.3 起,可以通过 SQL API 使用间隔对象,但

DataFrame
API 支持仍在进行中

df.createOrReplaceTempView("df")

spark.sql(
    """SELECT *, mean(some_value) OVER (
        PARTITION BY id 
        ORDER BY CAST(start AS timestamp) 
        RANGE BETWEEN INTERVAL 7 DAYS PRECEDING AND CURRENT ROW
     ) AS mean FROM df""").show()

## +---+----------+----------+------------------+       
## | id|     start|some_value|              mean|
## +---+----------+----------+------------------+
## |  1|2015-01-01|      20.0|              20.0|
## |  1|2015-01-06|      10.0|              15.0|
## |  1|2015-01-07|      25.0|18.333333333333332|
## |  1|2015-01-12|      30.0|21.666666666666668|
## |  2|2015-01-01|       5.0|               5.0|
## |  2|2015-01-03|      30.0|              17.5|
## |  2|2015-02-01|      20.0|              20.0|
## +---+----------+----------+------------------+

火花< 2.3

据我所知,无论是在 Spark 还是 Hive 中都不可能直接实现。两者都要求与

ORDER BY
一起使用的
RANGE
子句必须是数字。我发现的最接近的是转换为时间戳并以秒为单位进行操作。假设
start
列包含
date
类型:

from pyspark.sql import Row

row = Row("id", "start", "some_value")
df = sc.parallelize([
    row(1, "2015-01-01", 20.0),
    row(1, "2015-01-06", 10.0),
    row(1, "2015-01-07", 25.0),
    row(1, "2015-01-12", 30.0),
    row(2, "2015-01-01", 5.0),
    row(2, "2015-01-03", 30.0),
    row(2, "2015-02-01", 20.0)
]).toDF().withColumn("start", col("start").cast("date"))

一个小助手和窗口定义:

from pyspark.sql.window import Window
from pyspark.sql.functions import mean, col


# Hive timestamp is interpreted as UNIX timestamp in seconds*
days = lambda i: i * 86400 

最后查询:

w = (Window()
   .partitionBy(col("id"))
   .orderBy(col("start").cast("timestamp").cast("long"))
   .rangeBetween(-days(7), 0))

df.select(col("*"), mean("some_value").over(w).alias("mean")).show()

## +---+----------+----------+------------------+
## | id|     start|some_value|              mean|
## +---+----------+----------+------------------+
## |  1|2015-01-01|      20.0|              20.0|
## |  1|2015-01-06|      10.0|              15.0|
## |  1|2015-01-07|      25.0|18.333333333333332|
## |  1|2015-01-12|      30.0|21.666666666666668|
## |  2|2015-01-01|       5.0|               5.0|
## |  2|2015-01-03|      30.0|              17.5|
## |  2|2015-02-01|      20.0|              20.0|
## +---+----------+----------+------------------+

远谈不上漂亮,但很有效。


* Hive 语言手册,类型


10
投票

Spark 3.5 发布了,但是...

答案可能与 Spark 1.5.0 一样古老:

datediff
.

datediff(col_name, '1000')
将返回从 1000-01-01 到 col_name 的整数天差。

作为第一个参数,它接受日期、时间戳甚至字符串。
作为第二个,它甚至接受

1000


答案

日期差异以天为单位 - 取决于订单列的数据类型:

日期

  • 星火3.5+

    .orderBy(F.unix_date("col_name")).rangeBetween(-7, 0)
    
  • 星火3.1+

    .orderBy(F.expr("unix_date(col_name)")).rangeBetween(-7, 0)
    
  • 星火2.1+

    .orderBy(F.expr("datediff(col_name, '1000')")).rangeBetween(-7, 0)
    

时间戳

  • 星火2.1+

    .orderBy(F.expr("datediff(col_name, '1000')")).rangeBetween(-7, 0)
    

long - UNIX 时间(以微秒为单位)(例如 1672534861000000)

  • 星火2.1+

    .orderBy(F.col("col_name") / 86400_000000).rangeBetween(-7, 0)
    

long - UNIX 时间(以毫秒为单位)(例如 1672534861000)

  • 星火2.1+

    .orderBy(F.col("col_name") / 86400_000).rangeBetween(-7, 0)
    

long - UNIX 时间(以秒为单位)(例如 1672534861)

  • 星火2.1+

    .orderBy(F.col("col_name") / 86400).rangeBetween(-7, 0)
    

long,格式为 yyyyMMdd

  • 星火3.5+

    .orderBy(F.unix_date(F.to_date("col_name", 'yyyyMMdd'))).rangeBetween(-7, 0)
    
  • 星火3.3+

    .orderBy(F.expr("unix_date(to_date(col_name, 'yyyyMMdd'))")).rangeBetween(-7, 0)
    
  • 星火3.1+

    .orderBy(F.expr("unix_date(to_date(cast(col_name as string), 'yyyyMMdd'))")).rangeBetween(-7, 0)
    
  • 星火2.2+

    .orderBy(F.expr("datediff(to_date(cast(col_name as string), 'yyyyMMdd'), '1000')")).rangeBetween(-7, 0)
    
  • 星火2.1+

    .orderBy(F.unix_timestamp(F.col("col_name").cast('string'), 'yyyyMMdd') / 86400).rangeBetween(-7, 0)
    

字符串,日期格式为“yyyy-MM-dd”

  • 星火3.5+

    .orderBy(F.unix_date(F.to_date("col_name"))).rangeBetween(-7, 0)
    
  • 星火3.1+

    .orderBy(F.expr("unix_date(to_date(col_name))")).rangeBetween(-7, 0)
    
  • 星火2.1+

    .orderBy(F.expr("datediff(col_name, '1000')")).rangeBetween(-7, 0)
    
其他日期格式的

字符串(例如“MM-dd-yyyy”)

  • 星火3.5+

    .orderBy(F.unix_date(F.to_date("col_name", 'MM-dd-yyyy'))).rangeBetween(-7, 0)
    
  • 星火3.1+

    .orderBy(F.expr("unix_date(to_date(col_name, 'MM-dd-yyyy'))")).rangeBetween(-7, 0)
    
  • 星火2.2+

    .orderBy(F.expr("datediff(to_date(col_name, 'MM-dd-yyyy'), '1000')")).rangeBetween(-7, 0)
    
  • 星火2.1+

    .orderBy(F.unix_timestamp("col_name", 'MM-dd-yyyy') / 86400).rangeBetween(-7, 0)
    

字符串,时间戳格式为 'yyyy-MM-dd HH:mm:ss'

  • 星火2.1+

    .orderBy(F.expr("datediff(col_name, '1000')")).rangeBetween(-7, 0)
    

字符串采用其他时间戳格式(例如“MM-dd-yyyy HH:mm:ss”)

  • 星火2.2+

    .orderBy(F.expr("datediff(to_date(col_name, 'MM-dd-yyyy HH:mm:ss'), '1000')")).rangeBetween(-7, 0)
    

可以使用以下命令创建 Spark 3.4+ 中的不同测试用例:

ints = F.expr("sequence(1, 10)").alias('ints')
dates = (
    date := F.expr("sequence(to_date('2000-01-01'), to_date('2000-01-10'))")
    # timestamp := F.expr("sequence(to_timestamp('2000-01-01'), to_timestamp('2000-01-10'))")
    # long_micro := F.expr("sequence(946684800000000, 947462400000000, 86400000000)")
    # long_milli := F.expr("sequence(946684800000, 947462400000, 86400000)")
    # long_secs := F.expr("sequence(946684800, 947462400, 86400)")
    # long_yyyyMMdd := F.expr("sequence(20000101, 20000110)")
    # str_unformatted_date := F.expr("transform(sequence(to_date('2000-01-01'), to_date('2000-01-10')), x -> string(x))")
    # str_formatted_date := F.expr("transform(sequence(to_date('2000-01-01'), to_date('2000-01-10')), x -> date_format(x, 'MM-dd-yyyy'))")
    # str_unformatted_ts := F.expr("transform(sequence(to_timestamp('2000-01-01'), to_timestamp('2000-01-10')), x -> string(x))")
    # str_formatted_ts := F.expr("transform(sequence(to_date('2000-01-01'), to_date('2000-01-10')), x -> date_format(x, 'MM-dd-yyyy HH:mm:ss'))")
).alias('col_name')
df = spark.range(1).select(F.inline(F.arrays_zip(ints, dates)))

6
投票

很棒的解决方案@zero323,如果您想在几分钟内而不是像我那样需要几天的时间内进行操作,并且您不需要使用 id 进行分区,因此您只需修改代码的一个简单部分,如我所示:

df.createOrReplaceTempView("df")
spark.sql(
    """SELECT *, sum(total) OVER (
        ORDER BY CAST(reading_date AS timestamp) 
        RANGE BETWEEN INTERVAL 45 minutes PRECEDING AND CURRENT ROW
     ) AS sum_total FROM df""").show()
© www.soinside.com 2019 - 2024. All rights reserved.