我正在尝试为JSON转换添加重试逻辑。将对象转换为json时,如果有任何异常,我将重试3次。我在做 :
var mapper = new ObjectMapper() with ScalaObjectMapper
intializeMapper( )
def intializeMapper() = {
// jackson library does not support seralization and deserialization of
// of scala classes like List and Map, this is needed to support it
mapper.registerModule( DefaultScalaModule )
// enables parsing of NaN. Enabling it here as JsonUtil class currently in
// use supports it.
mapper.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true )
mapper.setSerializationInclusion(Include.NON_NULL)
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
}
def getPersonRDD(result: DataFrame): RDD[(String, String)] = {
val finalValue = result.rdd.map({
r =>
val customerId = r.getAs[String](CUSTOMER_ID)
val itemId = r.getAs[Map[String, Int]](ITEM_ID)
val itemName = r.getAs[Map[String, Int]](ITEM_NAME)
val person = Person(itemId, itemName)
val jsonString = toJson(person)
(customerId, jsonString)
})
return finalValue
}
def fromJson(json: String, clazz: Class[_]) = {
mapper.readValue(json, clazz)
}
def toJson(value: Any): String = {
var jsonString: String = " "
jsonString = mapper.writeValueAsString(value)
try {
fromJson(jsonString, clazz)
return jsonString
} catch {
case Exception => {
publishMetrics(PARSING_EXCEPTION, 1.0)
val result = util.Try(retry() {
jsonString = mapper.writeValueAsString(value)
val features = fromJson(jsonString, clazz)
})
result match {
case util.Success(value) => jsonString
case util.Failure(error) => {
log.error("Error while parsing JSON " + jsonString)
return jsonString
}
}
}
}
}
// Returning T, throwing the exception on failure
@annotation.tailrec
def retry[T](n: Int = 3)(fn: => T): T = {
util.Try {
fn
} match {
case util.Success(x) => x
case _ if n > 1 => retry(n - 1)(fn)
case util.Failure(e) => throw e
}
}
case class Person(itemId: Map[String, Int], itemName: Map[String, Int]) extends Serializable
它是否正确 ?我是Scala的新手。有人可以建议我,如果有更好的方法来实现这一目标? Scala中是否有预定义的重试逻辑?我尝试为JSON转换添加重试逻辑的原因是由于我使用的Jackson版本(我现在无法更改),有时我的writeValueAsString导致不完整的JSON。
你重试功能似乎是正确的。我能想到的唯一缺陷是,如果你期望某些东西会失败,那么最好只返回类型Try[T]
,这样你就可以用scala方式处理它。
这是我的一个实现:
def retry[T](n: Int)(block: => T): Try[T] = {
val stream = Stream.fill(n)(Try(block))
stream find (_.isSuccess) getOrElse stream.head
}