我目前正在试图找出,如何通过列参数将String-format参数传递给to_date pyspark函数。
具体来说,我有以下设置:
sc = SparkContext.getOrCreate()
df = sc.parallelize([('a','2018-01-01','yyyy-MM-dd'),
('b','2018-02-02','yyyy-MM-dd'),
('c','02-02-2018','dd-MM-yyyy')]).toDF(
["col_name","value","format"])
我目前正在尝试添加一个新列,其中F.col(“value”)列中的每个日期(字符串值)都被解析为日期。
对于每种格式,可以单独完成
df = df.withColumn("test1",F.to_date(F.col("value"),"yyyy-MM-dd")).\
withColumn("test2",F.to_date(F.col("value"),"dd-MM-yyyy"))
然而,这给了我2个新列 - 但我希望有1列包含两个结果 - 但是使用to_date函数似乎无法调用列:
df = df.withColumn("test3",F.to_date(F.col("value"),F.col("format")))
这里抛出错误“Column object not callable”。
是否可以对所有可能的格式使用通用方法(这样我就不必为每种格式手动添加新列)?
你可以使用spark-sql语法在没有use a column value as a parameter的情况下使用udf
:
Spark版本2.2及以上版本
from pyspark.sql.functions import expr
df.withColumn("test3",expr("to_date(value, format)")).show()
#+--------+----------+----------+----------+
#|col_name| value| format| test3|
#+--------+----------+----------+----------+
#| a|2018-01-01|yyyy-MM-dd|2018-01-01|
#| b|2018-02-02|yyyy-MM-dd|2018-02-02|
#| c|02-02-2018|dd-MM-yyyy|2018-02-02|
#+--------+----------+----------+----------+
或者等效地使用pyspark-sql:
df.createOrReplaceTempView("df")
spark.sql("select *, to_date(value, format) as test3 from df").show()
Spark 1.5及以上版本
较旧版本的spark不支持在format
函数中使用to_date
参数,因此您必须使用unix_timestamp
和from_unixtime
:
from pyspark.sql.functions import expr
df.withColumn(
"test3",
expr("from_unixtime(unix_timestamp(value,format))").cast("date")
).show()
或者等效地使用pyspark-sql:
df.createOrReplaceTempView("df")
spark.sql(
"select *, cast(from_unixtime(unix_timestamp(value,format)) as date) as test3 from df"
).show()
据我所知,您的问题需要一些udf
(用户定义的函数)来应用正确的格式。但是在udf
里面你不能直接使用像to_date
这样的火花功能。所以我在解决方案中创建了一些解决方法。首先,udf
使用列中的相应格式进行python日期转换,并将其转换为iso格式。然后另一个withColumn
将iso-date转换为test3列中的正确格式。但是,您必须调整原始列中的格式以匹配python dateformat字符串,例如yyyy - >%Y,MM - >%m,...
test_df = spark.createDataFrame([
('a','2018-01-01','%Y-%m-%d'),
('b','2018-02-02','%Y-%m-%d'),
('c','02-02-2018','%d-%m-%Y')
], ("col_name","value","format"))
def map_to_date(s,format):
return datetime.datetime.strptime(s,format).isoformat()
myudf = udf(map_to_date)
test_df.withColumn("test3",myudf(col("value"),col("format")))\
.withColumn("test3",to_date("test3")).show(truncate=False)
结果:
+--------+----------+--------+----------+
|col_name|value |format |test3 |
+--------+----------+--------+----------+
|a |2018-01-01|%Y-%m-%d|2018-01-01|
|b |2018-02-02|%Y-%m-%d|2018-02-02|
|c |02-02-2018|%d-%m-%Y|2018-02-02|
+--------+----------+--------+----------+