from pyspark.sql.functions import *
from pyspark.sql.types import *
我正在尝试将数据帧转换为
df.column.cast(ShortType())
,但是当我尝试插入数据99999时,它会转换为null而不会引发任何错误,所以你能建议任何在转换时引发错误的方法吗?
如果转换出错,Spark 不会抛出异常。
作为捕获这些错误的自定义方法,您可以编写一个 UDF,如果强制转换为 null,则会抛出异常。但这会降低脚本的性能,因为 Spark 无法优化 UDF 执行。
pyspark.sql.Column.cast
失败,Spark 将默默失败,即整个列将变成
NULL
。您有几个选项可以解决此问题:
mode=failfast
集进行读取,例如:df = spark.createDataFrame([(1,0,0,2),(1,1,1,1)],['c1','c2','c3','c4'])
df.toPandas().to_csv("./test.csv")
spark.read.csv("./test.csv").show()
+----+---+---+---+---+
| _c0|_c1|_c2|_c3|_c4|
+----+---+---+---+---+
|null| c1| c2| c3| c4|
| 0| 1| 0| 0| 2|
| 1| 1| 1| 1| 1|
+----+---+---+---+---+
运行
spark.read.schema("_c0 INT, _c1 INT, _c2 INT, _c3 INT, _c4 INT").option("mode", "failfast").csv("./test.csv").show()
抛出:
org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST.
,因为 _c1
到 _c4
中的记录是第一行中的字符串(因为默认情况下 header=False
,因此标题被视为行)。这可以在堆栈跟踪的更深处找到:Caused by: java.lang.NumberFormatException: For input string: "_c1"
。为了比较,运行
spark.read.schema("_c0 INT, c1 INT, c2 INT, c3 INT, c4 INT").option("mode", "ignore").csv("./test.csv").show()
+----+----+----+----+----+
| _c0| c1| c2| c3| c4|
+----+----+----+----+----+
|null|null|null|null|null|
| 0| 1| 0| 0| 2|
| 1| 1| 1| 1| 1|
+----+----+----+----+----+
但是会抛出以下警告
WARN ParseMode: ignore is not a valid parse mode. Using PERMISSIVE.
pandas_udf
,因为它是矢量化的)。在这里,您面临着遇到难以调试的类型匹配错误的风险,因为您尝试将 Python/Pandas 使用的类型与 PySpark 使用的 JVM 类型进行匹配。一个例子是:import pyspark.sql.functions as f
df2 = spark.createDataFrame([("a",0,0,2),("b",1,1,1)],['c1','c2','c3','c4'])
df2.show()
+---+---+---+---+
| c1| c2| c3| c4|
+---+---+---+---+
| a| 0| 0| 2|
| b| 1| 1| 1|
+---+---+---+---+
@f.pandas_udf("long")
def my_cast(column):
return column.astype("int64")
df2.select(my_cast(f.col("c1"))).show()
此操作将抛出:
ValueError: invalid literal for int() with base 10: 'b'
除了使用自定义类型检查器
UDF
之外,您还可以使用pyspark.sql.functions.raise_error
。例如
col = F.coalesce(col.cast(ty),F.raise_error(err_msg))
df.select(col).show()
下面的查询是可接受大小写NULL的情况。
col = (F.when(col.isNotNull(),
F.coalesce(col.cast(ty),F.raise_error(err_msg)))
.otherwise(col.cast(ty)))
df.select(col).show()
Spark 将在
NULL
失败时返回 pyspark.sql.Column.cast
。当强制转换表达式将 NULL
返回到函数 coalesce
的第一个参数时,coalesce
将执行第二个参数 raise_error
。这会导致抛出异常。
此外,
pyspark.sql.functions.raise_error
接受类型为Column
或literal[str]
的参数。这意味着您可以自定义可以打印错误数据的错误消息。