Unknown user-defined function in Snowpark Scala

Question (votes: 0, answers: 1)

I'm new to Snowpark. I have a Spark Scala project that I'm migrating to Snowpark Scala. My project contains this UDF:

def maskColumnsUDF(maskChar : String = "*", noOfLeftVisibleChars : Int = 1, noOfRightVisiblechars : Int = 1): UserDefinedFunction = udf((attrVal :String) => {

      var finalString =  ""
      if(attrVal == null){ attrVal }
      else if (attrVal.length > 4 ) {
        val attrArray = attrVal.toCharArray
        for (counter <- 0 to attrArray.length - 1) {
          if (counter < noOfLeftVisibleChars || counter > attrArray.length - (noOfRightVisiblechars + 1)) {
            finalString = finalString + attrArray(counter)
          }
          else {
            finalString = finalString + maskChar
          }
        }
        finalString
      }
      else { "***" }
})

When I call the UDF:

columnNamesList.foreach(columnName => {
    sampleDf = sampleDf.withColumn(columnName+"_zs_masked",
    UserDefinedFunctions.maskColumnsUDF("*",1,1)(col(columnName)))
})

I get this error:

[main] ERROR com.snowflake.snowpark.internal.ServerConnection - failed to analyze schema of query: Exception in thread "main" net.snowflake.client.jdbc.SnowflakeSQLException: SQL compilation error: Unknown user-defined function SNOWPARK_TEST.PUBLIC.SNOWPARK_TEMP_FUNCTION_GICGX7MLYN3OHZO

I even tried a simple UDF like this:

val schema = StructType(Seq(
    StructField("column_name", StringType, nullable = false),
    StructField("featureName", StringType, nullable = false),
    StructField("distribution", DoubleType, nullable = false)
  ))

val data = Seq(
    Row("A", "foo", 0.2),
    Row("A", "bar", 0.5),
    Row("B", "baz", 0.1),
    Row("B", "qux", 0.3)
)

var featureDF: DataFrame = session.createDataFrame(data, schema)
featureDF = featureDF.withColumn("test_udf", UserDefinedFunctions.test_udf(col("column_name")))

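and got this:

[main] ERROR com.snowflake.snowpark.internal.ServerConnection - failed to execute query: Exception in thread "main" net.snowflake.client.jdbc.SnowflakeSQLException: SQL compilation error: Unknown user-defined function SNOWPARK_TEST.PUBLIC.SNOWPARK_TEMP_FUNCTION_R0KRMUFIRRFYNUX

I have not included the SQL queries it tried to execute in either case, because they are too long.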

scala snowflake-cloud-data-platform
1 Answer
1 vote

I tried some code similar to yours. I even created a test Scala stored procedure like this:

create procedure test_proc()
    returns string
    language scala
    runtime_version = 2.12
    packages =('com.snowflake:snowpark:latest')
    handler = 'Procedure.main'
    as '

import com.snowflake.snowpark._
import com.snowflake.snowpark.types._
import com.snowflake.snowpark.functions._

object Procedure {
  def main(session: Session): String = {
    import java.io.{ByteArrayOutputStream, PrintStream}
    val outputStream = new ByteArrayOutputStream()
    Console.withOut(new PrintStream(outputStream)) {
        def maskColumnsUDF(maskChar : String = "*", noOfLeftVisibleChars : Int = 1, noOfRightVisiblechars : Int = 1): UserDefinedFunction = udf((attrVal :String) => {        
              var finalString =  ""
              if(attrVal == null){ attrVal }
              else if (attrVal.length > 4 ) {
                val attrArray = attrVal.toCharArray
                for (counter <- 0 to attrArray.length - 1) {
                  if (counter < noOfLeftVisibleChars || counter > attrArray.length - (noOfRightVisiblechars + 1))
                  {
                    finalString = finalString + attrArray(counter)
                  }
                  else {
                    finalString = finalString + maskChar
                  }
                }
                finalString
              }
              else { "***" }
        })
        
        val schema = StructType(Seq(
            StructField("column_name", StringType, nullable = false),
            StructField("featureName", StringType, nullable = false),
            StructField("distribution", DoubleType, nullable = false)
          ))
    
        val data = Seq(
            Row("A", "foo", 0.2),
            Row("A", "bar", 0.5),
            Row("B", "baz", 0.1),
            Row("B", "qux", 0.3)
        )
        
        var featureDF: DataFrame = session.createDataFrame(data, schema)
        featureDF.withColumn("xx",maskColumnsUDF("*",1,1)(col("column_name"))).show()
    }
    outputStream.toString()
  }
}';

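And it worked fine. Because this is a temporary UDF, it only exists in the session that registered it. I suspect the UDF is being registered in one session while the DataFrame query runs in another, which would explain the error saying the function does not exist.

You could try something like this: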

def maskColumnsUDF(session:Session,maskChar : String = "*", noOfLeftVisibleChars : Int = 1, noOfRightVisiblechars : Int = 1): 
    UserDefinedFunction = {
        val udf_name = s"maskColumnsUDF_${noOfLeftVisibleChars}_${noOfRightVisiblechars}"
        // session.sql is lazy: call collect() so the DROP statement actually runs
        session.sql(s"drop function if exists ${udf_name}(varchar)").collect()
        session.udf.registerPermanent(
         udf_name,
         (attrVal :String) => {        
              var finalString =  ""
              if(attrVal == null){ attrVal }
              else if (attrVal.length > 4 ) {
                val attrArray = attrVal.toCharArray
                for (counter <- 0 to attrArray.length - 1) {
                  if (counter < noOfLeftVisibleChars || counter > attrArray.length - (noOfRightVisiblechars + 1))
                  {
                    finalString = finalString + attrArray(counter)
                  }
                  else {
                    finalString = finalString + maskChar
                  }
                }
                finalString
              }
              else { "***" }
        },"@mystage")
    }

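This registers a permanent UDF, which should be visible from every session. I suggest it only as a way to diagnose the error. It is best to confirm whether the sessions really are different.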
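
To check the session hypothesis more directly, here is a minimal sketch and not a definitive fix. It assumes the Snowpark Scala library is on the classpath. The object and method names (Masking, SessionDiagnostics, maskValue, sessionId, registerMask) are made up for illustration. CURRENT_SESSION() is a Snowflake SQL function and session.udf.registerTemporary is part of the Snowpark API. Whether a UDF closure that calls a helper object gets uploaded cleanly depends on how your project is built, so treat that part as an assumption.

```scala
import com.snowflake.snowpark.{Session, UserDefinedFunction}

// Hypothetical helper: the masking rule from the question as plain Scala,
// so it can be checked locally without a Snowflake connection.
object Masking {
  def maskValue(value: String, maskChar: String = "*", left: Int = 1, right: Int = 1): String =
    if (value == null) null
    else if (value.length > 4)
      value.zipWithIndex.map { case (ch, i) =>
        // Keep the first `left` and the last `right` characters, mask the rest
        if (i < left || i > value.length - (right + 1)) ch.toString else maskChar
      }.mkString
    else "***"
}

// Hypothetical helper for comparing and pinning sessions.
object SessionDiagnostics {
  // Asks the server which session this client-side Session object is bound to.
  def sessionId(session: Session): String =
    session.sql("select current_session()").collect()(0).get(0).toString

  // Registers the mask as a temporary UDF on an explicitly chosen session,
  // instead of relying on whichever session functions.udf picks up.
  def registerMask(session: Session): UserDefinedFunction =
    session.udf.registerTemporary((v: String) => Masking.maskValue(v))
}
```

Print sessionId for the session that creates the DataFrame and for the one that is active where UserDefinedFunctions is initialized. If the two values differ, register the UDF with registerMask on the DataFrame's session and apply the returned UserDefinedFunction in withColumn as before.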
