我试图在 Http4s 服务中的设定时间间隔后不断刷新一个值。到目前为止,它被定义为一个保持其初始值的 val。我想继续将其视为指定值(例如,与效果相对),但允许它按设定的时间表更改其值。
我已经制作了一个我认为是最小的示例,其中
freshValue
需要每 X 时间重新生成一次(现在它只是一个初始化为对生成新值的函数的单个调用的 val)。
package com.example
import scala.concurrent.ExecutionContext
import cats.effect.{ExitCode, IO, IOApp}
import cats.syntax.all._
import cats.Monad
import org.http4s.blaze.server.BlazeServerBuilder
import org.http4s.dsl.Http4sDsl
import org.http4s.{HttpApp, HttpRoutes}
object ExampleService extends IOApp {
val freshValue: Int = generateNewValue() // re-generate every X minutes
def generateNewValue(): Int = scala.util.Random.nextInt(42)
def routes[F[_]: Monad]: HttpRoutes[F] = {
val dsl = Http4sDsl[F]
import dsl._
HttpRoutes.of[F] {
case GET -> Root / "ping" => Ok("pong")
case GET -> Root / "fresh-value" => Ok(s"$freshValue")
}
}
def allRoutes[F[_]: Monad]: HttpApp[F] =
routes.orNotFound
override def run(args: List[String]): IO[ExitCode] = {
val forkJoinPool = ExecutionContext.fromExecutor(new java.util.concurrent.ForkJoinPool(8))
BlazeServerBuilder[IO]
.withExecutionContext(forkJoinPool)
.bindHttp(1234, "localhost")
.withHttpApp(allRoutes)
.resource
.use(_ => IO.never)
.as(ExitCode.Success)
}
}
我更愿意继续将其作为原始类型处理,而不是效果。如果它需要是一种效果,我希望它是与效果无关的(不一定是
IO
)。刷新的值本身(以及依赖于它的其他值)是在案例类的深处定义的,在整个应用程序中都会引用该案例类。我不想改变案例类及其属性/方法的使用,这些属性/方法一直保持无效果(因为它们从服务开始就没有任何理由进行更改)。在每个时间间隔过去后,我可以创建案例类的新实例/副本。我愿意接受涉及 fs2、cats/cats-effect 和 http4s 的解决方案。
实际用例是令牌需要每 12 小时刷新一次,并且存在一个案例类,它定义了一些在需要新令牌时需要重新生成的客户端和属性。 (基本上,它是一个包含一组资源的案例类,其中一些资源取决于令牌本身。)
在寻找解决方案时,我遇到了使用 fs2 Streams 来创建一种效果,该效果在每个设定的时间间隔后进行评估,但我在调整它时遇到了麻烦,这样我就可以留下一个纯值我可以使用它的预期类型:
import cats.effect.unsafe.implicits.global
import cats.effect.{ExitCode, IO}
import cats.syntax.all._
import fs2.Stream
import scala.concurrent.duration._
val freshValue: IO[ExitCode] = Stream.awakeEvery[IO](3.seconds)
.evalMap(_ => IO({ println("triggered another operation"); scala.util.Random.nextInt(10) }))
.compile
.drain
.as(ExitCode.Success)
我不确定如何创建一个包含我想要的类型(不是 ExitCode)的效果,显然我不想调用类似
freshValue.unsafeRunSync()
的东西来访问案例中的值本身不使用效果的类。我想知道使用此 fs2 Stream 将如何影响依赖于其值的 vals 和 case 类,如果这些需要成为效果,或者如果完全不同的方法在这里更有意义。
如果我的要求没有意义,我很抱歉,我仍在习惯使用 Cats 进行编程,并且我的组织没有其他人具有这些框架的经验,所以我正在边学习边学习。
这个问题其实很常见。
我将分步骤分享一般解决方案,然后您可以根据您的具体用例进行调整。另外,我个人更喜欢直接使用
IO
,但你可以轻松地将代码修改为无标签的最终样式;即 F[_]
。
我们首先需要一个能够计算您需要的值的函数。
此类函数可能独立于当前状态,或者可能希望接收当前状态作为输入以生成新状态。
val generateFreshValue: IO[Value] = ...
// Or.
def generateFreshValue(oldValue: Value): IO[Value] = ...
// Tagless final:
def generateFreshValue[F[_]: TC]: F[OldValue] = ...
// Where TC is whatever constraint you need.
// Also, you may want to put this inside a trait so you can mock it for tests.
trait FreshValueGenerator[F[_]]:
def generateFreshValue: F[Value] // IO[Value]
使依赖于该值的计算接受它作为参数,而不是从全局位置获取它。
// Do this:
def foo(value: Value, ...): Foo = {
...
}
// Rather than this:
def foo(...): Foo = {
val value = SomeGlobalObject.freshValue
...
}
Ref
来包含值由于值会随着时间的推移而发生变化,因此我们需要将其封装成可以安全地处理并发环境中的变化的东西。 cats-effect 已经提供了,称为
Ref
:
val createValueHolder: IO[Ref[IO, Value]] = IO.ref(initialValue)
// TF style:
def createValueHolder[F[_]: Concurrent]: F[Ref[F, Value]] = Ref.of[F](initialValue)
在第二步中,我们重构了该值的所有用法,以将其请求为输入参数。这会导致上层要么执行相同的操作,要么引用常量/全局/虚拟值。
现在,我们将重构它们以从
Ref
获取值,然后使用它。
// Do this:
final class Logic(valueHolder: Ref[IO, Value]) {
def run(...): IO[Unit] =
valueHolder.get.flatMap { freshValue =>
val foo = foo(freshValue, ...)
...
}
}
// Rather than this:
final class Logic {
def run(...): IO[Unit] = {
val foo = foo(someGlobalFreshValue, ...)
...
}
}
首先我们定义一个经常刷新值的程序:
def refreshValue(valueHolder: Ref[IO, Value], refreshEvery: Duration): IO[Unit] =
(
IO.sleep(refreshEvery) >>
generateFreshValue.flatMap(valueHolder.set)
).foreverM
然后你可以让它在你的应用程序后台运行。
有很多方法可以做到这一点,但我更喜欢使用
background
组合器和组合资源。
您可能希望将刷新过程和 Ref 打包在一个模块中。
这就是您真正需要的。
现在,我将把所有这些应用到您的示例代码上,添加一些允许代码轻松修改的结构。
import cats.Monad
import cats.effect.{Async, Concurrent, IO, IOApp, Ref}
import cats.effect.syntax.all._
import cats.syntax.all._
import org.http4s.dsl.Http4sDsl
import org.http4s.server.Server
import org.http4s.ember.server.EmberServerBuilder
object ExampleService extends IOApp.Simple {
private val app: Resource[IO, Server] =
for {
valueGenerator <- ValueGenerator.make[IO]
valueHolder <- ValueHolder.make[IO](valueGenerator)
logic = Logic.make(valueGetter)
server <- HttpServer.make[IO](logic)
} yield server
override final val run: IO[Unit] =
app.useForever
}
trait ValueGenerator[F[_]] {
def generateFreshValue: F[Value]
}
object ValueGetter {
def make[F[_]]: Resource[F, ValueGetter[F]] =
...
}
trait ValueHolder[F[_]] {
def getValue: F[Value]
}
object ValueHolder {
def make[F[_]: Concurrent](
valueGenerator: ValueGenerator[F]
): Resource[F, ValueHolder[F]] =
for {
initialValue <- valueGenerator.generateFreshValue.toResource
valueRef <- Ref.make[F](initialValue).toResource
refreshProcess = valueGenerator.generateFreshValue.flatMap(ref.set)
_ <- refreshProcess.background
} yield new ValueHolder[F] {
override final val getValue: F[Value] =
ref.get
}
}
trait Logic[F[_]] {
def run: F[String]
}
object Logic {
def make[F[_]: Monad](
valueHolder: ValueHolder[F]
): Logic[F] =
new Logic[F] {
override final val run: F[String] =
valueHolder.getValue.map { currentValue =>
s"The current value is: '${currentValue}'"
}
}
}
objects HttpServer {
private def routes[F[_]: Monad](
logic: Logic[F]
): HttpRoutes[F] = {
val dsl = Http4sDsl[F]
import dsl._
HttpRoutes.of[F] {
case GET -> Root / "ping" => Ok("pong")
case GET -> Root / "value" => Ok(logic.run)
}
}
def make[F[_]: Async](
logic: Logic[F]
): Resource[F, Server] =
EmberServerBuilder
.default[F]
.withPort(port"8080")
.withHost(host"0.0.0.0")
.withHttpApp(routes(logic).orNotFound)
.build
}