aggregateByKey不更新初始集合的值

问题描述 投票:0回答:1

hllTotal中的值已更新,但每个键的hllToday仍为零。谁能帮忙,为什么hllToday此处未更新?

 val hllToday: HllSerializable = new HllSerializable(new HLL(13, 5))
 val hllTotal: HllSerializable = new HllSerializable(new HLL(13, 5))

 val initialSet = (hllToday, hllTotal)
 val rdd7 = combinedRdd.map(tuple => {
      tuple.segmentId -> (tuple.entryTime, tuple.exitTime, tuple.hll)`
    }).aggregateByKey(initialSet)(
      (acc: (HllSerializable, HllSerializable), v: (Long, Long, HllSerializable)) => {
        if (v._1 == today_ts) {
          acc._1.getHll.union(v._3.getHll)
        }
        acc._2.getHll.union(v._3.getHll)
        acc
      }, (acc1: (HllSerializable, HllSerializable), acc2: (HllSerializable, HllSerializable)) =>    {
        acc1._1.getHll.union(acc2._1.getHll)
        acc1._2.getHll.union(acc2._2.getHll)
        acc1
      }
    )
scala apache-spark hadoop bigdata
1个回答
0
投票

显然,此代码从未调用过:

if (v._1 == today_ts) {
    acc._1.getHll.union(v._3.getHll)
 }

尝试用if (v._1 == today_ts)替换if(true)以查看hllToday是否被更新

© www.soinside.com 2019 - 2024. All rights reserved.