我有一个“statistic.txt”文件,它可以包含从 0 到 819.2 百万行的数字 [-40 亿; +40 亿]。我需要计算 5 行的组数,其中第 3 个数字比其他数字大。我编写了代码,它可以工作,但需要很长时间才能执行。我怎样才能使我的代码异步以使其运行得更快?
这是我做的代码
defmodule Statistics do
def count_groups(filename) do
File.stream!(filename)
|> Stream.map(&String.trim/1)
|> Stream.map(&String.to_integer/1)
|> Stream.chunk_every(5, 1, :discard)
|> Stream.filter(fn [a, b, c, d, e] -> c > Enum.max([a, b, d, e]) end)
|> Enum.count()
end
end
我尝试使用 ParallelStream、Task 和 Flow,但无法应用任何一个。
更新 这是我使用 Flow 制作的代码
def count_groups_flow(filename) do
File.stream!(filename)
|> Stream.map(&String.trim/1)
|> Stream.map(&String.to_integer/1)
|> Stream.chunk_every(5, 1, :discard)
|> Flow.from_enumerable(stages: 4)
|> Flow.partition(stages: 4)
|> Flow.filter(fn [a, b, c, d, e] -> c > Enum.max([a, b, d, e]) end)
|> Enum.count()
end
更新
输入数据示例(“statistic.txt”文件的前 10 行)
168648312
503340495
-283728446
-762780208
1250431140
-225340028
-72728416
-804793229
-1014960356
-1256160640
-1120932173
这里的罪魁祸首是
Stream.chunk_every(5, 1, :discard)
,在之后应用
Flow
并没有显着改善,因为它计算的表达式几乎是即时的。
也就是说,目标是首先避免将迭代次数乘以 5。为此,我们需要明智地分块。我们知道,总数很大,所以把它分成更大的块,然后用
Flow
分割每个块的计算是有意义的。我们将使用 Stream.chunk_every(input, n, n-4, :discard)
,其中 n
足够大以确保我们不会丢失五重奏,并且需要 n-4
以便每个五重奏完全进入其中一个块。此外,当数据已经被同时处理时,也应该分阶段进行修剪和转换为整数。
到目前为止,还不错。
filename
|> File.stream!()
# chunk almost without overhead
|> Stream.chunk_every(10_000, 10_000 - 4, :discard)
# now we have big chunks and can use `Flow`
|> Flow.from_enumerable()
|> Flow.partition()
|> Flow.map(fn big_chunk ->
big_chunk
|> Enum.reduce({0, {nil, nil, nil, nil}}, process_chunk_with_trim)
|> elem(0)
end)
|> Enum.sum()
唯一剩下的就是实现
process_chunk/2
函数来计算块的中间结果。我们会手动完成,因为与标准核心库实现相比,它会更快。我们需要在累加器中保留之前的四个元素来进行比较。
process_chunk = fn
e, {0, {nil, nil, nil, nil}} -> {0, {e, nil, nil, nil}}
e, {0, {e1, nil, nil, nil}} -> {0, {e, e1, nil, nil}}
e, {0, {e1, e2, nil, nil}} -> {0, {e, e1, e2, nil}}
e, {0, {e1, e2, e3, nil}} -> {0, {e, e1, e2, e3}}
e, {acc, {e1, e2, e3, e4}}
when e2 > e and e2 > e1 and e2 > e3 and e2 > e4 ->
{acc + 1, {e, e1, e2, e3}}
e, {acc, {e1, e2, e3, _}} -> {acc, {e, e1, e2, e3}}
end
process_chunk_with_trim = fn e, acc ->
process_chunk.(e |> String.trim() |> String.to_integer(), acc)
end
请注意,以上不会计算尾随的四重奏和三重奏,但由于您在分块方法中使用了
:discard
,所以应该没问题。
我没有测试上面的代码,但它应该让您正确地了解如何完成任务(尽管我希望它开箱即用。)