import cats.effect.{IO, IOApp}
import fs2.Pipe
import fs2.Stream
object Test extends IOApp.Simple {
final case class Student(id: Int, name: String)
private val studentData: Map[Int, Student] = Map(1 -> Student(1, "Olivia"), 2 -> Student(2, "Liam"))
override def run: IO[Unit] = {
def firstTransformation: Pipe[IO, Int, Student] = {
stream =>
stream.evalMap(id => IO(studentData(id))) // Here database or an external API call is performed.
}
def secondTransformation: Pipe[IO, Student, String] = {
stream =>
stream.evalMap(student => IO(student.name)) // Here database or an external API call is performed.
}
val sourceStream: Stream[IO, Int] = Stream(1, 2, 3).repeat
sourceStream
.through(firstTransformation)
.through(secondTransformation)
.map(name => println(s"name $name"))
.compile
.drain
}
}
如何处理FS2流转换中的异常,以确保即使转换中途出现异常也能保证流不中断? 在当前设置中,流应无限期地仅打印“Olivia”和“Liam”,尽管有任何例外。
它应该输出: 姓名 奥利维亚 姓名利亚姆 姓名 奥利维亚 姓名利亚姆 姓名 奥利维亚 姓名利亚姆 ………… ………… 无限
我们如何设计流并管理异常以确保它不会停止 FS2 流?先谢谢你了。
简短的回答:在转换步骤中添加正确的错误处理。
另外:我同意@Daenyth 关于如何表示你的转换函数的观点。一般来说,您应该遵循最小功率原则;如果您的转换只是一个简单的函数,那么将其表示为一个简单的函数,而不是一些复杂的包装类型。这可以让你的函数在不同的情况下被更灵活地调用。
// return None instead of throwing if the student doesn't exist
def lookupStudent(id: Int): IO[Option[Student]]
def getStudentName(student: Student): IO[String]
在上面,
lookupStudent
步骤说明了“没有这样的学生”结果作为返回类型的一部分而不是作为抛出的异常的可能性。
在这两个步骤中,您都提到可能会发生 API 调用或数据库查找。由于这些可能会以不可预见的方式失败,因此您需要决定在这些情况下应该发生什么。您可以捕获错误并返回
None
,从而有效地吞掉错误。我一般不建议这样做,但是嘿,这是你的程序。
def lookupStudentResiliently(id: Int): IO[Option[Student]] =
lookupStudent(id).recover {
case e: Exception => None
}
def getStudentNameResiliently(student: Student): IO[Option[String]] =
getStudentName(student).redeem[Option[String]](
thrownException => None,
name => Some(name)
)
上述
*Resiliently
函数将捕获底层 lookupStudent / getStudentName
IO 引发的错误,并将它们视为查找刚刚返回 None
。
然后,您可以使用任何各种
fs2.Stream
组合方法将这些查找合并到您的流中。我将在我的示例中使用 evalMapFilter
,因为它符合所需的(?)语义“如果它不起作用,则跳过它”,当“它不起作用”由返回 None
的转换之一表示时
.
sourceStream
.evalMapFilter(lookupStudentResiliently) // changed
.evalMapFilter(getStudentNameResiliently) // changed
.map(name => println(s"name $name"))
.compile
.drain