Elixir:如何在多个管道之间保持Stream状态?

问题描述 投票:0回答:2

我的目标:我想读取文件的第一行,在管道中重组输入,然后在另一个管道中处理文件的其余部分。

我的问题:流在每个新管道上重置。

示例代码

defmodule StrangeStream do
  fs = File.stream!("file.txt")

  Stream.take(fs, 1) |> Enum.to_list() |> IO.inspect()

  Stream.take(fs, 1) |> Enum.to_list() |> IO.inspect()
end

文本文件file.txt

First line.
Second line.
Third line.

输出

["First line.\n"]
["First line.\n"]

如您所见,流在每个管道中重置。每个管道都从文件的第一行开始。如何维护两次调用管道之间流的状态?预先感谢!

stream elixir
2个回答
0
投票

TL; DR:您不能。


中没有可变状态,因此无法维持资源状态。

唯一类似的事情是归约期间的suspend可枚举,但即使直接使用流也无法实现。

您可以求助于Stream.transform/4并自己维护状态,并相应地选择管道。

Sidenote: Stream.transform/4已经终止了流,因此问题中的方法根本行不通。


0
投票

这是我如何获得想要的效果。希望它可以帮助其他调查此问题的人。

再次,非常感谢Aleksei节省了我很多时间。

Enum.to_list/1

输出

defmodule StrangeStream do
  do_stuff = fn(something) ->
    # We'd do something useful here
    something
  end

  {:ok, file} = File.open("file.txt", [:read, :line])

  # Read the first line
  first_line = IO.read(file, :line)
  |>  String.trim()
  |>  do_stuff.()
  |>  IO.inspect([label: "first_line"])

  # Create side-effect streams
  print_stream = IO.binstream(:stdio, :line)
  file_stream  = File.stream!("output.txt", [:write, :append])

  # Convert IO to Stream and process
  IO.stream(file, :line)
  |>  Stream.map(&String.trim(&1))
  |>  do_stuff.()
  |>  Stream.into(print_stream, fn(s)-> s <>"\n" end)
  |>  do_stuff.()
  |>  Stream.into(file_stream)
  |>  do_stuff.()
  |>  Enum.to_list()
  |>  IO.inspect([label: "rest of file"])
end
© www.soinside.com 2019 - 2024. All rights reserved.