如何用scalatest测试`sc的`scala.rx`?

问题描述 投票:0回答:1

我有一个方法连接到websocket并从一些真正的外部系统获取流消息。

简化版是:

def watchOrders(): Var[Option[Order]] = {
  val value = Var[Option[Order]](None)
  // onMessage( order => value.update(Some(order))
  value
}

当我测试它(使用scalatest)时,我想让它连接到真正的外部系统,并且只检查前4个命令:

test("watchOrders") {
  var result = List.empty[Order]
  val stream = client.watchOrders()
  stream.foreach {
    case Some(order) =>
      result = depth :: result
      if (result.size == 4) {      // 1. 
        assert(orders should ...)  // 2. 
        stream.kill()              // 3. 
      }
    case _ => 
  }
  Thread.sleep(10000)              // 4. 
}

我有4个问题:

  1. 这是检查前4个订单的正确方法吗?在scala.rx中找不到take(4)方法
  2. 如果assert失败,测试仍然通过,如何解决?
  3. 这是阻止流的正确方法吗?
  4. 如果线程没有在这里睡觉,测试将传递case Some(order)中的代码永远不会运行。还有更好的等待方式吗?
scala reactive-programming scala.rx
1个回答
1
投票

您可以考虑从Var中获取List的一种方法是使用.fold组合器。

你遇到的另一个问题是处理数据的异步性质 - 假设你真的想在你的测试代码中与这个外部真实世界系统交谈(也就是说,这更接近集成测试方面),你将会我想看看scalatest对异步测试的支持,并且可能会做一些事情,比如在你积累列表中的4个元素时可以完成的承诺构建未来。

见:http://www.scalatest.org/user_guide/async_testing

© www.soinside.com 2019 - 2024. All rights reserved.