具有行输入类型的Snowpark UDF

问题描述 投票:0回答:1

我想定义一个输入类型为 snowflake.snowpark.Row 的 Snowpark UDF。 这样做的原因是我想模仿

pandas.apply
方法 我可以在某个类中定义我的业务逻辑,然后将逻辑应用到 Snowpark 数据帧的每一行。每列都可以使用 asDict

轻松映射到类属性

例如(从 Snowflake Python 工作表运行):

import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import udf
from snowflake.snowpark import Row
from snowflake.snowpark.types import IntegerType


from dataclasses import dataclass

@dataclass
class MyEvent:
    attribute1: str = 'dummy'
    attribute2: str = 'unknown'
    def someCalculation(self) -> int:
        return len(self.attribute1) + len(self.attribute2.strip())

def testSomeCalculation():
    inputDict = {'attribute1': 'foo',
                 'attribute2': 'baz'}
    event = MyEvent(**inputDict)
    print(event.someCalculation())


def main(session: snowpark.Session):

    some_logic = udf(lambda row: MyEvent(**(row.asDict())).someCalculation()
              , return_type=IntegerType()
              , input_types=[Row])

但是,当我尝试使用 Snowpark.Row 作为输入类型时,我得到一个

unsupported data type
:

File "snowflake/snowpark/_internal/udf_utils.py", line 972, in create_python_udf_or_sp
    input_sql_types = [convert_sp_to_sf_type(arg.datatype) for arg in input_args]
  File "snowflake/snowpark/_internal/udf_utils.py", line 972, in <listcomp>
    input_sql_types = [convert_sp_to_sf_type(arg.datatype) for arg in input_args]
  File "snowflake/snowpark/_internal/type_utils.py", line 195, in convert_sp_to_sf_type
    raise TypeError(f"Unsupported data type: {datatype.__class__.__name__}")
TypeError: Unsupported data type: type

我看到所有 UDF 示例都使用

snowpark.types
中的基本类型。 输入类型不能是 Snowpark.Row 有什么根本原因吗?

我知道我可以明确列出

MyEvent
中的所有
input_type=[]
属性, 但这很容易出错并且违背了设计的目的 我的代码围绕代表我的业务对象的类。

python snowflake-cloud-data-platform user-defined-functions
1个回答
0
投票

这是 Snowflake 中 Python UDF 的官方类型映射:

如果您想接收

dict
,请输入
dict
(在 SQL 中将是
variant
object
)。

然后,无需处理该行,只需将其转换为 dict,然后再将其发送到 UDF。

© www.soinside.com 2019 - 2024. All rights reserved.