Pyspark: Does any function in Pyspark generate the same set of consecutive unique values, like numpy.arange()?

Problem description

Pyspark: Is there any function in Pyspark that generates the same set of unique values, to be used as a common column between two dataframes and later for a join? Please comment if you need more clarification.

dataframe apache-spark hadoop pyspark bigdata
1 Answer

Below are the approaches I can think of for generating a unique value from all of a row's column values -

Load the test data

String data = " firstname|  lastname|  age\n" +
                " John      | Doe      | 21\n" +
                " John.     | Doe.     | 21\n" +
                " Mary.     | William. | 30";

        // split on "\n" (the separator used in the string above), trim every
        // field, and rewrite the pipe-delimited lines as comma-separated lines
        List<String> list1 = Arrays.stream(data.split("\n"))
                .map(s -> Arrays.stream(s.split("\\|"))
                        .map(s1 -> s1.replaceAll("^[ \t]+|[ \t]+$", ""))
                        .collect(Collectors.joining(","))
                )
                .collect(Collectors.toList());

        Dataset<Row> df2 = spark.read()
                .option("header", true)
                .option("inferSchema", true)
                .option("sep", ",")
                .csv(spark.createDataset(list1, Encoders.STRING()));

        df2.show(false);
        df2.printSchema();
        /**
         * +---------+--------+---+
         * |firstname|lastname|age|
         * +---------+--------+---+
         * |John     |Doe     |21 |
         * |John.    |Doe.    |21 |
         * |Mary.    |William.|30 |
         * +---------+--------+---+
         *
         * root
         *  |-- firstname: string (nullable = true)
         *  |-- lastname: string (nullable = true)
         *  |-- age: integer (nullable = true)
         */
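
Since the question is about PySpark, a rough PySpark sketch of the same test data (values copied from the Java sample above, everything else assumed) could be:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # the same three sample rows as in the Java snippet above
    df2 = spark.createDataFrame(
        [("John", "Doe", 21), ("John.", "Doe.", 21), ("Mary.", "William.", 30)],
        ["firstname", "lastname", "age"],
    )
    df2.show(truncate=False)
    df2.printSchema()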

Create a list of all the columns present in the dataframe

 List<Column> allCols = Arrays.stream(df2.columns()).map(functions::col).collect(Collectors.toList());
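
In PySpark (assuming the df2 sketched above), df2.columns already returns the column names as a plain Python list, so the equivalent is just:

    from pyspark.sql import functions as F

    # list of Column objects, one per column of the dataframe
    all_cols = [F.col(c) for c in df2.columns]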

Approach 1: compute the sha2 of all the columns

The Wikipedia page gives estimates of the likelihood of a collision. If you run the numbers, you'll find that all the hard disks ever produced on Earth couldn't hold enough 1MB files to reach even a 0.01% chance of a SHA-256 collision. Basically, you can simply ignore that possibility. Please note -

  1. This approach creates the ID as a string
  df2.withColumn("stringId", sha2(concat_ws(":", toScalaSeq(allCols)), 256))
                .show(false);
        /**
         * run-1
         * +---------+--------+---+----------------------------------------------------------------+
         * |firstname|lastname|age|stringId                                                        |
         * +---------+--------+---+----------------------------------------------------------------+
         * |John     |Doe     |21 |95903bdd538bc48810c367d0cbe59364e10068fd2511c1a0377015b02157ad30|
         * |John.    |Doe.    |21 |52092b925014246e67cc80ce460db8791981775f7e2f7a9fc02eed620f7e84f9|
         * |Mary.    |William.|30 |a782aa33b3a94148fe450b3e251d0a526ecbe83a4e6fbf49781a2f62dbaadc88|
         * +---------+--------+---+----------------------------------------------------------------+
         * run-2
         * +---------+--------+---+----------------------------------------------------------------+
         * |firstname|lastname|age|stringId                                                        |
         * +---------+--------+---+----------------------------------------------------------------+
         * |John     |Doe     |21 |95903bdd538bc48810c367d0cbe59364e10068fd2511c1a0377015b02157ad30|
         * |John.    |Doe.    |21 |52092b925014246e67cc80ce460db8791981775f7e2f7a9fc02eed620f7e84f9|
         * |Mary.    |William.|30 |a782aa33b3a94148fe450b3e251d0a526ecbe83a4e6fbf49781a2f62dbaadc88|
         * +---------+--------+---+----------------------------------------------------------------+
         * run-3
         * +---------+--------+---+----------------------------------------------------------------+
         * |firstname|lastname|age|stringId                                                        |
         * +---------+--------+---+----------------------------------------------------------------+
         * |John     |Doe     |21 |95903bdd538bc48810c367d0cbe59364e10068fd2511c1a0377015b02157ad30|
         * |John.    |Doe.    |21 |52092b925014246e67cc80ce460db8791981775f7e2f7a9fc02eed620f7e84f9|
         * |Mary.    |William.|30 |a782aa33b3a94148fe450b3e251d0a526ecbe83a4e6fbf49781a2f62dbaadc88|
         * +---------+--------+---+----------------------------------------------------------------+
         */
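
A PySpark sketch of the same approach (assuming the df2 and all_cols defined in the sketches above); sha2 and concat_ws are the same Spark SQL functions, so the hashes should match the Java output for identical rows:

    from pyspark.sql import functions as F

    # SHA-256 of all column values joined with ":" - deterministic across runs
    df2.withColumn(
        "stringId",
        F.sha2(F.concat_ws(":", *all_cols), 256)
    ).show(truncate=False)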

Approach 2: use the row_number function

Since there is no partitionBy clause, performance will degrade when working with a large dataset.

Please note -

  1. This approach creates the ID as a number
 df2.withColumn("number", row_number().over(Window.orderBy(toScalaSeq(allCols))))
                .show(false);
        /**
         * run-1
         * +---------+--------+---+------+
         * |firstname|lastname|age|number|
         * +---------+--------+---+------+
         * |John     |Doe     |21 |1     |
         * |John.    |Doe.    |21 |2     |
         * |Mary.    |William.|30 |3     |
         * +---------+--------+---+------+
         * run-2
         * +---------+--------+---+------+
         * |firstname|lastname|age|number|
         * +---------+--------+---+------+
         * |John     |Doe     |21 |1     |
         * |John.    |Doe.    |21 |2     |
         * |Mary.    |William.|30 |3     |
         * +---------+--------+---+------+
         * run-3
         * +---------+--------+---+------+
         * |firstname|lastname|age|number|
         * +---------+--------+---+------+
         * |John     |Doe     |21 |1     |
         * |John.    |Doe.    |21 |2     |
         * |Mary.    |William.|30 |3     |
         * +---------+--------+---+------+
         */
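
The corresponding PySpark sketch (again assuming df2 and all_cols from above); the same caveat applies, since without partitionBy the whole dataset goes through a single partition for the window:

    from pyspark.sql import Window, functions as F

    # a global ordering over all columns makes the numbering repeatable
    w = Window.orderBy(*all_cols)
    df2.withColumn("number", F.row_number().over(w)).show(truncate=False)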

Approach 3: use UUID.nameUUIDFromBytes

 UserDefinedFunction id_udf = udf( (String s) ->
                UUID.nameUUIDFromBytes(
                        s.getBytes(StandardCharsets.UTF_8)
                ).toString(), DataTypes.StringType);

        df2.withColumn("stringId", id_udf.apply(concat_ws(":", toScalaSeq(allCols))))
        .show(false);
        /**
         * run-1
         * +---------+--------+---+------------------------------------+
         * |firstname|lastname|age|stringId                            |
         * +---------+--------+---+------------------------------------+
         * |John     |Doe     |21 |3d319fa5-7a48-3c21-bdb8-f4546a18dffb|
         * |John.    |Doe.    |21 |49ab483f-692d-3e14-aa53-2e35e0cf2a17|
         * |Mary.    |William.|30 |9b758f70-3723-3623-b262-6d200d6111cf|
         * +---------+--------+---+------------------------------------+
         * run-2
         * +---------+--------+---+------------------------------------+
         * |firstname|lastname|age|stringId                            |
         * +---------+--------+---+------------------------------------+
         * |John     |Doe     |21 |3d319fa5-7a48-3c21-bdb8-f4546a18dffb|
         * |John.    |Doe.    |21 |49ab483f-692d-3e14-aa53-2e35e0cf2a17|
         * |Mary.    |William.|30 |9b758f70-3723-3623-b262-6d200d6111cf|
         * +---------+--------+---+------------------------------------+
         * run-3
         * +---------+--------+---+------------------------------------+
         * |firstname|lastname|age|stringId                            |
         * +---------+--------+---+------------------------------------+
         * |John     |Doe     |21 |3d319fa5-7a48-3c21-bdb8-f4546a18dffb|
         * |John.    |Doe.    |21 |49ab483f-692d-3e14-aa53-2e35e0cf2a17|
         * |Mary.    |William.|30 |9b758f70-3723-3623-b262-6d200d6111cf|
         * +---------+--------+---+------------------------------------+
         */
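
A PySpark sketch of this approach (assuming df2 and all_cols from above). Python's uuid.uuid3 prepends a namespace before hashing, so to reproduce the exact values of Java's UUID.nameUUIDFromBytes (plain MD5 with the version/variant bits set) a small helper is needed:

    import hashlib
    import uuid
    from pyspark.sql import functions as F
    from pyspark.sql.types import StringType

    def name_uuid_from_bytes(b: bytes) -> str:
        # mirror java.util.UUID.nameUUIDFromBytes: MD5 digest, then set the
        # version (3) and IETF variant bits
        md5 = bytearray(hashlib.md5(b).digest())
        md5[6] = (md5[6] & 0x0F) | 0x30
        md5[8] = (md5[8] & 0x3F) | 0x80
        return str(uuid.UUID(bytes=bytes(md5)))

    id_udf = F.udf(lambda s: name_uuid_from_bytes(s.encode("utf-8")), StringType())

    df2.withColumn("stringId", id_udf(F.concat_ws(":", *all_cols))).show(truncate=False)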

Regarding -

The rows may have duplicates; duplicate rows should generate different IDs, and on later runs those IDs should stay the same.

All of these approaches will create the IDs consistently and uniquely on every run.

PS. Keep the method below handy for the conversion -

 <T> Buffer<T> toScalaSeq(List<T> list) {
        return JavaConversions.asScalaBuffer(list);
    }