我想使用 htpp4s 将 Doobie 查询的结果作为 JSON 进行流式传输。核心问题是,错误(例如格式错误的查询)仅在 http4s 已发送标头(状态正常)并开始传输正文时在流评估期间发生。
因此,我尝试创建一个函数
peekFirstElement
,它将实现将第一个块提取到封闭 IO 期间发生的错误。
我已经陷入了死胡同,因为当前的实现在流式传输第一个块后失败,并且 Postgres 出现异常,结果集已经关闭。
如果只有一个块,则传递第一个块的最后一项的
next()
表示结果集结束。但不知何故,我的代码试图从封闭集中拉出另一个块。
在有多个块的情况下,doobie 在查看第一个块后显然会调用
close
。所以我的代码在第一个块之后也会失败,但是以不同的方式。
如果我不使用
peekFirstElement
功能,我可以成功传输结果。
有什么想法吗?
当将 Doobie 流上的块大小更改为仅 2 时,我也仅在结果集关闭错误之前将第一个块作为输出。在这种情况下,Doobie 以某种方式调用了
close
的 PgResultSet
。
/** Peeks at first element of stream to materialize any exceptions that might occur. */
def peekFirstElement[O](stream: Stream[IO, O]): IO[Stream[IO, O]] = {
val x: Pull[IO, Stream[IO, O], Unit] = stream.pull.uncons.flatMap {
case Some((first, tl)) =>
Pull.output1(Stream.chunk(first) ++ tl)
case None =>
Pull.output1(Stream.empty)
}
x.stream.compile.onlyOrError
}
def routes(conns: List[(Regex, Transactor[IO])]): HttpRoutes[IO] = {
val config = FileService.Config[IO](absolutePath)
//post process the file content by interpreting it as a query and executing it
val pathCollector: FileService.Fs2PathCollector[IO] = (f, cfg, request) => {
conns.find(_._1.matches(request.pathInfo.toString))
.map { case (_, transactor) =>
for {
content <- config.fs2PathCollector(f, cfg, request)
query <- OptionT.liftF(content.body.compile.toList.map(bytes => new String(bytes.toArray)))
_ <- OptionT.liftF(logger[IO].debug(s"executing query: \"$query\""))
outerQuery = sql"select to_json(querydata) from (${Fragment.const(query)}) as querydata".query[Json]
queryResult = outerQuery.stream.transact(transactor)
queryPeeked <- OptionT.liftF(peekFirstElement(queryResult))
streamList = Stream.emit(Token.StartArray) ++ queryPeeked.through(tokenize) ++ Stream(Token.EndArray)
response <- OptionT.liftF(
Ok(streamList.through(fs2.data.json.render.compact).through(fs2.text.utf8.encode)).handleErrorWith(
e => InternalServerError(e.toString)
)
)
} yield response
}.getOrElse(config.fs2PathCollector(f, cfg, request))
}
fileService[IO](FileService.Config[IO](absolutePath, pathCollector, "", NoopCacheStrategy[IO], 32 * 1024))
}
我能够使用 Postgres 在本地重现,无需 htt4s 部分。
我认为问题在于
fs2.Stream
在后台处理资源范围,并在底层 ResultSet
完成时关闭对 Pull
的读取,因此使用底层拉动是不安全的。
将
queryResult
拆分为 queryResult.head
和 queryResult.tail
,然后在返回响应之前显式读取头部可以解决该问题。我已经检查过并且它对我有用。因此,请考虑使用下面的代码片段调整相应的行:
headOpt <- OptionT.liftF(queryResult.head.compile.last) // evaluate head
queryPeeked = Stream.fromOption(headOpt) ++ queryResult.tail