我们定义了一个用户定义函数,如下:
def hashIt(key):
key=key.upper()
hashKey = H.md5(key.encode('utf-8')).hexdigest()
return hashKey
udf_hashIt = F.udf(hashIt, T.BinaryType())
spark.udf.register("HashtUDF", udf_hashIt)
现在作为 DataFrame 的一部分,我们正在使用:
df=df.withColumn('HashText', F.concat_ws('|', *bkp_col_list))
df=df.withColumn('HashKey', udf_hashIt('HashText'))
然后删除 HashText,但保留 HashKey 并将其保存到 Delta Table 中
但是基于HashKey,有没有办法取回HashText。
需要 HashKey 中的 HashText
我尝试过以下方法:
import hashlib
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, BinaryType
def hashIt(key):
key = key.upper()
hashKey = hashlib.md5(key.encode('utf-8')).hexdigest()
return hashKey
udf_hashIt = F.udf(hashIt, StringType())
spark.udf.register("HashUDF", udf_hashIt)
data = [("John Doe", 25), ("Jane Smith", 30), ("Bob Johnson", 35)]
df = spark.createDataFrame(data, ["Name", "Age"])
df = df.withColumn("HashText", F.concat_ws("|", "Name", "Age"))
df = df.withColumn("HashKey", udf_hashIt("HashText"))
df.write.format("delta").mode("overwrite").save("/FileStore/tables/delta_table")
delta_df = spark.read.format("delta").load("/FileStore/tables/delta_table")
hash_text_df = delta_df.select("HashText", "HashKey")
hash_text_df.show()
结果:
+--------------+--------------------+
| HashText| HashKey|
+--------------+--------------------+
|Bob Johnson|35|6482eb6af25b91ee9...|
| Jane Smith|30|687d685e316f35604...|
| John Doe|25|636468cf1942a6989...|
+--------------+--------------------+
在上面的代码中,将
hashIt
函数包装在名为 udf_hashIt 的 PySpark UDF
中,返回类型为 StringType。
在 Spark SQL 上下文中将此 UDF 注册为 HashUDF。
用 | 连接 Name 和 Age 列分隔符并将结果存储在新列中
HashText
。
使用 udf_hashIt 计算 HashText 列的哈希值并将其存储在新列 HashKey 中。 接下来,将 DataFrame df 写入 Delta 表