我遇到了 Pekko 代码的一些问题,我相信这与默认 Flow 已经具体化数据这一事实有关:
def addSha(request: HttpRequest)(using
as: ActorSystem[Any],
ec: ExecutionContext
): Future[HttpResponse] =
request.entity.dataBytes
.via(computeHashWithPayloadAndPayloadLength)
.map { out =>
request
.withEntity(out._2)
.addHeader(new RawHeader("sha", out._1.digest().map("%02x".format(_)).mkString))
}
.via(Http().outgoingConnection)
.runWith(Sink.head)
private def computeHashWithPayloadAndPayloadLength: Flow[ByteString, (MessageDigest, ByteString, Int), NotUsed] =
Flow[ByteString].fold((MessageDigest.getInstance("SHA-256"), ByteString.empty, 0)) { (acc, chunk) =>
acc._1.update(chunk.toByteBuffer)
(acc._1, acc._2 ++ chunk, acc._3 + chunk.length)
}
基本上,我需要请求正文来计算哈希并将其添加到标头中,迫使我使用源。如果我评论这一行
//.withEntity(out._2)
它返回错误:
子流源不能多次实现
因为我使用的流程是默认的 Pekko Http 流程 (Http().outgoingConnection),并且它似乎实现了数据。通过使用 .withEntity,我正在创建另一个实体流,然后可以在下次使用该实体流。
现在我的问题:有什么方法可以解决这个问题(也许通过使用另一个 pekko http 流),而不必使用哈希计算部分重新实现 Http().outgoingConnection ?
这会产生你想要的:
object HttpRequestWithContentHashHeader extends App {
implicit val system: ActorSystem = ActorSystem()
import system.dispatcher
implicit val http: HttpExt = Http(system)
def computeHashWithPayloadAndPayloadLength: Flow[ByteString, (MessageDigest, ByteString, Int), NotUsed] =
Flow[ByteString].fold((MessageDigest.getInstance("SHA-256"), ByteString.empty, 0)) { (acc, chunk) =>
acc._1.update(chunk.toByteBuffer)
(acc._1, acc._2 ++ chunk, acc._3 + chunk.length)
}
val request = HttpRequest(
method = HttpMethods.POST,
uri = "http://localhost:8080",
entity = HttpEntity("payload")
)
val hashFuture = request.entity.dataBytes
.via(computeHashWithPayloadAndPayloadLength)
.runWith(Sink.head)
.map { case (digest, _, _) =>
RawHeader("X-Content-Hash", digest.digest().map("%02x".format(_)).mkString)
}
hashFuture.flatMap { hashHeader =>
http.singleRequest(request.withHeaders(hashHeader))
}
}