I'm new to Snowpark. I have a Spark-Scala project that I'm migrating to Snowpark Scala. My project has this UDF:
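import com.snowflake.snowpark.UserDefinedFunction
import com.snowflake.snowpark.functions.udf

// Masks every character except the first noOfLeftVisibleChars and the last noOfRightVisiblechars.
// Nulls pass through; values of 4 characters or fewer are replaced entirely by "***".
def maskColumnsUDF(maskChar: String = "*", noOfLeftVisibleChars: Int = 1, noOfRightVisiblechars: Int = 1): UserDefinedFunction = udf((attrVal: String) => {
  var finalString = ""
  if (attrVal == null) { attrVal }
  else if (attrVal.length > 4) {
    val attrArray = attrVal.toCharArray
    // Array.length takes no parentheses in Scala
    for (counter <- 0 to attrArray.length - 1) {
      if (counter < noOfLeftVisibleChars || counter > attrArray.length - (noOfRightVisiblechars + 1)) {
        finalString = finalString + attrArray(counter)
      }
      else {
        finalString = finalString + maskChar
      }
    }
    finalString
  }
  else { "***" }
})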
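Before looking at sessions, it can help to confirm the masking logic itself is correct by testing it as a plain Scala function, with no Snowflake connection involved. The sketch below is a hypothetical refactoring, not the original code: `MaskLogic` and `maskValue` are made-up names, and the character loop is swapped for `zipWithIndex` while keeping the same rules (null stays null, values of four characters or fewer become `***`, otherwise the leftmost and rightmost characters stay visible).

```scala
// Hypothetical helper (not part of Snowpark): the UDF's masking rules as a
// plain function, so they can be unit-tested without a Snowflake session.
object MaskLogic {
  def maskValue(attrVal: String,
                maskChar: String = "*",
                noOfLeftVisibleChars: Int = 1,
                noOfRightVisiblechars: Int = 1): String =
    if (attrVal == null) null // nulls pass through unchanged
    else if (attrVal.length > 4)
      attrVal.zipWithIndex.map { case (ch, i) =>
        // same test as `counter > length - (right + 1)`, written as `>= length - right`
        if (i < noOfLeftVisibleChars || i >= attrVal.length - noOfRightVisiblechars) ch.toString
        else maskChar
      }.mkString
    else "***" // short values are fully masked

  def main(args: Array[String]): Unit = {
    println(maskValue("abcdef"))              // prints a****f
    println(maskValue("abcdefgh", "#", 2, 3)) // prints ab###fgh
  }
}
```

If this behaves as expected locally, the "unknown user-defined function" error is about how or where the UDF is registered, not about what it computes.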
When I call the UDF:
columnNamesList.foreach(columnName => {
  sampleDf = sampleDf.withColumn(columnName + "_zs_masked",
    UserDefinedFunctions.maskColumnsUDF("*", 1, 1)(col(columnName)))
})
I get this error:
[main] ERROR com.snowflake.snowpark.internal.ServerConnection - Failed to analyze query schema: Exception in thread "main" net.snowflake.client.jdbc.SnowflakeSQLException: SQL compilation error: Unknown user-defined function SNOWPARK_TEST.PUBLIC.SNOWPARK_TEMP_FUNCTION_GICGX7MLYN3OHZO
I even tried a simple UDF like this:
val schema = StructType(Seq(
  StructField("column_name", StringType, nullable = false),
  StructField("featureName", StringType, nullable = false),
  StructField("distribution", DoubleType, nullable = false)
))
val data = Seq(
  Row("A", "foo", 0.2),
  Row("A", "bar", 0.5),
  Row("B", "baz", 0.1),
  Row("B", "qux", 0.3)
)
var featureDF: DataFrame = session.createDataFrame(data, schema)
featureDF = featureDF.withColumn("test_udf", UserDefinedFunctions.test_udf(col("column_name")))
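and got this:
[main] ERROR com.snowflake.snowpark.internal.ServerConnection - Failed to execute query: Exception in thread "main" net.snowflake.client.jdbc.SnowflakeSQLException: SQL compilation error: Unknown user-defined function SNOWPARK_TEST.PUBLIC.SNOWPARK_TEMP_FUNCTION_R0KRMUFIRRFYNUX
I haven't included the SQL query it tries to execute in either case because the queries are too long.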
I tried code similar to yours. I even created a test Scala stored procedure like this:
create procedure test_proc()
returns string
language scala
runtime_version = 2.12
packages = ('com.snowflake:snowpark:latest')
handler = 'Procedure.main'
as '
import com.snowflake.snowpark._
import com.snowflake.snowpark.types._
import com.snowflake.snowpark.functions._

object Procedure {
  def main(session: Session): String = {
    import java.io.{ByteArrayOutputStream, PrintStream}
    val outputStream = new ByteArrayOutputStream()
    // Capture anything printed to stdout (e.g. by show()) so the procedure can return it
    Console.withOut(new PrintStream(outputStream)) {
      def maskColumnsUDF(maskChar: String = "*", noOfLeftVisibleChars: Int = 1, noOfRightVisiblechars: Int = 1): UserDefinedFunction = udf((attrVal: String) => {
        var finalString = ""
        if (attrVal == null) { attrVal }
        else if (attrVal.length > 4) {
          val attrArray = attrVal.toCharArray
          for (counter <- 0 to attrArray.length - 1) {
            if (counter < noOfLeftVisibleChars || counter > attrArray.length - (noOfRightVisiblechars + 1)) {
              finalString = finalString + attrArray(counter)
            }
            else {
              finalString = finalString + maskChar
            }
          }
          finalString
        }
        else { "***" }
      })
      val schema = StructType(Seq(
        StructField("column_name", StringType, nullable = false),
        StructField("featureName", StringType, nullable = false),
        StructField("distribution", DoubleType, nullable = false)
      ))
      val data = Seq(
        Row("A", "foo", 0.2),
        Row("A", "bar", 0.5),
        Row("B", "baz", 0.1),
        Row("B", "qux", 0.3)
      )
      val featureDF: DataFrame = session.createDataFrame(data, schema)
      featureDF.withColumn("xx", maskColumnsUDF("*", 1, 1)(col("column_name"))).show()
    }
    outputStream.toString()
  }
}';
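It works fine. Because this is a temporary UDF, and temporary functions exist only in the session that created them, I suspect you are registering the UDF in one session and running the query in a different one. That would explain the error saying the function does not exist.
You could try something like this: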
import com.snowflake.snowpark.{Session, UserDefinedFunction}

def maskColumnsUDF(session: Session, maskChar: String = "*", noOfLeftVisibleChars: Int = 1, noOfRightVisiblechars: Int = 1): UserDefinedFunction = {
  // Note: maskChar is not part of the name, so calls with the same visible-character
  // counts but different mask characters reuse (and replace) the same UDF
  val udf_name = s"maskColumnsUDF_${noOfLeftVisibleChars}_${noOfRightVisiblechars}"
  // session.sql() is lazy; collect() is needed to actually run the DROP
  session.sql(s"drop function if exists ${udf_name}(varchar)").collect()
  session.udf.registerPermanent(
    udf_name,
    (attrVal: String) => {
      var finalString = ""
      if (attrVal == null) { attrVal }
      else if (attrVal.length > 4) {
        val attrArray = attrVal.toCharArray
        for (counter <- 0 to attrArray.length - 1) {
          if (counter < noOfLeftVisibleChars || counter > attrArray.length - (noOfRightVisiblechars + 1)) {
            finalString = finalString + attrArray(counter)
          }
          else {
            finalString = finalString + maskChar
          }
        }
        finalString
      }
      else { "***" }
    },
    "@mystage")
}
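This uses a permanent UDF, which should work across all sessions. I'm only suggesting it as a way to diagnose the error; ideally you should confirm whether the sessions are actually different.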