我需要使用布隆过滤器的最大可能尺寸。我使用“from pybloom_live import BloomFilter”。可以创建一个占 RAM 49% 的布隆过滤器(要创建它,您需要相同数量的内存来保存它)。然后我做了几个周期来检查布隆过滤器中的数据。我启动了它,脚本按照我的需要工作,但问题是该脚本只使用了 10% 的处理器资源。 也就是说,我需要脚本加载布隆过滤器 1 次并将其分发到线程。我只需要检查是否有值。但任何这样做的尝试都会导致内存溢出。事实上,线程创建布隆过滤器的副本,也就是说,所有可用 RAM 都进入第二个线程。在 Python 上是否可以为多个线程使用布隆过滤器的一份副本? 或者也许有一种方法可以在运行脚本的一个窗口中使用更多 CPU 资源? 以下是我设法做到的最大值,但即使在这里,2 个线程也不会启动。
def load_bloom_filter(file_path):
try:
bloom_filter = BloomFilter.fromfile(open(file_path, 'rb'))
if bloom_filter is None:
print("[-] error load")
else:
print(f"[+] load successful {file_path}, size: {len(bloom_filter)}")
return bloom_filter
except Exception as e:
print(f"[-] error: {e}")
return None
def main(bloom_filter, start, end):
current_process_ = multiprocessing.current_process()
current_process = str(current_process_.name)
print("[+] ", current_process, "main bloom size:", len(bloom_filter.value))
for i, value in enumerate(range(start, end), start=start):
calculated_value_txt = some_calc(value)
if calculated_value_txt in bloom_filter.value:
print('found', calculated_value_txt)
else:
print('not found', calculated_value_txt)
def process_task(bloom_filter, start, end):
main(bloom_filter, start, end)
if __name__ == "__main__":
loaded_bloom_filter = load_bloom_filter(input_bloom_file_path1)
manager = multiprocessing.Manager()
bloom_filter = manager.Value(BloomFilter, loaded_bloom_filter)
task_params = [
(bloom_filter, start1, end1),
(bloom_filter, start2, end2),
]
with multiprocessing.Pool() as pool:
pool.starmap(process_task, task_params)
UDP: 我想再次解释一下我的任务。我有很多数据可以检查其中的值。检查值的速度非常重要,所以我不能使用数据库。一个好的解决方案是创建一个占用空间少几倍的布隆过滤器。我的代码将此布隆过滤器加载到 RAM 中,并在循环中检查是否存在值。但问题是这个进程并没有使用所有的CPU资源。我想以某种方式提高处理速度。我想过使用多处理,但是出现了一个新问题,第二个线程创建了布隆过滤器的副本,它占用了 RAM 的 49%,并且程序终止。 我需要任何能够将处理器负载提高 80% 而不是现在的 10% 的解决方案。
用这个
https://github.com/prashnts/pybloomfiltermmap3
它使用 mmap 文件,这意味着您可以对相同的布隆过滤器运行并行查询,而无需为每个查询单独将其重新加载到内存中。
只是仅供参考,尽管使用大型布隆过滤器,真正的问题不是计算限制,而是内存访问限制