所以我有这个奇怪的问题。我正在使用一个巨大的数据集,其中的日期和时间由单个字符串表示。这个数据可以使用
datetime.strptime()
轻松转换,但问题是数据太大了,我需要使用pyspark来转换它。没问题,我想,我搜索了 stackoverflow 并看到了 UDF!所以我做了一个。不幸的是,存储在数据框中的值与函数实际生成的值不匹配。我假设一个函数会在 Spark 看到数据时逐行执行,但似乎并没有发生这种情况。
这是我所拥有的(数据是一个名为 result 的 pyspark 数据框,我显示前 5 行):
时间戳 | 节点id | 子系统 | 传感器 | 标准杆 | val_raw | val_hrf |
---|---|---|---|---|---|---|
2018/01/01 00:00:06 | 001e0610e532 | 化学感 | lps25小时 | 温度 | -954 | -9.54 |
2018/01/01 00:00:30 | 001e0610e532 | 化学感 | lps25小时 | 温度 | -954 | -9.54 |
2018/01/01 00:00:54 | 001e0610e532 | 化学感 | lps25小时 | 温度 | -957 | -9.57 |
2018/01/01 00:01:18 | 001e0610e532 | 化学感 | lps25小时 | 温度 | -961 | -9.61 |
2018/01/01 00:01:42 | 001e0610e532 | 化学感 | lps25小时 | 温度 | -962 | -9.62 |
为了将时间戳列转换为可操作的数据,我使用自定义函数将其转换为浮点数:
def timeswap(x:str):
print(x)
utime= datetime.timestamp(datetime.strptime(x, "%Y/%m/%d %H:%M:%S"))
print(utime)
return utime
我已经确认此功能可以正常工作。因此,我继续在整个列上运行它,并决定创建一个名为 unixTime 的新列来存储它:
timeUDF = spark.udf.register('timeUDF',timeswap,FloatType()) result_conv = result.withColumn('unixTime', timeUDF('timestamp'))
看起来确实有效。我花了几周的时间认为这是准确的,对数据运行算法却发现最近数据以不应该的方式聚集;同一日期的多次读数。所以我继续告诉 Spark 打印该专栏。这样做实际上会导致函数调用每一行。我知道这会是一件事,所以我放入打印语句作为健全性检查:
result_conv.select('unixTime').head(5)
它输出了我的#条评论:
2018/01/01 00:00:06 #The original string date
1514782806.0 #the correct output from the function
2018/01/01 00:00:30
1514782830.0
2018/01/01 00:00:54
1514782854.0
2018/01/01 00:01:18
1514782878.0
2018/01/01 00:01:42
1514782902.0
[Row(unixTime=1514782848.0), #I don't know what this value is
Row(unixTime=1514782848.0),
Row(unixTime=1514782848.0),
Row(unixTime=1514782848.0),
Row(unixTime=1514782848.0)]
有人知道我在这里缺少什么吗?我什至已经确认,当运行超过 5 行时,行列表中的浮点不存在,因此我不知道该值从何而来,也不知道为什么它会在行之间重复。它既不是平均值也不是中值(无论如何都不应该使用它们),而且我不知道为什么它会重复(当我查看较长的行时,重复的数量不一致)。我真的很想避免将其转换为 pandas DF,然后再转换回 Spark DF 来执行此操作。底线是我需要将日期字符串转换为 unixtime 浮点数,该浮点数对于该传感器的每行都是唯一的(因为它在数据中)。
感谢您的帮助!
您不必使用 UDF。您可以使用内置的 pyspark 函数来完成相同的任务。 UDF 是最后的手段。它们会减慢你的程序速度。
这就是我所做的。我认为价值的微小差异可能是由于时区问题造成的。但不确定。如果您能更好地说明问题,我可以进一步提供帮助。
import pyspark.sql.functions as F
from pyspark import SparkContext, SQLContext
sc = SparkContext('local')
sqlContext = SQLContext(sc)
### This is very important setting if you want legacy behaviour
sqlContext.setConf("spark.sql.legacy.timeParserPolicy", "LEGACY")
data1 = [
["2018/01/01 00:00:06"],
["2018/01/01 00:00:30"],
["2018/01/01 00:00:54"],
["2018/01/01 00:01:18"],
["2018/01/01 00:01:42"],
]
df1Columns = ["time_col"]
df1 = sqlContext.createDataFrame(data=data1, schema=df1Columns)
# 1514782806.0 # the correct output from the function
df1 = df1.withColumn("integer_value", F.unix_timestamp(F.to_timestamp('time_col', 'yyyy/MM/dd HH:mm:ss')))
df1.show(n=100, truncate=False)
输出:
+-------------------+-------------+
|time_col |integer_value|
+-------------------+-------------+
|2018/01/01 00:00:06|1514745006 |
|2018/01/01 00:00:30|1514745030 |
|2018/01/01 00:00:54|1514745054 |
|2018/01/01 00:01:18|1514745078 |
|2018/01/01 00:01:42|1514745102 |
+-------------------+-------------+