我想定义一个输入类型为 snowflake.snowpark.Row 的 Snowpark UDF。 这样做的原因是我想模仿
pandas.apply
方法
我可以在某个类中定义我的业务逻辑,然后将逻辑应用到 Snowpark 数据帧的每一行。每列都可以使用 asDict 轻松映射到类属性
例如(从 Snowflake Python 工作表运行):
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import udf
from snowflake.snowpark import Row
from snowflake.snowpark.types import IntegerType
from dataclasses import dataclass
@dataclass
class MyEvent:
attribute1: str = 'dummy'
attribute2: str = 'unknown'
def someCalculation(self) -> int:
return len(self.attribute1) + len(self.attribute2.strip())
def testSomeCalculation():
inputDict = {'attribute1': 'foo',
'attribute2': 'baz'}
event = MyEvent(**inputDict)
print(event.someCalculation())
def main(session: snowpark.Session):
some_logic = udf(lambda row: MyEvent(**(row.asDict())).someCalculation()
, return_type=IntegerType()
, input_types=[Row])
但是,当我尝试使用 Snowpark.Row 作为输入类型时,我得到一个
unsupported data type
:
File "snowflake/snowpark/_internal/udf_utils.py", line 972, in create_python_udf_or_sp
input_sql_types = [convert_sp_to_sf_type(arg.datatype) for arg in input_args]
File "snowflake/snowpark/_internal/udf_utils.py", line 972, in <listcomp>
input_sql_types = [convert_sp_to_sf_type(arg.datatype) for arg in input_args]
File "snowflake/snowpark/_internal/type_utils.py", line 195, in convert_sp_to_sf_type
raise TypeError(f"Unsupported data type: {datatype.__class__.__name__}")
TypeError: Unsupported data type: type
我看到所有 UDF 示例都使用
snowpark.types
中的基本类型。
输入类型不能是 Snowpark.Row 有什么根本原因吗?
我知道我可以明确列出
MyEvent
中的所有 input_type=[]
属性,
但这很容易出错并且违背了设计的目的
我的代码围绕代表我的业务对象的类。
这是 Snowflake 中 Python UDF 的官方类型映射:
如果您想接收
dict
,请输入 dict
(在 SQL 中将是 variant
或 object
)。
然后,无需处理该行,只需将其转换为 dict,然后再将其发送到 UDF。