我有三个并行运行的 scrappers,以从动态变化的内容中获取更新。爬虫必须更新(或附加)相同的共享变量。与此同时,我有一个来自 API 的更新流进入一个 IO 函数。我也在寻找一种以 IO 方式处理抓取器的方法,但是当新输入从 API 流到达时,将优先级交还给处理 API 流。
我遇到过很多关于 mulriprocessing 和 asyncio 的帖子、博客和答案,但我从来没有能够以一种相当优雅/有效的方式将它们结合起来。
简而言之,我想寻找一种全时处理爬虫输出的方法,在 API 发送更新时中断,处理更新,然后恢复该过程。这是我卡住的地方(删除不必要的东西后):
import random
import time
import multiprocessing as mp
def mp_scraper(nb, shared_str):
# Here is the code scraping the dynamic content
time.time(3)
r = random.random()
# Update the variable to process
shared_str.value += ":::"str(nb + round(r, 2))
# ":::" is to help parsing later.
# I guess this could be replaced by a queue, somehow.
# I am stuck here
def start_processing(shared_str):
# Listen for updates from API
async def handle_api_updates(data):
# Process API update
global api_stream
api_stream = APIstream(api_key)
api_stream.subscribe(handle_api_updates) # as a callback
# Here is where I want to update the output from scrapers
async def handle_scraping(shared_str)
# Process scrapers updates
if __name__ == '__main__':
# Create manager to share string and pool
manager = mp.Manager()
# Create shared string among workers
shared_str = manager.Value(ctypes.c_char_p, "")
# Pool for workers
pool = mp.Pool(4)
"""Start multiprocessing"""
pool.apply_async(mp_scraper, args=(1, shared_str))
time.sleep(2)
pool.apply_async(mp_scraper, args=(2, shared_str))
time.sleep(2)
pool.apply_async(mp_scraper, args=(3, shared_str))
time.sleep(2)
pool.apply_async(start_processing, args=(shared_str,))
pool.close()
pool.join()
抓取器发送更新的速度可能比处理程序处理更新的速度快,这就是为什么我需要以某种方式存储它们(这里是共享字符串,但我怀疑队列可以做得更好)。