如何在每个设定的时间间隔更新 Http4s 服务中的变量?

问题描述 投票:0回答:1

我试图在 Http4s 服务中的设定时间间隔后不断刷新一个值。到目前为止,它被定义为一个保持其初始值的 val。我想继续将其视为指定值(例如,与效果相对),但允许它按设定的时间表更改其值。

我已经制作了一个我认为是最小的示例,其中

freshValue
需要每 X 时间重新生成一次(现在它只是一个初始化为对生成新值的函数的单个调用的 val)。

package com.example

import scala.concurrent.ExecutionContext

import cats.effect.{ExitCode, IO, IOApp}
import cats.syntax.all._
import cats.Monad
import org.http4s.blaze.server.BlazeServerBuilder
import org.http4s.dsl.Http4sDsl
import org.http4s.{HttpApp, HttpRoutes}

object ExampleService extends IOApp {
  val freshValue: Int = generateNewValue() // re-generate every X minutes

  def generateNewValue(): Int = scala.util.Random.nextInt(42)

  def routes[F[_]: Monad]: HttpRoutes[F] = {
    val dsl = Http4sDsl[F]
    import dsl._

    HttpRoutes.of[F] {
      case GET -> Root / "ping"        => Ok("pong")
      case GET -> Root / "fresh-value" => Ok(s"$freshValue")
    }
  }

  def allRoutes[F[_]: Monad]: HttpApp[F] =
    routes.orNotFound

  override def run(args: List[String]): IO[ExitCode] = {
    val forkJoinPool = ExecutionContext.fromExecutor(new java.util.concurrent.ForkJoinPool(8))

    BlazeServerBuilder[IO]
      .withExecutionContext(forkJoinPool)
      .bindHttp(1234, "localhost")
      .withHttpApp(allRoutes)
      .resource
      .use(_ => IO.never)
      .as(ExitCode.Success)
  }
}

我更愿意继续将其作为原始类型处理,而不是效果。如果它需要是一种效果,我希望它是与效果无关的(不一定是

IO
)。刷新的值本身(以及依赖于它的其他值)是在案例类的深处定义的,在整个应用程序中都会引用该案例类。我不想改变案例类及其属性/方法的使用,这些属性/方法一直保持无效果(因为它们从服务开始就没有任何理由进行更改)。在每个时间间隔过去后,我可以创建案例类的新实例/副本。我愿意接受涉及 fs2、cats/cats-effect 和 http4s 的解决方案。

实际用例是令牌需要每 12 小时刷新一次,并且存在一个案例类,它定义了一些在需要新令牌时需要重新生成的客户端和属性。 (基本上,它是一个包含一组资源的案例类,其中一些资源取决于令牌本身。)

在寻找解决方案时,我遇到了使用 fs2 Streams 来创建一种效果,该效果在每个设定的时间间隔后进行评估,但我在调整它时遇到了麻烦,这样我就可以留下一个纯值我可以使用它的预期类型:

import cats.effect.unsafe.implicits.global
import cats.effect.{ExitCode, IO}
import cats.syntax.all._
import fs2.Stream
import scala.concurrent.duration._

val freshValue: IO[ExitCode] = Stream.awakeEvery[IO](3.seconds)
  .evalMap(_ => IO({ println("triggered another operation"); scala.util.Random.nextInt(10) }))
  .compile
  .drain
  .as(ExitCode.Success)

我不确定如何创建一个包含我想要的类型(不是 ExitCode)的效果,显然我不想调用类似

freshValue.unsafeRunSync()
的东西来访问案例中的值本身不使用效果的类。我想知道使用此 fs2 Stream 将如何影响依赖于其值的 vals 和 case 类,如果这些需要成为效果,或者如果完全不同的方法在这里更有意义。

如果我的要求没有意义,我很抱歉,我仍在习惯使用 Cats 进行编程,并且我的组织没有其他人具有这些框架的经验,所以我正在边学习边学习。

scala scala-cats cats-effect http4s fs2
1个回答
0
投票

这个问题其实很常见。
我将分步骤分享一般解决方案,然后您可以根据您的具体用例进行调整。另外,我个人更喜欢直接使用

IO
,但你可以轻松地将代码修改为无标签的最终样式;即
F[_]

1.计算值

我们首先需要一个能够计算您需要的值的函数。
此类函数可能独立于当前状态,或者可能希望接收当前状态作为输入以生成新状态。

val generateFreshValue: IO[Value] = ...

// Or.
def generateFreshValue(oldValue: Value): IO[Value] = ...

// Tagless final:
def generateFreshValue[F[_]: TC]: F[OldValue] = ...
// Where TC is whatever constraint you need.

// Also, you may want to put this inside a trait so you can mock it for tests.
trait FreshValueGenerator[F[_]]:
  def generateFreshValue: F[Value] // IO[Value]

2.调整价值使用

使依赖于该值的计算接受它作为参数,而不是从全局位置获取它。

// Do this:
def foo(value: Value, ...): Foo = {
  ...
}

// Rather than this:
def foo(...): Foo = {
  val value = SomeGlobalObject.freshValue
  ...
}

3.使用
Ref
来包含值

由于值会随着时间的推移而发生变化,因此我们需要将其封装成可以安全地处理并发环境中的变化的东西。 cats-effect 已经提供了,称为

Ref
:

val createValueHolder: IO[Ref[IO, Value]] = IO.ref(initialValue)

// TF style:
def createValueHolder[F[_]: Concurrent]: F[Ref[F, Value]] = Ref.of[F](initialValue)

4.修改上面一层的value的用法,从Ref

获取

在第二步中,我们重构了该值的所有用法,以将其请求为输入参数。这会导致上层要么执行相同的操作,要么引用常量/全局/虚拟值。
现在,我们将重构它们以从

Ref
获取值,然后使用它。

// Do this:
final class Logic(valueHolder: Ref[IO, Value]) {
  def run(...): IO[Unit] =
    valueHolder.get.flatMap { freshValue =>
      val foo = foo(freshValue, ...)
      ...
    }
}

// Rather than this:
final class Logic {
  def run(...): IO[Unit] = {
    val foo = foo(someGlobalFreshValue, ...)
    ...
  }
}

5.创建一个后台进程,每 X 分钟刷新一次值。

首先我们定义一个经常刷新值的程序:

def refreshValue(valueHolder: Ref[IO, Value], refreshEvery: Duration): IO[Unit] =
  (
    IO.sleep(refreshEvery) >>
    generateFreshValue.flatMap(valueHolder.set)
  ).foreverM

然后你可以让它在你的应用程序后台运行。
有很多方法可以做到这一点,但我更喜欢使用

background
组合器和组合资源。

您可能希望将刷新过程和 Ref 打包在一个模块中。


这就是您真正需要的。

现在,我将把所有这些应用到您的示例代码上,添加一些允许代码轻松修改的结构。

import cats.Monad
import cats.effect.{Async, Concurrent, IO, IOApp, Ref}
import cats.effect.syntax.all._
import cats.syntax.all._
import org.http4s.dsl.Http4sDsl
import org.http4s.server.Server
import org.http4s.ember.server.EmberServerBuilder

object ExampleService extends IOApp.Simple {
  private val app: Resource[IO, Server] =
    for {
      valueGenerator <- ValueGenerator.make[IO]
      valueHolder <- ValueHolder.make[IO](valueGenerator)
      logic = Logic.make(valueGetter)
      server <- HttpServer.make[IO](logic)
    } yield server

  override final val run: IO[Unit] =
    app.useForever
}

trait ValueGenerator[F[_]] {
  def generateFreshValue: F[Value]
}

object ValueGetter {
  def make[F[_]]: Resource[F, ValueGetter[F]] =
    ...
}

trait ValueHolder[F[_]] {
  def getValue: F[Value]
}

object ValueHolder {
  def make[F[_]: Concurrent](
    valueGenerator: ValueGenerator[F]
  ): Resource[F, ValueHolder[F]] =
    for {
      initialValue <- valueGenerator.generateFreshValue.toResource
      valueRef <- Ref.make[F](initialValue).toResource
      refreshProcess = valueGenerator.generateFreshValue.flatMap(ref.set)
      _ <- refreshProcess.background
    } yield new ValueHolder[F] {
      override final val getValue: F[Value] =
        ref.get
    }
}

trait Logic[F[_]] {
  def run: F[String]
}

object Logic {
  def make[F[_]: Monad](
    valueHolder: ValueHolder[F]
  ): Logic[F] =
    new Logic[F] {
      override final val run: F[String] =
        valueHolder.getValue.map { currentValue =>
          s"The current value is: '${currentValue}'"
        }
    }
}

objects HttpServer {
  private def routes[F[_]: Monad](
    logic: Logic[F]
  ): HttpRoutes[F] = {
    val dsl = Http4sDsl[F]
    import dsl._

    HttpRoutes.of[F] {
      case GET -> Root / "ping"  => Ok("pong")
      case GET -> Root / "value" => Ok(logic.run)
    }
  }

  def make[F[_]: Async](
    logic: Logic[F]
  ): Resource[F, Server] =
    EmberServerBuilder
      .default[F]
      .withPort(port"8080")
      .withHost(host"0.0.0.0")
      .withHttpApp(routes(logic).orNotFound)
      .build
}
© www.soinside.com 2019 - 2024. All rights reserved.