如何防止实体在 Pekko/Akka 中多次实体化?

问题描述 投票:0回答:1

我遇到了 Pekko 代码的一些问题,我相信这与默认 Flow 已经具体化数据这一事实有关:

  def addSha(request: HttpRequest)(using
    as: ActorSystem[Any],
    ec: ExecutionContext
  ): Future[HttpResponse] =
    request.entity.dataBytes
      .via(computeHashWithPayloadAndPayloadLength)
      .map { out =>
        request
          .withEntity(out._2)
          .addHeader(new RawHeader("sha", out._1.digest().map("%02x".format(_)).mkString))

      }
      .via(Http().outgoingConnection)
      .runWith(Sink.head)

  private def computeHashWithPayloadAndPayloadLength: Flow[ByteString, (MessageDigest, ByteString, Int), NotUsed] =
    Flow[ByteString].fold((MessageDigest.getInstance("SHA-256"), ByteString.empty, 0)) { (acc, chunk) =>
      acc._1.update(chunk.toByteBuffer)
      (acc._1, acc._2 ++ chunk, acc._3 + chunk.length)
    }

基本上,我需要请求正文来计算哈希并将其添加到标头中,迫使我使用源。如果我评论这一行

//.withEntity(out._2)
它返回错误:

子流源不能多次实现

因为我使用的流程是默认的 Pekko Http 流程 (Http().outgoingConnection),并且它似乎实现了数据。通过使用 .withEntity,我正在创建另一个实体流,然后可以在下次使用该实体流。

现在我的问题:有什么方法可以解决这个问题(也许通过使用另一个 pekko http 流),而不必使用哈希计算部分重新实现 Http().outgoingConnection ?

scala akka akka-stream akka-http pekko
1个回答
0
投票

这会产生你想要的:

object HttpRequestWithContentHashHeader extends App {
  implicit val system: ActorSystem = ActorSystem()

  import system.dispatcher
  implicit val http: HttpExt = Http(system)

  def computeHashWithPayloadAndPayloadLength: Flow[ByteString, (MessageDigest, ByteString, Int), NotUsed] =
    Flow[ByteString].fold((MessageDigest.getInstance("SHA-256"), ByteString.empty, 0)) { (acc, chunk) =>
      acc._1.update(chunk.toByteBuffer)
      (acc._1, acc._2 ++ chunk, acc._3 + chunk.length)
    }

  val request = HttpRequest(
    method = HttpMethods.POST,
    uri = "http://localhost:8080",
    entity = HttpEntity("payload")
  )

  val hashFuture = request.entity.dataBytes
    .via(computeHashWithPayloadAndPayloadLength)
    .runWith(Sink.head)
    .map { case (digest, _, _) =>
      RawHeader("X-Content-Hash", digest.digest().map("%02x".format(_)).mkString)
    }

   hashFuture.flatMap { hashHeader =>
     http.singleRequest(request.withHeaders(hashHeader))
  }
}

© www.soinside.com 2019 - 2024. All rights reserved.