Pyspark: Is there any function in Pyspark that generates the same unique set of values to be used as a common column between two dataframes, which can later be used for a join? Please comment if you need more clarification.
Below is what I can think of to generate a unique value from all of the column values of a row -
String data = " firstname| lastname| age\n" +
" John | Doe | 21\n" +
" John. | Doe. | 21\n" +
" Mary. | William. | 30";
List<String> list1 = Arrays.stream(data.split(System.lineSeparator()))
.map(s -> Arrays.stream(s.split("\\|"))
.map(s1 -> s1.replaceAll("^[ \t]+|[ \t]+$", ""))
.collect(Collectors.joining(","))
)
.collect(Collectors.toList());
Dataset<Row> df2 = spark.read()
.option("header", true)
.option("inferSchema", true)
.option("sep", ",")
.csv(spark.createDataset(list1, Encoders.STRING()));
df2.show(false);
df2.printSchema();
/**
* +---------+--------+---+
* |firstname|lastname|age|
* +---------+--------+---+
* |John |Doe |21 |
* |John. |Doe. |21 |
* |Mary. |William.|30 |
* +---------+--------+---+
*
* root
* |-- firstname: string (nullable = true)
* |-- lastname: string (nullable = true)
* |-- age: integer (nullable = true)
*/
List<Column> allCols = Arrays.stream(df2.columns()).map(functions::col).collect(Collectors.toList());
The Wikipedia page gives an estimate of the likelihood of a collision. If you run the numbers, you'll see that all the hard disks ever produced on Earth can't hold enough 1MB files to push the likelihood of a SHA-256 collision even to 0.01%. Basically, you can simply ignore that possibility.

Please note - this creates the ID in string format.

df2.withColumn("stringId", sha2(concat_ws(":", toScalaSeq(allCols)), 256))
        .show(false);
/**
* run-1
* +---------+--------+---+----------------------------------------------------------------+
* |firstname|lastname|age|stringId |
* +---------+--------+---+----------------------------------------------------------------+
* |John |Doe |21 |95903bdd538bc48810c367d0cbe59364e10068fd2511c1a0377015b02157ad30|
* |John. |Doe. |21 |52092b925014246e67cc80ce460db8791981775f7e2f7a9fc02eed620f7e84f9|
* |Mary. |William.|30 |a782aa33b3a94148fe450b3e251d0a526ecbe83a4e6fbf49781a2f62dbaadc88|
* +---------+--------+---+----------------------------------------------------------------+
* run-2
* +---------+--------+---+----------------------------------------------------------------+
* |firstname|lastname|age|stringId |
* +---------+--------+---+----------------------------------------------------------------+
* |John |Doe |21 |95903bdd538bc48810c367d0cbe59364e10068fd2511c1a0377015b02157ad30|
* |John. |Doe. |21 |52092b925014246e67cc80ce460db8791981775f7e2f7a9fc02eed620f7e84f9|
* |Mary. |William.|30 |a782aa33b3a94148fe450b3e251d0a526ecbe83a4e6fbf49781a2f62dbaadc88|
* +---------+--------+---+----------------------------------------------------------------+
* run-3
* +---------+--------+---+----------------------------------------------------------------+
* |firstname|lastname|age|stringId |
* +---------+--------+---+----------------------------------------------------------------+
* |John |Doe |21 |95903bdd538bc48810c367d0cbe59364e10068fd2511c1a0377015b02157ad30|
* |John. |Doe. |21 |52092b925014246e67cc80ce460db8791981775f7e2f7a9fc02eed620f7e84f9|
* |Mary. |William.|30 |a782aa33b3a94148fe450b3e251d0a526ecbe83a4e6fbf49781a2f62dbaadc88|
* +---------+--------+---+----------------------------------------------------------------+
*/
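The PySpark equivalent is essentially a one-liner; a minimal sketch, reusing the df and all_cols defined in the setup sketch above:

# deterministic string ID: SHA-256 of all column values joined with ":"
df.withColumn("stringId", F.sha2(F.concat_ws(":", *all_cols), 256)).show(truncate=False)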
Since there is no partitionBy clause in the window, performance will degrade when working with a big dataset.

Please note - this creates the ID in number format.

df2.withColumn("number", row_number().over(Window.orderBy(toScalaSeq(allCols))))
        .show(false);
/**
* run-1
* +---------+--------+---+------+
* |firstname|lastname|age|number|
* +---------+--------+---+------+
* |John |Doe |21 |1 |
* |John. |Doe. |21 |2 |
* |Mary. |William.|30 |3 |
* +---------+--------+---+------+
* run-2
* +---------+--------+---+------+
* |firstname|lastname|age|number|
* +---------+--------+---+------+
* |John |Doe |21 |1 |
* |John. |Doe. |21 |2 |
* |Mary. |William.|30 |3 |
* +---------+--------+---+------+
* run-3
* +---------+--------+---+------+
* |firstname|lastname|age|number|
* +---------+--------+---+------+
* |John |Doe |21 |1 |
* |John. |Doe. |21 |2 |
* |Mary. |William.|30 |3 |
* +---------+--------+---+------+
*/
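A minimal PySpark sketch of the same row_number approach, again reusing df and all_cols; as with the Java version, Spark will warn that the window has no partition and will move all data to a single partition:

from pyspark.sql import Window

# sequential number ID, ordered by all columns; stable across runs for the same data
df.withColumn("number", F.row_number().over(Window.orderBy(*all_cols))).show(truncate=False)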
Alternatively, use UUID.nameUUIDFromBytes inside a UDF -

UserDefinedFunction id_udf = udf(
        (String s) -> UUID.nameUUIDFromBytes(s.getBytes(StandardCharsets.UTF_8)).toString(),
        DataTypes.StringType);

df2.withColumn("stringId", id_udf.apply(concat_ws(":", toScalaSeq(allCols))))
        .show(false);
/**
* run-1
* +---------+--------+---+------------------------------------+
* |firstname|lastname|age|stringId |
* +---------+--------+---+------------------------------------+
* |John |Doe |21 |3d319fa5-7a48-3c21-bdb8-f4546a18dffb|
* |John. |Doe. |21 |49ab483f-692d-3e14-aa53-2e35e0cf2a17|
* |Mary. |William.|30 |9b758f70-3723-3623-b262-6d200d6111cf|
* +---------+--------+---+------------------------------------+
* run-2
* +---------+--------+---+------------------------------------+
* |firstname|lastname|age|stringId |
* +---------+--------+---+------------------------------------+
* |John |Doe |21 |3d319fa5-7a48-3c21-bdb8-f4546a18dffb|
* |John. |Doe. |21 |49ab483f-692d-3e14-aa53-2e35e0cf2a17|
* |Mary. |William.|30 |9b758f70-3723-3623-b262-6d200d6111cf|
* +---------+--------+---+------------------------------------+
* run-3
* +---------+--------+---+------------------------------------+
* |firstname|lastname|age|stringId |
* +---------+--------+---+------------------------------------+
* |John |Doe |21 |3d319fa5-7a48-3c21-bdb8-f4546a18dffb|
* |John. |Doe. |21 |49ab483f-692d-3e14-aa53-2e35e0cf2a17|
* |Mary. |William.|30 |9b758f70-3723-3623-b262-6d200d6111cf|
* +---------+--------+---+------------------------------------+
*/
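In PySpark there is no direct equivalent of java.util.UUID.nameUUIDFromBytes, but a name-based (version-3, MD5) UUID can be built by hand; a hedged sketch, assuming the goal is only a deterministic UUID-shaped string (the bit manipulation follows the UUID v3 layout and is not guaranteed to be byte-for-byte identical to Java's output):

import hashlib
import uuid
from pyspark.sql.types import StringType

def name_uuid_from_bytes(b: bytes) -> str:
    # MD5 of the input bytes, with version-3 and IETF-variant bits set
    d = bytearray(hashlib.md5(b).digest())
    d[6] = (d[6] & 0x0F) | 0x30   # version 3
    d[8] = (d[8] & 0x3F) | 0x80   # IETF variant
    return str(uuid.UUID(bytes=bytes(d)))

id_udf = F.udf(lambda s: name_uuid_from_bytes(s.encode("utf-8")), StringType())

df.withColumn("stringId", id_udf(F.concat_ws(":", *all_cols))).show(truncate=False)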
Regarding -

"The rows may have duplicates; duplicate rows should generate different IDs, and in subsequent runs those IDs should be the same."

All of these approaches will create the IDs consistently across runs. Note that the sha2 and UUID approaches give identical rows the same ID, whereas row_number assigns each physical row its own ID.
PS. Please keep the utility method below handy for the Java-to-Scala collection conversion -

<T> Buffer<T> toScalaSeq(List<T> list) {
    return JavaConversions.asScalaBuffer(list);
}
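Since the whole point is a common column to join two dataframes on, here is a hedged PySpark sketch of that final step; df_a and df_b are hypothetical DataFrames that share the same column layout:

def with_row_id(some_df):
    # the same deterministic ID computed independently on each side
    cols = [F.col(c) for c in some_df.columns]
    return some_df.withColumn("rowId", F.sha2(F.concat_ws(":", *cols), 256))

joined = with_row_id(df_a).join(with_row_id(df_b), on="rowId", how="inner")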