fs2并发队列Scala:缺乏见识

问题描述 投票:1回答:1

我是一个新手,试图掌握fs2队列背后的直觉。我正在尝试做一个从Stream[IO, Int]中提取数据的基本示例。但是对我来说文档还不够,因为它直接涉及高级内容。

这里是我到目前为止所做的:

import cats.effect.{ ExitCode, IO, IOApp}
import fs2._
import fs2.concurrent.Queue

class QueueInt(q: Queue[IO, Int]) {
  def startPushingtoQueue: Stream[IO, Unit] = {
    Stream(1, 2, 3).covary[IO].through(q.enqueue)
    q.dequeue.evalMap(n => IO.delay(println(s"Pulling element $n from Queue")))
  }
}

object testingQueues extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {

    val stream = for {
      q <- Queue.bounded(10)
      b = new QueueInt(q)
      _ <- b.startPushingtoQueue.drain
    } yield ()
  }

}

问题1:我得到No implicit argument of type Concurrent[F_],知道我没有使用任何并发效果,我似乎无法弄清楚我想念的是什么?

问题2:如何打印结果。

问题3:有人可以指导我学习一些有关fs2的资源

scala stream queue fs2
1个回答
0
投票

我在您的代码中发现了几个问题:

  1. 如果遇到关于缺少隐式的错误,通常可以使用显式声明类型参数来修复它们:
 q <- Queue.bounded[IO, Unit](10) // it will fix your error with implicits
  1. 您理解的结果类型为IO[Unit],但是要使其运行,您必须从run方法中将其返回。您还需要将类型从单位更改为ExitCode
stream.as(ExitCode.Success)
  1. 在您的方法startPushingToQueue中,您正在创建Steam,但未将其分配到任何地方。它只会创建流的描述,但不会运行。

我想您想要实现的是在方法上创建将元素推入队列,而另一种方法将从队列中获取元素并打印出来。请检查我的解决方案:

import cats.effect.{ ExitCode, IO, IOApp}
import fs2._
import fs2.concurrent.Queue
import scala.concurrent.duration._

class QueueInt(q: Queue[IO, Int])(implicit timer: Timer[IO]) { //I need implicit timer for metered
  def startPushingToQueue: Stream[IO, Unit] = Stream(1, 2, 3)
    .covary[IO]
    .evalTap(n => IO.delay(println(s"Pushing element $n to Queue"))) //eval tap evaluates effect on an element but doesn't change stream
    .metered(500.millis) //it will create 0.5 delay between enqueueing elements of stream,
     // I added it to make visible that elements can be pushed and pulled from queue concurrently
    .through(q.enqueue)

  def pullAndPrintElements: Stream[IO, Unit] = q.dequeue.evalMap(n => IO.delay(println(s"Pulling element $n from Queue")))
}

object testingQueues extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {

    val program = for {
      q <- Queue.bounded[IO, Int](10)
      b = new QueueInt(q)
      _ <- b.startPushingToQueue.compile.drain.start //start at the end will start pushing element to queue in another Fiber
      _ <- b.pullAndPrintElements.compile.drain //compile.draing compiles stream into io byt pulling all elements.
    } yield ()

    program.as(ExitCode.Success)
  }

}

在控制台上,您将看到几行有关交错插入队列的推和拉操作。如果删除start,您将看到在推入所有元素后首先从startPushingToQueue开始的流结束,然后才开始pullAndPrintElements

[如果您正在寻找学习fs2的良好资源,我建议您应该首先查看与fs2相关的演讲。比旧的更喜欢新的演讲,因为他们可以引用旧的API。

您还应该检查fs2文档上的guide

© www.soinside.com 2019 - 2024. All rights reserved.