在 Stream.take(1) 之后使用流的其余部分

问题描述 投票:0回答:3

我正在使用一个库来读取 csv 文件,该文件返回行流:

File.stream!("some_file.csv")
|> CSV.decode  # returns a stream of rows

第一行通常是 csv 标题,所以我认为拆分标题和正文会很酷:

def split_header_and_body(row_stream) do
  header = row_stream |> Stream.take(1) |> Enum.to_list |> List.first
  body = row_stream |> Stream.drop(1)

  {header, body}
end

这不起作用,可能是因为

Stream.take/1
结束了枚举(如文档中所述)。

我认为一切都是不可变的,所以我可以重复使用最初的

row_stream
,但事实并非如此,我也找不到如何复制流。

elixir
3个回答
0
投票

这应该可以正常工作。 我相信问题是你想在返回之前在 body 上调用 Enum.to_list ,否则你将返回一个 Stream ,这可能不是你想要的。

Stream 本身是不可变的。 事实上,你可以这样做

row_stream | > Stream.take(1) |> Enum.to_list |> IO.inspect

并立即致电

row_stream | > Stream.take(5) |> Enum.to_list |> IO.inspect

你会看到事实的真相。


0
投票

我不明白你为什么说“它不起作用”。我已经尝试过你的代码并且它有效。

事实上,流是不可变的,文档中说

惰性地从可枚举中获取下一个计数元素并停止枚举

意味着结果流被停止,而不是保持不变的原始流

无论如何,我已经完成了你的代码的另一个版本,我认为它更简单

def split_header_and_body(row_stream) do
  {Enum.at(row_stream,0), Stream.drop(row_stream,1)}
end

0
投票

可以使用

Stream.transform
来完成此操作。 诀窍是将标题行保护在
acc
Stream.transform

例如,以下代码将 CSV 解析为地图列表。

alias NimbleCSV.RFC4180, as: CSV

defp parse_csv(row, nil) do
  {[], Enum.map(row, &String.to_atom/1)}
end

defp parse_csv(row, columns) do
  row = Stream.zip(columns, row) |> Map.new()
  {[row], columns}
  end

File.stream!("some_file.csv")
|> CSV.parse_stream(skip_headers: false)
|> Stream.map(fn row -> Enum.map(row, &:binary.copy/1) end)
|> Stream.transform(nil, &parse_csv/2)
|> Enum.to_list()
© www.soinside.com 2019 - 2024. All rights reserved.