我正在使用一个库来读取 csv 文件,该文件返回行流:
File.stream!("some_file.csv")
|> CSV.decode # returns a stream of rows
第一行通常是 csv 标题,所以我认为拆分标题和正文会很酷:
def split_header_and_body(row_stream) do
header = row_stream |> Stream.take(1) |> Enum.to_list |> List.first
body = row_stream |> Stream.drop(1)
{header, body}
end
这不起作用,可能是因为
Stream.take/1
结束了枚举(如文档中所述)。
我认为一切都是不可变的,所以我可以重复使用最初的
row_stream
,但事实并非如此,我也找不到如何复制流。
这应该可以正常工作。 我相信问题是你想在返回之前在 body 上调用 Enum.to_list ,否则你将返回一个 Stream ,这可能不是你想要的。
Stream 本身是不可变的。 事实上,你可以这样做
row_stream | > Stream.take(1) |> Enum.to_list |> IO.inspect
并立即致电
row_stream | > Stream.take(5) |> Enum.to_list |> IO.inspect
你会看到事实的真相。
我不明白你为什么说“它不起作用”。我已经尝试过你的代码并且它有效。
事实上,流是不可变的,文档中说
惰性地从可枚举中获取下一个计数元素并停止枚举
意味着结果流被停止,而不是保持不变的原始流
无论如何,我已经完成了你的代码的另一个版本,我认为它更简单
def split_header_and_body(row_stream) do
{Enum.at(row_stream,0), Stream.drop(row_stream,1)}
end
可以使用
Stream.transform
来完成此操作。
诀窍是将标题行保护在 acc
的
Stream.transform
内
例如,以下代码将 CSV 解析为地图列表。
alias NimbleCSV.RFC4180, as: CSV
defp parse_csv(row, nil) do
{[], Enum.map(row, &String.to_atom/1)}
end
defp parse_csv(row, columns) do
row = Stream.zip(columns, row) |> Map.new()
{[row], columns}
end
File.stream!("some_file.csv")
|> CSV.parse_stream(skip_headers: false)
|> Stream.map(fn row -> Enum.map(row, &:binary.copy/1) end)
|> Stream.transform(nil, &parse_csv/2)
|> Enum.to_list()