查看使用 fs2 实现错误的 Doobie 查询结果

问题描述 投票:0回答:1

我想使用 htpp4s 将 Doobie 查询的结果作为 JSON 进行流式传输。核心问题是,错误(例如格式错误的查询)仅在 http4s 已发送标头(状态正常)并开始传输正文时在流评估期间发生。

因此,我尝试创建一个函数

peekFirstElement
,它将实现将第一个块提取到封闭 IO 期间发生的错误。 我已经陷入了死胡同,因为当前的实现在流式传输第一个块后失败,并且 Postgres 出现异常,结果集已经关闭。

如果只有一个块,则传递第一个块的最后一项的

next()
表示结果集结束。但不知何故,我的代码试图从封闭集中拉出另一个块。

在有多个块的情况下,doobie 在查看第一个块后显然会调用

close
。所以我的代码在第一个块之后也会失败,但是以不同的方式。

如果我不使用

peekFirstElement
功能,我可以成功传输结果。

有什么想法吗?

编辑

当将 Doobie 流上的块大小更改为仅 2 时,我也仅在结果集关闭错误之前将第一个块作为输出。在这种情况下,Doobie 以某种方式调用了

close
PgResultSet

代码

  /** Peeks at first element of stream to materialize any exceptions that might occur. */
  def peekFirstElement[O](stream: Stream[IO, O]): IO[Stream[IO, O]] = {
    val x: Pull[IO, Stream[IO, O], Unit] = stream.pull.uncons.flatMap {
      case Some((first, tl)) =>
        Pull.output1(Stream.chunk(first) ++ tl)
      case None =>
        Pull.output1(Stream.empty)
    }
    x.stream.compile.onlyOrError
  }

  def routes(conns: List[(Regex, Transactor[IO])]): HttpRoutes[IO] = {
    val config = FileService.Config[IO](absolutePath)
    //post process the file content by interpreting it as a query and executing it
    val pathCollector: FileService.Fs2PathCollector[IO] = (f, cfg, request) => {
      conns.find(_._1.matches(request.pathInfo.toString))
        .map { case (_, transactor) =>
          for {
            content <- config.fs2PathCollector(f, cfg, request)
            query <- OptionT.liftF(content.body.compile.toList.map(bytes => new String(bytes.toArray)))
            _ <- OptionT.liftF(logger[IO].debug(s"executing query: \"$query\""))
            outerQuery = sql"select to_json(querydata) from (${Fragment.const(query)}) as querydata".query[Json]
            queryResult = outerQuery.stream.transact(transactor)
            queryPeeked <- OptionT.liftF(peekFirstElement(queryResult))
            streamList = Stream.emit(Token.StartArray) ++ queryPeeked.through(tokenize) ++ Stream(Token.EndArray)
            response <- OptionT.liftF(
              Ok(streamList.through(fs2.data.json.render.compact).through(fs2.text.utf8.encode)).handleErrorWith(
                e => InternalServerError(e.toString)
              )
            )
          } yield response
        }.getOrElse(config.fs2PathCollector(f, cfg, request))
    }

    fileService[IO](FileService.Config[IO](absolutePath, pathCollector, "", NoopCacheStrategy[IO], 32 * 1024))
  }
scala fs2 doobie
1个回答
0
投票

我能够使用 Postgres 在本地重现,无需 htt4s 部分。

我认为问题在于

fs2.Stream
在后台处理资源范围,并在底层
ResultSet
完成时关闭对
Pull
的读取,因此使用底层拉动是不安全的。

queryResult
拆分为
queryResult.head
queryResult.tail
,然后在返回响应之前显式读取头部可以解决该问题。我已经检查过并且它对我有用。因此,请考虑使用下面的代码片段调整相应的行:

            headOpt <- OptionT.liftF(queryResult.head.compile.last) // evaluate head
            queryPeeked = Stream.fromOption(headOpt) ++ queryResult.tail
© www.soinside.com 2019 - 2024. All rights reserved.