我创建了一个小
udtf
来突出我当前的挑战。我需要根据某些参数处理不同的表(udtf 可以在任何时间点获取任何表)。
我将表作为字符串传递给我的函数,在
process
中,我试图获取活动会话,以便我可以将我的表作为数据帧引用,进行一些处理并返回处理后的表。
这个 udtf 最终将在 dbt python 模型中运行(尚未达到该部分)。
到目前为止我已经尝试过:
__init__
def __init__(self):
self.session = get_active_session() # Store the passed session object
到目前为止还没有运气。
这是我的例子
from snowflake.snowpark.types import StructField, StructType, StringType, IntegerType, FloatType
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark.functions import udtf, table_function,lit
import simplejson as json
udtf_schema = StructType([StructField("json_output", StringType())])
@udtf(output_schema=udtf_schema,input_types=[StringType()],packages=["simplejson", "snowflake-snowpark-python"],is_permanent=False,replace=True)
class example_udtf:
def process(self, sp_table):
session = get_active_session()
df = session.table(sp_table)
df_with_json = df.select(
F.to_json(
F.struct(
*[F.col(col) for col in df.columns] # Dynamically include all columns in the DataFrame
)
).alias("json_output")
)
for part in df_with_json.to_local_iterator():
result = { "value":part}
yield (json.dumps(result),)
tbl = session.table_function(example_udtf.name,lit("db.schema.my_existing_sf_table")).collect()
产生以下错误
nowflake.snowpark.exceptions.SnowparkSessionException: (1403): No default Session is found. Please create a session before you call function 'udf' or use decorator '@udf'.
in function SNOWPARK_TEMP_TABLE_FUNCTION_A8U80M1DGL with handler compute
也许这目前还不可能?
我不想创建 SP,因为 dbt 已经将 dbt 模型包装在 SP 中,而且我会失去沿袭。
蒂亚
如果您要从 Snowflake Python 工作表创建 utdf,那么首先您需要通过传递 Snowflake 凭据来建立会话。
从snowflake.snowpark导入会话
# Define connection parameters
connection_parameters = {
"account": "account",
"user": "user",
"password": "password",
"role": "role",
"warehouse": "warehouse",
"database": "SF_RAG_APP",
"schema": "RAG_APP_SCHEMA"
}
def get_snowpark_session():
""" Get or create a Snowpark session """
return Session.builder.configs(connection_parameters).create()
# Use this function to get a single session instead of creating new ones
session = get_snowpark_session()