我有一个包含多种不同类型的json的流,每个都与用户事件有关,最有效的分割和聚合方式是什么

问题描述 投票:1回答:1

我有一个包含多种不同类型的json消息的流。共有65种json事件类型,都有不同的模式。它们共享一个共同的用户ID。

{'id': 123, 'event': 'clicked', 'target': 'my_button'}
{'id': 123, 'event': 'viewed', 'website': 'http://xyz1...'}
{'id': 123, 'event': 'viewed', 'website': 'http://xyz2...'}
{'id': 123, 'event': 'login', 'username': 'Bob'}
{'id': 456, 'event': 'viewed', 'website': 'http://xyz3...'}
{'id': 456, 'event': 'login', 'username': 'Susy'}

我想处理所有事件类型,每个类型都有自定义字段,然后按用户在所有过滤器类型中汇总所有事件。

{'id': 123, 'page_view_cnt': 100, 'user': 'Bob', 'click_cnt': 20}
{'id': 456, 'page_view_cnt': 14, 'user': 'Susy'}

有谁知道这样做的有效方法。这是当前的思考过程

  • 从流线开始
  • 使用GSON来解析json,而不是使用可能尝试推断类型的内置json解析器。
  • 根据每种类型创建65个过滤器语句。 json将有event = xyz我可以区分。
  • 将每个筛选器上的自定义属性聚合为用户ID - >属性的映射
  • 合并所有过滤器中的所有地图

听起来是否合理还是有更好的方法吗?

json apache-spark events stream
1个回答
1
投票

以下是我使用RDD API和Jackson提出的建议。我选择了低级Spark API,因为它是无模式的,不确定结构化API如何适应变量输入事件类型。如果提到的Gson支持多态反序列化,它可以代替Jackson使用,我只是选择Jackson,因为我对它更熟悉。

问题可分为几个步骤:

  1. 通过事件类型将输入反序列化为对象。
  2. 减少id和类型。对于不同类型,reduce需要表现不同,例如,视图简单地减少为总和,而用户名需要以不同的方式处理。在这个例子中,让我们假设用户名在id中是唯一的,并选择第一个。
  3. 通过id收集减少的物品。

第2步需要得到最多的关注,因为Spark API中没有这样的功能,如果反序列化的事件属于不同的类,则需要进行某种运行时检查。为了克服这个问题,让我们介绍一个可以封装不同类型的通用特征Reducible

trait Reducible[T] {
    def reduce(that: Reducible[_]): this.type

    def value: T
}

// simply reduces to sum
case class Sum(var value: Int) extends Reducible[Int] {
    override def reduce(that: Reducible[_]): Sum.this.type = that match {
        case Sum(thatValue) =>
            value += thatValue
            this
    }
}

// for picking the first element, i.e. username
case class First(value: String) extends Reducible[String] {
    override def reduce(that: Reducible[_]): First.this.type = this
}

运行时检查在这些类中处理,例如,如果右侧对象的类型不同,Sum将失败。

接下来,让我们定义事件的模型,并告诉Jackson如何处理多态:

@JsonTypeInfo(use=JsonTypeInfo.Id.NAME, include=JsonTypeInfo.As.PROPERTY, property="event", visible=true)
sealed trait Event[T] {
    val id: Int
    val event: String

    def value: Reducible[T]
}

abstract class CountingEvent extends Event[Int] {
    override def value: Reducible[Int] = Sum(1)
}

@JsonTypeName("clicked") case class Click(id: Int, event: String, target: String) extends CountingEvent
@JsonTypeName("viewed") case class View(id: Int, event: String, website: String) extends CountingEvent
@JsonTypeName("login") case class Login(id: Int, event: String, username: String) extends Event[String] {
    override def value: Reducible[String] = First(username)
}

object EventMapper {
    private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
    // the list of classes could be auto-generated, see
    // https://stackoverflow.com/questions/34534002/getting-subclasses-of-a-sealed-trait
    mapper.registerSubtypes(classOf[Click], classOf[View], classOf[Login])

    def apply(v1: String): Event[_] = mapper.readValue(v1, classOf[Event[_]])
}

所有事件预计都有idevent字段。后者用于确定要反序列化的类,杰克逊需要预先知道所有类。特征Event被声明为密封特征,因此所有实现类都可以在编译时确定。我省略了这个反思步骤并简单地对类列表进行了硬编码,这里有一个很好的答案如何自动完成Getting subclasses of a sealed trait

现在我们准备编写应用程序逻辑了。为简单起见,sc.parallelize用于加载示例数据。也可以使用Spark流式传输。

val in = List(
    "{\"id\": 123, \"event\": \"clicked\", \"target\": \"my_button\"}",
    "{\"id\": 123, \"event\": \"viewed\", \"website\": \"http://xyz1...\"}",
    "{\"id\": 123, \"event\": \"viewed\", \"website\": \"http://xyz1...\"}",
    "{\"id\": 123, \"event\": \"login\", \"username\": \"Bob\"}",
    "{\"id\": 456, \"event\": \"login\", \"username\": \"Sue\"}",
    "{\"id\": 456, \"event\": \"viewed\", \"website\": \"http://xyz1...\"}"
)

// partition (id, event) pairs only by id to minimize shuffle
// when we later group by id
val partitioner = new HashPartitioner(10) {

    override def getPartition(key: Any): Int = key match {
        case (id: Int, _) => super.getPartition(id)
        case id: Int => super.getPartition(id)
    }
}

sc.parallelize(in)
    .map(EventMapper.apply)
    .keyBy(e => (e.id, e.event))
    .mapValues(_.value)
    .reduceByKey(partitioner, (left, right) => left.reduce(right))
    .map {
        case ((id, key), wrapper) => (id, (key, wrapper.value))
    }
    .groupByKey(partitioner)
    .mapValues(_.toMap)
    .foreach(println)

输出:

(123,Map(clicked -> 1, viewed -> 2, login -> Bob))
(456,Map(login -> Sue, viewed -> 1))
© www.soinside.com 2019 - 2024. All rights reserved.