pyspark udf 函数存储不正确的数据,尽管函数产生正确的结果

问题描述 投票:0回答:1

所以我有这个奇怪的问题。我正在使用一个巨大的数据集,其中的日期和时间由单个字符串表示。这个数据可以使用

datetime.strptime()
轻松转换,但问题是数据太大了,我需要使用pyspark来转换它。没问题,我想,我搜索了 stackoverflow 并看到了 UDF!所以我做了一个。不幸的是,存储在数据框中的值与函数实际生成的值不匹配。我假设一个函数会在 Spark 看到数据时逐行执行,但似乎并没有发生这种情况。

这是我所拥有的(数据是一个名为 result 的 pyspark 数据框,我显示前 5 行):

时间戳 节点id 子系统 传感器 标准杆 val_raw val_hrf
2018/01/01 00:00:06 001e0610e532 化学感 lps25小时 温度 -954 -9.54
2018/01/01 00:00:30 001e0610e532 化学感 lps25小时 温度 -954 -9.54
2018/01/01 00:00:54 001e0610e532 化学感 lps25小时 温度 -957 -9.57
2018/01/01 00:01:18 001e0610e532 化学感 lps25小时 温度 -961 -9.61
2018/01/01 00:01:42 001e0610e532 化学感 lps25小时 温度 -962 -9.62

为了将时间戳列转换为可操作的数据,我使用自定义函数将其转换为浮点数:

def timeswap(x:str):
    print(x)
    utime= datetime.timestamp(datetime.strptime(x, "%Y/%m/%d %H:%M:%S"))
    print(utime)
    return utime

我已经确认此功能可以正常工作。因此,我继续在整个列上运行它,并决定创建一个名为 unixTime 的新列来存储它:

timeUDF = spark.udf.register('timeUDF',timeswap,FloatType())  result_conv = result.withColumn('unixTime', timeUDF('timestamp'))

看起来确实有效。我花了几周的时间认为这是准确的,对数据运行算法却发现最近数据以不应该的方式聚集;同一日期的多次读数。所以我继续告诉 Spark 打印该专栏。这样做实际上会导致函数调用每一行。我知道这会是一件事,所以我放入打印语句作为健全性检查:

result_conv.select('unixTime').head(5)

它输出了我的#条评论:

2018/01/01 00:00:06 #The original string date
1514782806.0 #the correct output from the function
2018/01/01 00:00:30
1514782830.0
2018/01/01 00:00:54
1514782854.0
2018/01/01 00:01:18
1514782878.0
2018/01/01 00:01:42
1514782902.0
[Row(unixTime=1514782848.0), #I don't know what this value is
 Row(unixTime=1514782848.0),
 Row(unixTime=1514782848.0),
 Row(unixTime=1514782848.0),
 Row(unixTime=1514782848.0)]

有人知道我在这里缺少什么吗?我什至已经确认,当运行超过 5 行时,行列表中的浮点不存在,因此我不知道该值从何而来,也不知道为什么它会在行之间重复。它既不是平均值也不是中值(无论如何都不应该使用它们),而且我不知道为什么它会重复(当我查看较长的行时,重复的数量不一致)。我真的很想避免将其转换为 pandas DF,然后再转换回 Spark DF 来执行此操作。底线是我需要将日期字符串转换为 unixtime 浮点数,该浮点数对于该传感器的每行都是唯一的(因为它在数据中)。

感谢您的帮助!

python pandas dataframe pyspark user-defined-functions
1个回答
0
投票

您不必使用 UDF。您可以使用内置的 pyspark 函数来完成相同的任务。 UDF 是最后的手段。它们会减慢你的程序速度。

这就是我所做的。我认为价值的微小差异可能是由于时区问题造成的。但不确定。如果您能更好地说明问题,我可以进一步提供帮助。

import pyspark.sql.functions as F
from pyspark import SparkContext, SQLContext

sc = SparkContext('local')
sqlContext = SQLContext(sc)
### This is very important setting if you want legacy behaviour
sqlContext.setConf("spark.sql.legacy.timeParserPolicy", "LEGACY")

data1 = [
    ["2018/01/01 00:00:06"],
    ["2018/01/01 00:00:30"],
    ["2018/01/01 00:00:54"],
    ["2018/01/01 00:01:18"],
    ["2018/01/01 00:01:42"],


]

df1Columns = ["time_col"]
df1 = sqlContext.createDataFrame(data=data1, schema=df1Columns)

# 1514782806.0  # the correct output from the function

df1 = df1.withColumn("integer_value", F.unix_timestamp(F.to_timestamp('time_col', 'yyyy/MM/dd HH:mm:ss')))
df1.show(n=100, truncate=False)

输出:

+-------------------+-------------+
|time_col           |integer_value|
+-------------------+-------------+
|2018/01/01 00:00:06|1514745006   |
|2018/01/01 00:00:30|1514745030   |
|2018/01/01 00:00:54|1514745054   |
|2018/01/01 00:01:18|1514745078   |
|2018/01/01 00:01:42|1514745102   |
+-------------------+-------------+
© www.soinside.com 2019 - 2024. All rights reserved.