我之前已经使用 ScalaPB 和以下编组器在 Scala 中进行了解组工作:
implicit def marshaller[T <: GeneratedMessage]: ToEntityMarshaller[T] = PredefinedToEntityMarshallers.ByteArrayMarshaller.compose[T](r => r.toByteArray)
implicit def unmarshaller[T <: GeneratedMessage with GeneratedMessageCompanion[Request]](implicit companion: GeneratedMessageCompanion[Request]): FromEntityUnmarshaller[Request] = {
Unmarshaller.byteArrayUnmarshaller.map[Request](bytes => companion.parseFrom(bytes))
}
这允许我的
Route
接受类型为Request
的传入消息,定义为:
syntax = "proto3";
package PROTOS;
option java_package = "hydra.core.messaging.protobuf";
message RegisterRequest {
string username = 1;
optional string password = 2;
}
message Request {
string hostID = 1;
oneof requestType {
RegisterRequest registerRequest = 2;
}
}
我在系统中添加了另一个
Route
,它接受 DataRequest
类型。这定义为:
syntax = "proto3";
package PROTOS;
option java_package = "hydra.core.messaging.protobuf";
message DataRequest {
string hostID = 1;
string data = 2;
}
因此,我修改了 AKKA 参与者和路由,以使用通配符类型作为它们接收和响应的消息类型,定义为:
final case class ActorRequest[T, E](request: T, replyTo: ActorRef[ActorResponse[E]])
final case class ActorResponse[T](response: T)
为了减少重复代码,我将
Route
创建移至超类中。超类Layer
看起来像:
trait Marshalling extends DefaultJsonProtocol with SprayJsonSupport {
implicit def marshaller[E <: GeneratedMessage]: ToEntityMarshaller[E] = PredefinedToEntityMarshallers.ByteArrayMarshaller.compose[E](r => r.toByteArray)
implicit def unmarshaller[T <: GeneratedMessage with GeneratedMessageCompanion[T]](implicit companion: GeneratedMessageCompanion[T]): FromEntityUnmarshaller[T] = {
Unmarshaller.byteArrayUnmarshaller.map[T](bytes => companion.parseFrom(bytes))
}
}
abstract class Layer[T <: GeneratedMessage, E <: GeneratedMessage](name: String, directivePath: String)
extends CORSHandler with Marshalling {
implicit val timeout: Timeout = Timeout.create(SYSTEM.settings.config.getDuration("my-app.routes.ask-timeout"))
private var systemActor: ActorRef[ActorRequest[T, E]] = null
def createResponse(request: T): ActorResponse[E]
private def createRoutes(): Route = {
pathPrefix(HOST_ID) {
path(directivePath) {
post {
entity(as[T]) { request =>
onComplete(handle(request)) {
case Success(response) =>
complete(response.response)
case Failure(exception) => complete(InternalServerError, s"An error occurred ${exception.getMessage}")
}
}
}
}
}
}
...
}
切换到通配符 Unmarshaller 时,出现以下错误:
I found:
akka.http.scaladsl.unmarshalling.Unmarshaller.
messageUnmarshallerFromEntityUnmarshaller[T](
akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport.
sprayJsonUnmarshaller[T](/* missing */summon[spray.json.RootJsonReader[T]])
)
But no implicit values were found that match type spray.json.RootJsonReader[T].
entity(as[T]) { request =>
有这方面的专家可以帮助我找出问题吗?该错误似乎是在抱怨它不是
FromRequestMarshaller
,但之前定义类类型时的 Unmarshaller 也不是。有什么建议吗?
最小可重现示例:https://github.com/ritcat14/Hydra_broken_marshalling
因此,虽然这“解决”了问题,但这是一个令人讨厌的解决方法,并将修复放在 protobuf 端而不是修复通配符编组,但目前这是可行的。
我创建了一个
ProtoMessage
protobuf,它只有一个包含所有消息类型的 oneof
字段,如下所示:
syntax = "proto3";
import "Request.proto";
import "DataRequest.proto";
import "Response.proto";
import "DataResponse.proto";
package PROTOS;
option java_package = "hydra.core.messaging.protobuf";
message UnknownType {}
message ProtoMessage {
string hostID = 1;
oneof messageType {
Request request = 2;
DataRequest dataRequest = 3;
Response response = 4;
DataResponse dataResponse = 5;
UnknownType unknownType = 6;
}
}
然后我改为编组/取消编组该对象:
implicit def protobufMarshaller[T <: GeneratedMessage]: ToEntityMarshaller[ProtoMessage] = PredefinedToEntityMarshallers.ByteArrayMarshaller.compose[ProtoMessage](r => r.toByteArray)
implicit def requestMarshaller[T <: GeneratedMessage with GeneratedMessageCompanion[ProtoMessage]](implicit companion: GeneratedMessageCompanion[ProtoMessage]): FromEntityUnmarshaller[ProtoMessage] = {
Unmarshaller.byteArrayUnmarshaller.map[ProtoMessage](bytes => companion.parseFrom(bytes))
}
我用
ProtoMessage
替换了所有通配符类类型。
这是一个解决方案,是的。可怕吗?绝对地。这是永久修复吗?没有。与此修复程序一样,我仍然愿意接受建议,每次我收到每个
messageType
的消息时,我都必须在 Route
字段上进行过滤。