spark rdd: grouping and filtering

Problem description

I have an RDD of objects, "labResults":

case class LabResult(patientID: String, date: Long, labName: String, value: String)

I want to transform this RDD so that it contains only one row for each patientID and labName combination. That row should be the most recent one for the combination (I am only interested in the latest date on which the patient had this lab). I did it this way:

//group rows by patient and lab and take only the last one
val cleanLab = labResults.groupBy(x => (x.patientID, x.labName)).map(_._2).map { events =>
  val latest_date = events.maxBy(_.date).date
  val lab = events.filter(x => x.date == latest_date)
  lab.take(1)
}

I want to create edges from this RDD:

val edgePatientLab: RDD[Edge[EdgeProperty]] = cleanLab
  .map({ lab =>
    Edge(lab.patientID.toLong, lab2VertexId(lab.labName), PatientLabEdgeProperty(lab).asInstanceOf[EdgeProperty])
  })

I get an error:

value patientID is not a member of Iterable[edu.gatech.cse6250.model.LabResult]
[error]     Edge(lab.patientID.toLong, lab2VertexId(lab.labName), PatientLabEdgeProperty(lab).asInstanceOf[EdgeProperty])
[error]              ^
[error] /hw4/stu_code/src/main/scala/edu/gatech/cse6250/graphconstruct/GraphLoader.scala:94:53: value labName is not a member of Iterable[edu.gatech.cse6250.model.LabResult]
[error]     Edge(lab.patientID.toLong, lab2VertexId(lab.labName), PatientLabEdgeProperty(lab).asInstanceOf[EdgeProperty])
[error]                                                      ^
[error] /hw4/stu_code/src/main/scala/edu/gatech/cse6250/graphconstruct/GraphLoader.scala:94:86: type mismatch;
[error]  found   : Iterable[edu.gatech.cse6250.model.LabResult]
[error]  required: edu.gatech.cse6250.model.LabResult
[error]     Edge(lab.patientID.toLong, lab2VertexId(lab.labName), PatientLabEdgeProperty(lab).asInstanceOf[EdgeProperty])

So it looks like the problem is that "cleanLab" is not an RDD of LabResult, as I expected, but an RDD of Iterable[edu.gatech.cse6250.model.LabResult].

How can I fix this?

scala apache-spark rdd
1 Answer

Here is my approach for the first part. I can't help with the Edge and other classes, since I don't know where they come from (from here?)

scala> val ds = List(("1", 1, "A", "value 1"), ("1", 3, "A", "value 3"), ("1", 3, "B", "value 3"), ("1", 2, "A", "value 2"), ("1", 3, "B", "value 3"), ("1", 5, "B", "value 5") ).toDF("patientID", "date", "labName", "value").as[LabResult]
ds: org.apache.spark.sql.Dataset[LabResult] = [patientID: string, date: int ... 2 more fields]

scala> ds.show
+---------+----+-------+-------+
|patientID|date|labName|  value|
+---------+----+-------+-------+
|        1|   1|      A|value 1|
|        1|   3|      A|value 3|
|        1|   3|      B|value 3|
|        1|   2|      A|value 2|
|        1|   3|      B|value 3|
|        1|   5|      B|value 5|
+---------+----+-------+-------+


scala> val grouped = ds.groupBy("patientID", "labName").agg(max("date") as "date")
grouped: org.apache.spark.sql.DataFrame = [patientID: string, labName: string ... 1 more field]

scala> grouped.show
+---------+-------+----+
|patientID|labName|date|
+---------+-------+----+
|        1|      A|   3|
|        1|      B|   5|
+---------+-------+----+


scala> val cleanLab = ds.join(grouped, Seq("patientID", "labName", "date")).as[LabResult]
cleanLab: org.apache.spark.sql.Dataset[LabResult] = [patientID: string, labName: string ... 2 more fields]

scala> cleanLab.show
+---------+-------+----+-------+
|patientID|labName|date|  value|
+---------+-------+----+-------+
|        1|      A|   3|value 3|
|        1|      B|   5|value 5|
+---------+-------+----+-------+


scala> cleanLab.head
res45: LabResult = LabResult(1,3,A,value 3)

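If you would rather stay in the RDD API, here is a minimal sketch (assuming labResults: RDD[LabResult], as in the question) that keeps exactly one row per (patientID, labName) key. Unlike groupBy followed by take(1), the reduce yields a single LabResult per key, so the result is RDD[LabResult] rather than RDD[Iterable[LabResult]]:

import org.apache.spark.rdd.RDD

// Pair each LabResult with its (patientID, labName) key, then keep the
// record with the latest date per key. reduceByKey also avoids
// materializing the whole group that groupBy would build.
val cleanLab: RDD[LabResult] = labResults
  .keyBy(x => (x.patientID, x.labName))
  .reduceByKey((a, b) => if (a.date >= b.date) a else b)
  .values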
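With cleanLab as an RDD[LabResult] (for the Dataset version above, call cleanLab.rdd first), each element is a single LabResult, so the edge construction from the question should compile as written. This sketch assumes the Edge (GraphX), EdgeProperty, lab2VertexId, and PatientLabEdgeProperty definitions from the question:

import org.apache.spark.graphx.Edge
import org.apache.spark.rdd.RDD

// lab is now one LabResult, not an Iterable, so lab.patientID and
// lab.labName resolve as expected.
val edgePatientLab: RDD[Edge[EdgeProperty]] = cleanLab.map { lab =>
  Edge(lab.patientID.toLong, lab2VertexId(lab.labName), PatientLabEdgeProperty(lab).asInstanceOf[EdgeProperty])
}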