什么是在不相交集上对行进行聚类的正确JavaRDD转换

问题描述 投票:0回答:1

我在JavaPairRDD<String, MyPojo>设置了我的行,其中MyPojo是带有属性的pojo(让我们称之为HashSet<String> values)。

现在我想基于与MyPojo.values的任何交集来聚类(合并)我的行。

例如:

<Row K1 : MyPojo (values: [A,B,C])>

<Row K2 : MyPojo (values: [A,B])>

<Row K3 : MyPojo (values: [D,E,F])>

我想将行与密钥K1, K2合并。

java apache-spark rdd java-pair-rdd
1个回答
0
投票

如果必须找到具有值交集的键,则可以使用以下方法:

    List<Tuple2<String, MyPojo>> data = Lists.newArrayList(
            new Tuple2("K1", new MyPojo("A", "B", "C")),
            new Tuple2("K2", new MyPojo("A", "B")),
            new Tuple2("K3", new MyPojo("D", "E", "F")));
    JavaPairRDD<String, MyPojo> original = jsc().parallelizePairs(data);

    JavaPairRDD<String, String> preparedToJoin = original.flatMapToPair(
            v ->
                    v._2().getValues().stream().map(
                            s -> new Tuple2<String, String>(s, v._1()))
                            .collect(Collectors.toList()).iterator()
    );

    preparedToJoin.join(preparedToJoin)
            .filter(v -> !v._2()._1().equals(v._2()._2()))
             // remove one of: (K1,K2), (K2,K1)
            .filter(v -> v._2()._1().compareTo(v._2()._2()) <= 0)
            .values()
            .distinct().foreach(v -> System.out.println(v));

输出是:

(K1,K2)
© www.soinside.com 2019 - 2024. All rights reserved.