是否有可能动态地反序列化外部,未知长度的,从ByteString
阿卡HTTP流分成域对象?
我称之为无限长HTTP
端点输出,保持增长的一个JSON Array
:
[
{ "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
{ "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
{ "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
{ "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
{ "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
...
] <- Never sees the daylight
我认为play-iteratees-extras必须帮助你。这个库允许通过枚举/ Iteratee模式来解析JSON,当然,不等待接收的所有数据。
例如,以免建立字节“无限”流,它表示“无限大” JSON数组。
import play.api.libs.iteratee.{Enumeratee, Enumerator, Iteratee}
var i = 0
var isFirstWas = false
val max = 10000
val stream = Enumerator("[".getBytes) andThen Enumerator.generateM {
Future {
i += 1
if (i < max) {
val json = Json.stringify(Json.obj(
"prop" -> Random.nextBoolean(),
"prop2" -> Random.nextBoolean(),
"prop3" -> Random.nextInt(),
"prop4" -> Random.alphanumeric.take(5).mkString("")
))
val string = if (isFirstWas) {
"," + json
} else {
isFirstWas = true
json
}
Some(Codec.utf_8.encode(string))
} else if (i == max) Some("]".getBytes) // <------ this is the last jsArray closing tag
else None
}
}
好,该值包含10000(或多个)对象的jsArray。允许定义的情况下的类,将被包含在我们的数组中的每个对象的数据。
case class Props(prop: Boolean, prop2: Boolean, prop3: Int, prop4: String)
现在写解析器,这将是分析每个项目
import play.extras.iteratees._
import JsonBodyParser._
import JsonIteratees._
import JsonEnumeratees._
val parser = jsArray(jsValues(jsSimpleObject)) ><> Enumeratee.map { json =>
for {
prop <- json.\("prop").asOpt[Boolean]
prop2 <- json.\("prop2").asOpt[Boolean]
prop3 <- json.\("prop3").asOpt[Int]
prop4 <- json.\("prop4").asOpt[String]
} yield Props(prop, prop2, prop3, prop4)
}
请参阅doc为jsArray
,jsValues
和jsSimpleObject
。创建结果制作:
val result = stream &> Encoding.decode() ><> parser
从JsonIteratees包Encoding.decode()
将解码字节CharString
。 result
值的类型Enumerator[Option[Item]]
,你可以应用一些iteratee这个枚举启动解析过程。
总体而言,我不知道你是怎么得到字节(解决方案在很大程度上取决于这一点),但我认为这显示您的问题可能的解决方案之一。
我有一个非常类似的问题,试图将Twitter的数据流(无限字符串)解析为一个域对象。我解决了它使用Json4s,就像这样:
case class Tweet(username: String, geolocation: Option[Geo])
case class Geo(latitude: Float, longitude: Float)
object Tweet{
def apply(s: String): Tweet = {
parse(StringInput(s), useBigDecimalForDouble = false, useBigIntForLong = false).extract[Tweet]
}
}
然后,我只是缓冲流,并将其映射到了一条Twitter消息:
val reader = new BufferedReader(new InputStreamReader(new GZIPInputStream(inputStream), "UTF-8"))
var line = reader.readLine()
while(line != null){
store(Tweet.apply(line))
line = reader.readLine()
}
Json4s拥有选项(在该示例中在对象内部或自定义对象,像GEO)的完全支持。因此,你可以把一个选项像我一样,如果现场没有在JSON来了,它将被设置为无。
希望能帮助到你!
我想这应该JsonFraming.objectScanner(Int.MaxValue)
在这种情况下使用。作为文档状态:
返回实现发射有效的JSON块一个“梅开二度计数”的基础框架操作的流程。它扫描输入数据流为有效的JSON对象并返回仅包含这些有效块字节串的数据块。一个可能希望使用此操作符的帧数据的典型实例包括:非常大的阵列
所以,你可以像这样结束了:
val response: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = serviceUrl))
response.onComplete {
case Success(value) =>
value.entity.dataBytes
.via(JsonFraming.objectScanner(Int.MaxValue))
.map(_.utf8String) // In case you have ByteString
.map(decode[MyEntity](_)) // Use any Unmarshaller here
.grouped(20)
.runWith(Sink.ignore) // Do whatever you need here
case Failure(exception) => log.error(exception, "Api call failed")
}