数据帧转换不会抛出溢出异常并产生 null

问题描述 投票:0回答:3
from pyspark.sql.functions import *
from pyspark.sql.types import *

我正在尝试将数据帧转换为

df.column.cast(ShortType())
,但是当我尝试插入数据99999时,它会转换为null而不会引发任何错误,所以你能建议任何在转换时引发错误的方法吗?

apache-spark pyspark apache-spark-sql
3个回答
4
投票

如果转换出错,Spark 不会抛出异常。

作为捕获这些错误的自定义方法,您可以编写一个 UDF,如果强制转换为 null,则会抛出异常。但这会降低脚本的性能,因为 Spark 无法优化 UDF 执行。


0
投票
如果

pyspark.sql.Column.cast

 失败,
Spark 将默默失败,即整个列将变成
NULL
。您有几个选项可以解决此问题:

  1. 如果您想在从文件读取时检测类型,您可以使用预定义(预期)模式和
    mode=failfast
    集进行读取,例如:
df = spark.createDataFrame([(1,0,0,2),(1,1,1,1)],['c1','c2','c3','c4'])
df.toPandas().to_csv("./test.csv")
spark.read.csv("./test.csv").show()
+----+---+---+---+---+
| _c0|_c1|_c2|_c3|_c4|
+----+---+---+---+---+
|null| c1| c2| c3| c4|
|   0|  1|  0|  0|  2|
|   1|  1|  1|  1|  1|
+----+---+---+---+---+

运行

spark.read.schema("_c0 INT, _c1 INT, _c2 INT, _c3 INT, _c4 INT").option("mode", "failfast").csv("./test.csv").show()
抛出:
org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST.
,因为
_c1
_c4
中的记录是第一行中的字符串(因为默认情况下
header=False
,因此标题被视为行)。这可以在堆栈跟踪的更深处找到:
Caused by: java.lang.NumberFormatException: For input string: "_c1"
。为了比较,运行

spark.read.schema("_c0 INT, c1 INT, c2 INT, c3 INT, c4 INT").option("mode", "ignore").csv("./test.csv").show()
+----+----+----+----+----+
| _c0|  c1|  c2|  c3|  c4|
+----+----+----+----+----+
|null|null|null|null|null|
|   0|   1|   0|   0|   2|
|   1|   1|   1|   1|   1|
+----+----+----+----+----+

但是会抛出以下警告

WARN ParseMode: ignore is not a valid parse mode. Using PERMISSIVE.

  1. 您的第二个选择是使用 UDF(或者更好的是,使用
    pandas_udf
    ,因为它是矢量化的)。在这里,您面临着遇到难以调试的类型匹配错误的风险,因为您尝试将 Python/Pandas 使用的类型与 PySpark 使用的 JVM 类型进行匹配。一个例子是:
import pyspark.sql.functions as f

df2 = spark.createDataFrame([("a",0,0,2),("b",1,1,1)],['c1','c2','c3','c4'])
df2.show()
+---+---+---+---+
| c1| c2| c3| c4|
+---+---+---+---+
|  a|  0|  0|  2|
|  b|  1|  1|  1|
+---+---+---+---+


@f.pandas_udf("long")
def my_cast(column):
  return column.astype("int64")

df2.select(my_cast(f.col("c1"))).show()

此操作将抛出:

ValueError: invalid literal for int() with base 10: 'b'


0
投票

除了使用自定义类型检查器

UDF
之外,您还可以使用
pyspark.sql.functions.raise_error
。例如

col = F.coalesce(col.cast(ty),F.raise_error(err_msg))
df.select(col).show()

下面的查询是可接受大小写NULL的情况。

col = (F.when(col.isNotNull(),
                F.coalesce(col.cast(ty),F.raise_error(err_msg)))
        .otherwise(col.cast(ty)))
df.select(col).show()

Spark 将在

NULL
失败时返回
pyspark.sql.Column.cast
。当强制转换表达式将
NULL
返回到函数
coalesce
的第一个参数时,
coalesce
将执行第二个参数
raise_error
。这会导致抛出异常。

此外,

pyspark.sql.functions.raise_error
接受类型为
Column
literal[str]
的参数。这意味着您可以自定义可以打印错误数据的错误消息。

© www.soinside.com 2019 - 2024. All rights reserved.