以类似 API 流的方式从另一个 IO 进程访问多进程中的变量

问题描述 投票:0回答:0

我有三个并行运行的 scrappers,以从动态变化的内容中获取更新。爬虫必须更新(或附加)相同的共享变量。与此同时,我有一个来自 API 的更新流进入一个 IO 函数。我也在寻找一种以 IO 方式处理抓取器的方法,但是当新输入从 API 流到达时,将优先级交还给处理 API 流。

我遇到过很多关于 mulriprocessing 和 asyncio 的帖子、博客和答案,但我从来没有能够以一种相当优雅/有效的方式将它们结合起来。

简而言之,我想寻找一种全时处理爬虫输出的方法,在 API 发送更新时中断,处理更新,然后恢复该过程。这是我卡住的地方(删除不必要的东西后):

import random
import time
import multiprocessing as mp

def mp_scraper(nb, shared_str):
    # Here is the code scraping the dynamic content
    time.time(3)
    r = random.random()
    # Update the variable to process
    shared_str.value += ":::"str(nb + round(r, 2))
    # ":::" is to help parsing later.
    # I guess this could be replaced by a queue, somehow. 
 

# I am stuck here
def start_processing(shared_str):

    # Listen for updates from API
    async def handle_api_updates(data):
        # Process API update

    global api_stream
    api_stream = APIstream(api_key)
    api_stream.subscribe(handle_api_updates)  # as a callback

   # Here is where I want to update the output from scrapers
   async def handle_scraping(shared_str)
        # Process scrapers updates

if __name__ == '__main__':
    # Create manager to share string and pool
    manager = mp.Manager()
    # Create shared string among workers
    shared_str = manager.Value(ctypes.c_char_p, "")
    # Pool for workers
    pool = mp.Pool(4) 

    """Start multiprocessing"""
    pool.apply_async(mp_scraper, args=(1, shared_str))
    time.sleep(2)
    pool.apply_async(mp_scraper, args=(2, shared_str))
    time.sleep(2)
    pool.apply_async(mp_scraper, args=(3, shared_str))
    time.sleep(2)
    pool.apply_async(start_processing, args=(shared_str,))

    pool.close()
    pool.join()

抓取器发送更新的速度可能比处理程序处理更新的速度快,这就是为什么我需要以某种方式存储它们(这里是共享字符串,但我怀疑队列可以做得更好)。

python asynchronous web-scraping multiprocessing
© www.soinside.com 2019 - 2024. All rights reserved.