Python: How do I send an iterator to two different consumers without loading the whole thing into memory?

Question (votes: 0, answers: 3)

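I have an iterator that is consumed by two functions (mean_summarizer and std_summarizer in the example below). I want both functions to process the iterator without the whole iterator ever being loaded into memory at once.

Below is a minimal example (also on Colab) that gives the correct result, except that it loads the entire input into memory at once. There is no need to understand the fancy code inside mean_summarizer, std_summarizer and last; it is written that way mainly for brevity.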
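The question is: what is the cleanest way to reimplement summarize_input_stream, without changing any function signatures (only the internals), so that its memory usage does not scale with the length of the input stream?

I have a feeling that coroutines are involved, but I don't know how to use them.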

    import numpy as np
    from typing import Iterable, Mapping, Callable, Any


    def summarize_input_stream(  # Run the input stream through multiple summarizers and collect results
            input_stream: Iterable[float],
            summarizers: Mapping[str, Callable[[Iterable[float]], float]]
    ) -> Mapping[str, float]:
        inputs = list(input_stream)  # PROBLEM IS HERE <-- We load entire stream into memory at once
        return {name: summarizer(inputs) for name, summarizer in summarizers.items()}


    def last(iterable: Iterable[Any]) -> Any:  # Just returns last element of iterable
        return max(enumerate(iterable))[1]


    def mean_summarizer(stream: Iterable[float]) -> float:  # Just computes mean online and returns final value
        return last(avg for avg in [0] for i, x in enumerate(stream) for avg in [avg*i/(i+1) + x/(i+1)])


    def std_summarizer(stream: Iterable[float]) -> float:  # Just computes standard deviation online and returns final value
        return last(cumsum_of_sq/(i+1) - (cumsum/(i+1))**2
                    for cumsum_of_sq, cumsum in [(0, 0)]
                    for i, x in enumerate(stream)
                    for cumsum_of_sq, cumsum in [(cumsum_of_sq+x**2, cumsum+x)])**.5


    summary_stats = summarize_input_stream(
        input_stream=(np.random.randn()*2+3 for _ in range(1000)),
        summarizers={'mean': mean_summarizer, 'std': std_summarizer}
    )
    print(summary_stats)
    # e.g. {'mean': 3.020903422847062, 'std': 1.943724669289156}


python iterator coroutine
3 Answers

2 votes
A solution that keeps the signature of summarize_input_stream unchanged. It starts one thread per summarizer and feeds each thread incrementally through its own blocking queue (link to Colab).

    import numpy as np
    from typing import Iterable, Mapping, Callable, Any
    from threading import Thread
    from queue import Queue
    from functools import partial


    def summarize_input_stream(  # Run the input stream through multiple summarizers and collect results
            input_stream: Iterable[float],
            summarizers: Mapping[str, Callable[[Iterable[float]], float]]
    ) -> Mapping[str, float]:
        POISON_PILL = object()

        def run_summarizer(summarizer: Callable[[Iterable[float]], float], queue: Queue) -> None:
            result = summarizer(iter(queue.get, POISON_PILL))  # Waits until the food is ready to eat
            queue.put(result)  # Use the queue the other way around to return the result

        # Note: we could probably be more time-efficient if we increased maxsize, which should
        # cause less thread switching at the cost of more memory usage
        queues = [Queue(maxsize=1) for _ in summarizers]
        threads = [Thread(target=partial(run_summarizer, summarizer, queue))
                   for summarizer, queue in zip(summarizers.values(), queues)]
        for t in threads:
            t.start()
        for inp in input_stream:
            for queue in queues:
                queue.put(inp)  # Waits until the summarizer is hungry to feed it
        for queue in queues:
            queue.put(POISON_PILL)  # Stop the iteration
        for t in threads:
            t.join()
        results = [queue.get() for queue in queues]
        return {name: result for name, result in zip(summarizers, results)}


    def last(iterable: Iterable[Any]) -> Any:  # Just returns last element of iterable
        return max(enumerate(iterable))[1]


    def mean_summarizer(stream: Iterable[float]) -> float:  # Just computes mean online and returns final value
        return last(avg for avg in [0] for i, x in enumerate(stream) for avg in [avg * i / (i + 1) + x / (i + 1)])


    def std_summarizer(stream: Iterable[float]) -> float:  # Just computes standard deviation online and returns final value
        return last(cumsum_of_sq / (i + 1) - (cumsum / (i + 1)) ** 2
                    for cumsum_of_sq, cumsum in [(0, 0)]
                    for i, x in enumerate(stream)
                    for cumsum_of_sq, cumsum in [(cumsum_of_sq + x ** 2, cumsum + x)]) ** .5


    summary_stats = summarize_input_stream(
        input_stream=(np.random.randn() * 2 + 3 for _ in range(1000)),
        summarizers={'mean': mean_summarizer, 'std': std_summarizer}
    )
    print(summary_stats)
    # e.g. {'mean': 3.020903422847062, 'std': 1.943724669289156}
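The piece of this answer that does the real work is the two-argument form of iter, which turns repeated calls to queue.get into an ordinary iterable that ends when a sentinel arrives. The following is a minimal sketch of that one mechanism in isolation; SENTINEL, consume and the three sample values are made up for illustration and are not part of the answer.

```python
from queue import Queue
from threading import Thread

SENTINEL = object()  # hypothetical end-of-stream marker, same role as POISON_PILL


def consume(queue: Queue, out: list) -> None:
    # iter(callable, sentinel) calls queue.get() repeatedly and stops
    # as soon as the returned item equals SENTINEL.
    out.append(sum(iter(queue.get, SENTINEL)))


queue = Queue(maxsize=1)  # at most one pending item, so memory stays bounded
out = []
worker = Thread(target=consume, args=(queue, out))
worker.start()
for value in (1.0, 2.0, 3.0):
    queue.put(value)  # blocks while the queue is still full
queue.put(SENTINEL)   # tells the consumer that the stream has ended
worker.join()
total = out[0]
print(total)
```

Because the queue holds at most one item, the producer can never run far ahead of the consumer, which is what keeps memory use independent of the stream length.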



1 vote
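With the signatures as given, you have to either turn the input into a list, or use itertools.tee (which, if one of the tee-d iterators is fully consumed before the other has pulled any items, is morally equivalent: it has to store all of the data internally).

The only way to achieve this is to use a single summarizer that processes the input once and computes all of the relevant summaries at the same time.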
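As a rough illustration of the single-summarizer idea, the sketch below computes the mean and the population standard deviation in one pass with three running totals. The function name summarize_in_one_pass and the sample input are assumptions made for this example; the formula is the same sum-of-squares one used by std_summarizer in the question.

```python
from typing import Iterable, Mapping


def summarize_in_one_pass(stream: Iterable[float]) -> Mapping[str, float]:
    # Hypothetical helper: keeps three running totals, so memory use is constant.
    count = 0
    total = 0.0
    total_sq = 0.0
    for x in stream:
        count += 1
        total += x
        total_sq += x * x
    if count == 0:
        raise ValueError("cannot summarize an empty stream")
    mean = total / count
    # Population variance, E[x^2] - E[x]^2; clamp tiny negative rounding errors.
    variance = max(total_sq / count - mean ** 2, 0.0)
    return {'mean': mean, 'std': variance ** 0.5}


stats = summarize_in_one_pass(float(x) for x in range(1, 6))
print(stats)
```

The trade-off is that the summaries are no longer independent, pluggable functions: every new statistic has to be added to the same loop.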


0 votes
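As stated, the problem has no solution that does not involve multithreading. However, with a slight change to the signature of summarize_input_stream (which does violate the rule I set myself), we can get the result without paying the (memory) price.

The trick (which I am calling "genfunctrification", unless it already has a name) is: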

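  • We turn the summarizers from functions of type
    Callable[[Iterable[float]], float]
    into generator functions of type
    Callable[[Iterable[float]], Iterable[Callable[[], float]]]

  • Note that we could have them yield the results directly, i.e. use the type
    Callable[[Iterable[float]], Iterable[float]]
    but that would mean wastefully recomputing things like
    (cumsum_of_sq/(i+1) - (cumsum/(i+1))**2)**.5
    on every iteration, when we only need the value once, at the very end. So we have the iterators yield Callables instead, which compute the result only when it is needed (after the last iteration).
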
Modified code (and Colab link):

    import numpy as np
    from typing import Iterable, Mapping, Callable, Any, Sequence
    import itertools


    def summarize_input_stream(  # Run the input stream through multiple summarizers and collect results
            input_stream: Iterable[float],
            summarizers: Mapping[str, Callable[[Iterable[float]], Iterable[Callable[[], float]]]]
    ) -> Mapping[str, float]:
        input_streams_teed = itertools.tee(input_stream, len(summarizers))
        result_getter_streams: Sequence[Iterable[Callable[[], float]]] = [
            summarizer(stream_copy)
            for summarizer, stream_copy in zip(summarizers.values(), input_streams_teed)
        ]
        final_results = [f() for f in last(zip(*result_getter_streams))]
        return {name: r for name, r in zip(summarizers, final_results)}


    def last(iterable: Iterable[Any]) -> Any:  # Just returns last element of iterable
        return max(enumerate(iterable))[1]


    def mean_summarizer(stream: Iterable[float]) -> Iterable[Callable[[], float]]:  # Computes mean online, yields getters for the current value
        return ((lambda: avg) for avg in [0] for i, x in enumerate(stream) for avg in [avg*i/(i+1) + x/(i+1)])


    def std_summarizer(stream: Iterable[float]) -> Iterable[Callable[[], float]]:  # Computes standard deviation online, yields getters for the current value
        return ((lambda: (cumsum_of_sq/(i+1) - (cumsum/(i+1))**2)**.5)
                for cumsum_of_sq, cumsum in [(0, 0)]
                for i, x in enumerate(stream)
                for cumsum_of_sq, cumsum in [(cumsum_of_sq+x**2, cumsum+x)])


    summary_stats = summarize_input_stream(
        input_stream=(np.random.randn()*2+3 for _ in range(1000)),
        summarizers={'mean': mean_summarizer, 'std': std_summarizer}
    )
    print(summary_stats)
    # e.g. {'mean': 3.020903422847062, 'std': 1.943724669289156}

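Note: because this particular implementation relies on dict preserving insertion order, it only works reliably in Python 3.7+, although it could easily be backported if needed.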
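For readers who find the nested comprehensions hard to follow, the same idea can be written with ordinary generator functions. This is a minimal sketch rather than the answer's code: the names mean_getters and std_getters are made up for illustration, and the input is a small fixed range instead of random numbers.

```python
import itertools
from typing import Callable, Iterable, Iterator


def mean_getters(stream: Iterable[float]) -> Iterator[Callable[[], float]]:
    # Hypothetical un-golfed counterpart of mean_summarizer.
    total, count = 0.0, 0
    for x in stream:
        total += x
        count += 1
        # Yield a cheap closure; the division only runs if someone calls it.
        yield lambda: total / count


def std_getters(stream: Iterable[float]) -> Iterator[Callable[[], float]]:
    # Hypothetical un-golfed counterpart of std_summarizer.
    total, total_sq, count = 0.0, 0.0, 0
    for x in stream:
        total += x
        total_sq += x * x
        count += 1
        yield lambda: (total_sq / count - (total / count) ** 2) ** 0.5


data = (float(x) for x in range(1, 6))
copy_a, copy_b = itertools.tee(data, 2)
getters = None
# zip advances both generators in lockstep, so tee never has to buffer
# more than one pending item for the slower copy.
for getters in zip(mean_getters(copy_a), std_getters(copy_b)):
    pass  # keep only the most recent pair of result getters
mean, std = (get() for get in getters)
print(mean, std)
```

The closures read the generators' local variables at call time, so calling the last pair after the loop returns the statistics for the whole stream.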
    
        