处理 UDTF 内的 Snowpark 表 - 未找到会话

问题描述 投票:0回答:1

我创建了一个小

udtf
来突出我当前的挑战。我需要根据某些参数处理不同的表(udtf 可以在任何时间点获取任何表)。

我将表作为字符串传递给我的函数,在

process
中,我试图获取活动会话,以便我可以将我的表作为数据帧引用,进行一些处理并返回处理后的表。

这个 udtf 最终将在 dbt python 模型中运行(尚未达到该部分)。

到目前为止我已经尝试过:

  • 添加
    __init__
def __init__(self):
        self.session = get_active_session()  # Store the passed session object

到目前为止还没有运气。

这是我的例子

from snowflake.snowpark.types import StructField, StructType, StringType, IntegerType, FloatType
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark.functions import udtf, table_function,lit
import simplejson as json
udtf_schema = StructType([StructField("json_output", StringType())])

@udtf(output_schema=udtf_schema,input_types=[StringType()],packages=["simplejson", "snowflake-snowpark-python"],is_permanent=False,replace=True)
class example_udtf:
  def process(self, sp_table):
      session = get_active_session()
      df = session.table(sp_table)
      df_with_json = df.select(
                    F.to_json(
                        F.struct(
                            *[F.col(col) for col in df.columns]  # Dynamically include all columns in the DataFrame
                        )
                    ).alias("json_output")
        )
      for part in df_with_json.to_local_iterator():
          result = { "value":part}
          yield (json.dumps(result),)
                
tbl = session.table_function(example_udtf.name,lit("db.schema.my_existing_sf_table")).collect()

产生以下错误

nowflake.snowpark.exceptions.SnowparkSessionException: (1403): No default Session is found. Please create a session before you call function 'udf' or use decorator '@udf'.
 in function SNOWPARK_TEMP_TABLE_FUNCTION_A8U80M1DGL with handler compute

也许这目前还不可能?

我不想创建 SP,因为 dbt 已经将 dbt 模型包装在 SP 中,而且我会失去沿袭。

蒂亚

pyspark snowflake-cloud-data-platform udtf
1个回答
0
投票

如果您要从 Snowflake Python 工作表创建 utdf,那么首先您需要通过传递 Snowflake 凭据来建立会话。

从snowflake.snowpark导入会话

# Define connection parameters
connection_parameters = {
    "account": "account",
    "user": "user",
    "password": "password",
    "role": "role",
    "warehouse": "warehouse",
    "database": "SF_RAG_APP",
    "schema": "RAG_APP_SCHEMA"
}

def get_snowpark_session():
    """ Get or create a Snowpark session """
   return Session.builder.configs(connection_parameters).create()

# Use this function to get a single session instead of creating new ones
session = get_snowpark_session()
© www.soinside.com 2019 - 2024. All rights reserved.