我有一个迭代器,需要由两个函数(下面示例中的 `mean_summarizer` 和 `std_summarizer`)来消耗。我希望这两个函数都能处理该迭代器,而无需一次性将整个迭代器加载到内存中。
下面是一个最小示例(Colab 中也有),它能给出正确的结果,只是会一次性将整个输入加载到内存中。`mean_summarizer`、`std_summarizer` 和 `last` 中的花哨代码无需理解——这样写主要是为了简洁。问题是:在不更改函数签名(只改内部实现)的前提下,重新实现 `summarize_input_stream`,使其内存使用量不随输入流长度增长的最简洁方法是什么?
import numpy as np
from typing import Iterable, Mapping, Callable, Any
def summarize_input_stream(  # Run the input stream through multiple summarizers and collect results
    input_stream: Iterable[float],
    summarizers: Mapping[str, Callable[[Iterable[float]], float]]
) -> Mapping[str, float]:
    """Feed the same input to every summarizer and collect the results by name.

    NOTE: the whole stream is first materialized into a list so that each
    summarizer can iterate it independently. Memory use therefore grows
    linearly with the length of *input_stream*.

    Returns:
        A dict mapping each summarizer's name to the value it returned, in the
        iteration order of *summarizers*.
    """
    buffered_inputs = list(input_stream)  # PROBLEM IS HERE: O(len(input_stream)) memory
    results = {}
    for label, summarize in summarizers.items():
        results[label] = summarize(buffered_inputs)
    return results
def last(iterable: Iterable[Any]) -> Any:  # Just returns last element of iterable
    """Return the final element of *iterable*, consuming it with O(1) memory.

    Only the most recently seen element is kept. No comparisons are made
    between elements, so any element type works.

    Raises:
        ValueError: if *iterable* yields no elements.
    """
    missing = object()  # sentinel: distinguishes "no elements" from a legitimate None
    final = missing
    for final in iterable:
        pass
    if final is missing:
        raise ValueError("last() arg is an empty iterable")
    return final
def mean_summarizer(stream: Iterable[float]) -> float:  # Just computes mean online and returns final value
    """Return the arithmetic mean of *stream*, computed online in one pass.

    The running mean is updated incrementally for every element and handed to
    ``last``, which keeps only the final value. An empty *stream* therefore
    makes ``last`` raise ValueError.
    """
    def running_means():
        current = 0
        for index, value in enumerate(stream):
            # Same update as avg*i/(i+1) + x/(i+1): re-weight the old mean, add the new share.
            current = current * index / (index + 1) + value / (index + 1)
            yield current

    return last(running_means())
def std_summarizer(stream: Iterable[float]) -> float:  # Computes the population standard deviation online and returns final value
    """Return the population standard deviation of *stream* in a single pass.

    Uses Welford's online algorithm, which needs O(1) memory. The textbook
    ``E[x**2] - E[x]**2`` formula suffers catastrophic cancellation and can
    produce a slightly negative variance, which ``** 0.5`` turns into a
    complex number. This algorithm avoids that.

    Raises:
        ValueError: if *stream* is empty.
    """
    count = 0
    mean = 0.0
    sum_sq_dev = 0.0  # running sum of squared deviations from the mean (Welford's M2)
    for x in stream:
        count += 1
        delta = x - mean
        mean += delta / count
        sum_sq_dev += delta * (x - mean)  # uses both the old and the updated mean
    if count == 0:
        raise ValueError("std_summarizer() requires a non-empty stream")
    # Clamp guards against a tiny negative caused by rounding.
    return max(sum_sq_dev / count, 0.0) ** 0.5
# Demo: stream 1000 samples with mean 3 and standard deviation 2 through both summarizers.
summary_stats = summarize_input_stream(
    summarizers={'mean': mean_summarizer, 'std': std_summarizer},
    input_stream=(3 + 2 * np.random.randn() for _ in range(1000)),
)
print(summary_stats)
# Output varies between runs because the input is random, e.g.
# {'mean': 3.020903422847062, 'std': 1.943724669289156}
下面是一个保留 `summarize_input_stream` 签名的解决方案。它为每个摘要器启动一个线程,并通过各自独立的阻塞队列以增量方式向每个线程提供数据(链接到 Colab)。
import numpy as np
from typing import Iterable, Mapping, Callable, Any
from threading import Thread
from queue import Queue
from functools import partial
def summarize_input_stream(  # Run the input stream through multiple summarizers and collect results
    input_stream: Iterable[float],
    summarizers: Mapping[str, Callable[[Iterable[float]], float]]
) -> Mapping[str, float]:
    """Run every summarizer over a single pass of *input_stream* in O(1) memory.

    Each summarizer runs in its own thread and reads from its own bounded
    queue. The calling thread (the producer) pulls one item at a time from
    *input_stream* and pushes it into every queue.

    Design notes:
      * A worker keeps draining its queue after its summarizer returns or
        raises, so a summarizer that stops early can never block the producer.
      * Results are collected in a separate dict rather than pushed back
        through the input queue, so a leftover input item can never be
        mistaken for a result.
      * Exceptions from a summarizer are re-raised in the calling thread.

    Returns:
        A dict mapping each summarizer's name to its result, in the iteration
        order of *summarizers*.

    Raises:
        Any exception raised while iterating *input_stream*. Otherwise, the
        exception of the first failing summarizer, in mapping order.
    """
    end_of_stream = object()  # fresh marker object that ends each worker's iteration
    names = list(summarizers)
    # maxsize=1 keeps memory constant; a larger bound would mean fewer thread
    # switches at the cost of more buffered items.
    feeds = [Queue(maxsize=1) for _ in names]
    results = {}
    errors = {}

    def run_summarizer(name: str, summarizer: Callable[[Iterable[float]], float], feed: Queue) -> None:
        items = iter(feed.get, end_of_stream)  # each next() waits until the producer supplies an item
        try:
            results[name] = summarizer(items)
        except BaseException as exc:  # hand every failure to the calling thread
            errors[name] = exc
        finally:
            # Consume whatever the summarizer left unread so the producer's
            # put() on this bounded queue can never block forever.
            for _ in items:
                pass

    workers = [
        Thread(target=run_summarizer, args=(name, summarizer, feed), daemon=True)
        for (name, summarizer), feed in zip(summarizers.items(), feeds)
    ]
    for worker in workers:
        worker.start()
    try:
        for item in input_stream:
            for feed in feeds:
                feed.put(item)  # waits until that summarizer is ready for more
    finally:
        # Always release the workers, even if input_stream raised.
        for feed in feeds:
            feed.put(end_of_stream)
        for worker in workers:
            worker.join()
    for name in names:
        if name in errors:
            raise errors[name]
    return {name: results[name] for name in names}
def last(iterable: Iterable[Any]) -> Any:  # Just returns last element of iterable
    """Return the final element of *iterable*, consuming it with O(1) memory.

    Only the most recently seen element is kept. No comparisons are made
    between elements, so any element type works.

    Raises:
        ValueError: if *iterable* yields no elements.
    """
    missing = object()  # sentinel: distinguishes "no elements" from a legitimate None
    final = missing
    for final in iterable:
        pass
    if final is missing:
        raise ValueError("last() arg is an empty iterable")
    return final
def mean_summarizer(stream: Iterable[float]) -> float:  # Just computes mean online and returns final value
    """Return the arithmetic mean of *stream*, computed online in one pass.

    The running mean is updated incrementally for every element and handed to
    ``last``, which keeps only the final value. An empty *stream* therefore
    makes ``last`` raise ValueError.
    """
    def running_means():
        current = 0
        for index, value in enumerate(stream):
            # Same update as avg * i / (i + 1) + x / (i + 1).
            current = current * index / (index + 1) + value / (index + 1)
            yield current

    return last(running_means())
def std_summarizer(stream: Iterable[float]) -> float:  # Computes the population standard deviation online and returns final value
    """Return the population standard deviation of *stream* in a single pass.

    Uses Welford's online algorithm, which needs O(1) memory. The textbook
    ``E[x**2] - E[x]**2`` formula suffers catastrophic cancellation and can
    produce a slightly negative variance, which ``** 0.5`` turns into a
    complex number. This algorithm avoids that.

    Raises:
        ValueError: if *stream* is empty.
    """
    count = 0
    mean = 0.0
    sum_sq_dev = 0.0  # running sum of squared deviations from the mean (Welford's M2)
    for x in stream:
        count += 1
        delta = x - mean
        mean += delta / count
        sum_sq_dev += delta * (x - mean)  # uses both the old and the updated mean
    if count == 0:
        raise ValueError("std_summarizer() requires a non-empty stream")
    # Clamp guards against a tiny negative caused by rounding.
    return max(sum_sq_dev / count, 0.0) ** 0.5
# Demo: stream 1000 samples with mean 3 and standard deviation 2 through both
# summarizers, each running in its own thread.
summary_stats = summarize_input_stream(
    summarizers={'mean': mean_summarizer, 'std': std_summarizer},
    input_stream=(3 + 2 * np.random.randn() for _ in range(1000)),
)
print(summary_stats)
# Output varies between runs because the input is random, e.g.
# {'mean': 3.020903422847062, 'std': 1.943724669289156}
要么把它 `list` 化,要么使用 `itertools.tee`。如果其中一个由 `tee` 生成的迭代器在另一个迭代器取出任何元素之前就被完全消耗,那么 `tee` 在本质上与 `list` 化等效,因为它必须在内部存储全部数据。实现此目的的唯一方法是使用单个摘要器,只处理一次输入,同时计算所有相关的摘要。
如果不使用多线程,这个问题就没有解决方案。但是,只要对 `summarize_input_stream` 的签名稍作改动(这确实违反了我自己定下的规则),我们就可以在不付出(内存)代价的情况下得到结果。
这个技巧(我称之为“genfunctrification”,如果它还没有现成名字的话)是:把摘要器从 `Callable[[Iterable[float]], float]` 类型的函数,转变为 `Callable[[Iterable[float]], Iterable[Callable[[], float]]]` 类型的生成器函数。(请注意,我们本可以让它们直接产出结果,即 `Callable[[Iterable[float]], Iterable[float]]`。但那样每次迭代都要计算诸如 `(cumsum_of_sq/(i+1) - (cumsum/(i+1))**2)**.5` 之类的表达式,而我们实际上只在最后一次需要它。所以我们让迭代器 yield 可调用对象(`Callable`),只在需要时(即最后一次迭代之后)才计算结果。)
import numpy as np
from typing import Iterable, Mapping, Callable, Any, Sequence
import itertools
def summarize_input_stream(  # Run the input stream through multiple summarizers and collect results
    input_stream: Iterable[float],
    summarizers: Mapping[str, Callable[[Iterable[float]], Iterable[Callable[[], float]]]]
) -> Mapping[str, float]:
    """Drive generator-style summarizers in lockstep over one pass of *input_stream*.

    Each summarizer receives its own ``itertools.tee`` branch of the input and
    yields a zero-argument "result getter" per item. All summarizers are
    advanced together with ``zip``, so the tee buffers stay small (assuming
    every summarizer yields exactly once per input item). After the input is
    exhausted, only the last getter of each summarizer is called.

    Returns:
        A dict mapping each summarizer's name to its final result, in the
        iteration order of *summarizers*. An empty *summarizers* mapping gives
        an empty dict.

    Raises:
        ValueError: if *input_stream* is empty (there is no final getter to call).
    """
    if not summarizers:
        # Without this guard zip() below would be empty and look like an empty input.
        return {}
    stream_branches = itertools.tee(input_stream, len(summarizers))
    getter_streams = [
        summarizer(branch) for summarizer, branch in zip(summarizers.values(), stream_branches)
    ]
    final_getters = None
    for final_getters in zip(*getter_streams):
        pass  # advance every summarizer one step at a time; keep only the last tuple
    if final_getters is None:
        raise ValueError("summarize_input_stream() requires a non-empty input stream")
    return {name: get_result() for name, get_result in zip(summarizers, final_getters)}
def last(iterable: Iterable[Any]) -> Any:  # Just returns last element of iterable
    """Return the final element of *iterable*, consuming it with O(1) memory.

    Only the most recently seen element is kept. No comparisons are made
    between elements, so any element type works (including tuples of callables).

    Raises:
        ValueError: if *iterable* yields no elements.
    """
    missing = object()  # sentinel: distinguishes "no elements" from a legitimate None
    final = missing
    for final in iterable:
        pass
    if final is missing:
        raise ValueError("last() arg is an empty iterable")
    return final
def mean_summarizer(stream: Iterable[float]) -> Iterable[Callable[[], float]]:  # Just computes mean online and returns final value
    """Yield one zero-argument getter per element of *stream*.

    Each getter reads the running mean when it is *called*, not when it was
    yielded (late binding on ``running_mean``). Calling any getter after the
    stream is exhausted therefore gives the mean of the whole stream.
    """
    running_mean = 0
    for count, value in enumerate(stream, start=1):
        # Same update as avg * i / (i + 1) + x / (i + 1), with count == i + 1.
        running_mean = running_mean * (count - 1) / count + value / count
        yield lambda: running_mean
def std_summarizer(stream: Iterable[float]) -> Iterable[Callable[[], float]]:  # Computes the population std online; yields lazy result getters
    """Yield one zero-argument getter per element of *stream*.

    The state is updated with Welford's online algorithm (O(1) memory, no
    catastrophic cancellation). With the ``E[x**2] - E[x]**2`` formula, a
    tiny negative variance would make ``** 0.5`` return a complex number.

    Each getter computes the population standard deviation from the state at
    the moment it is *called* (late binding on ``count``/``sum_sq_dev``).
    Calling a getter after the stream is exhausted therefore gives the result
    for the whole stream, and the square root is only computed when needed.
    """
    count = 0
    mean = 0.0
    sum_sq_dev = 0.0  # running sum of squared deviations from the mean (Welford's M2)
    for x in stream:
        count += 1
        delta = x - mean
        mean += delta / count
        sum_sq_dev += delta * (x - mean)  # uses both the old and the updated mean
        # Clamp guards against a tiny negative caused by rounding.
        yield lambda: max(sum_sq_dev / count, 0.0) ** 0.5
# Demo: stream 1000 samples with mean 3 and standard deviation 2 through both
# generator-style summarizers, advanced in lockstep over a single pass.
summary_stats = summarize_input_stream(
    summarizers={'mean': mean_summarizer, 'std': std_summarizer},
    input_stream=(3 + 2 * np.random.randn() for _ in range(1000)),
)
print(summary_stats)
# Output varies between runs because the input is random, e.g.
# {'mean': 3.020903422847062, 'std': 1.943724669289156}
dict