处理 Scala FS2 流转换流程中的异常

问题描述 投票:0回答:1
import cats.effect.{IO, IOApp}
import fs2.Pipe
import fs2.Stream

object Test extends IOApp.Simple {

  final case class Student(id: Int, name: String)

  private val studentData: Map[Int, Student] = Map(1 -> Student(1, "Olivia"), 2 -> Student(2, "Liam"))

  override def run: IO[Unit] = {

    def firstTransformation: Pipe[IO, Int, Student] = {
      stream =>
        stream.evalMap(id => IO(studentData(id))) // Here database or an external API call is performed.
    }

    def secondTransformation: Pipe[IO, Student, String] = {
      stream =>
        stream.evalMap(student => IO(student.name)) // Here database or an external API call is performed.
    }

    val sourceStream: Stream[IO, Int] = Stream(1, 2, 3).repeat

    sourceStream
      .through(firstTransformation)
      .through(secondTransformation)
      .map(name => println(s"name $name"))
      .compile
      .drain
  }
}

如何处理FS2流转换中的异常,以确保即使转换中途出现异常也能保证流不中断? 在当前设置中,流应无限期地仅打印“Olivia”和“Liam”,尽管有任何例外。

它应该输出: 姓名 奥利维亚 姓名利亚姆 姓名 奥利维亚 姓名利亚姆 姓名 奥利维亚 姓名利亚姆 ………… ………… 无限

我们如何设计流并管理异常以确保它不会停止 FS2 流?先谢谢你了。

scala scala-cats cats-effect fs2
1个回答
0
投票

简短的回答:在转换步骤中添加正确的错误处理。

另外:我同意@Daenyth 关于如何表示你的转换函数的观点。一般来说,您应该遵循最小功率原则;如果您的转换只是一个简单的函数,那么将其表示为一个简单的函数,而不是一些复杂的包装类型。这可以让你的函数在不同的情况下被更灵活地调用。

// return None instead of throwing if the student doesn't exist
def lookupStudent(id: Int): IO[Option[Student]]

def getStudentName(student: Student): IO[String]

在上面,

lookupStudent
步骤说明了“没有这样的学生”结果作为返回类型的一部分而不是作为抛出的异常的可能性。

在这两个步骤中,您都提到可能会发生 API 调用或数据库查找。由于这些可能会以不可预见的方式失败,因此您需要决定在这些情况下应该发生什么。您可以捕获错误并返回

None
,从而有效地吞掉错误。我一般不建议这样做,但是嘿,这是你的程序。

def lookupStudentResiliently(id: Int): IO[Option[Student]] = 
  lookupStudent(id).recover {
    case e: Exception => None
  }

def getStudentNameResiliently(student: Student): IO[Option[String]] =
  getStudentName(student).redeem[Option[String]](
    thrownException => None,
    name => Some(name)
  )

上述

*Resiliently
函数将捕获底层
lookupStudent / getStudentName
IO 引发的错误,并将它们视为查找刚刚返回
None

然后,您可以使用任何各种

fs2.Stream
组合方法将这些查找合并到您的流中。我将在我的示例中使用
evalMapFilter
,因为它符合所需的(?)语义“如果它不起作用,则跳过它”,当“它不起作用”由返回
None 的转换之一表示时
.

sourceStream
  .evalMapFilter(lookupStudentResiliently) // changed
  .evalMapFilter(getStudentNameResiliently) // changed
  .map(name => println(s"name $name"))
  .compile
  .drain
© www.soinside.com 2019 - 2024. All rights reserved.