My script executes ~20000 async requests and then processes the responses. I want to speed up the response-processing part by using multiprocessing. The data from the responses is saved into about 3000 JSON files, and that number keeps growing (a new file is started when data belongs to an existing category but that category's current file has already grown too large). I tried to solve the concurrency problem by creating a server-process manager object, a dict whose keys are file paths and whose values are True by default. In handle_results() I check the state stored for a file path to decide whether some other process has already started using that file, but when two processes hit the same file at the same time, this happens:
[2023-02-21 21:26:06,868]:[INFO]:Process i: 9 | locking: product_data/Connectors, Interconnects;Card Edge Connectors;Edgeboard Connectors_3.json | state: False
[2023-02-21 21:26:06,868]:[INFO]:Process i: 8 | locking: product_data/Connectors, Interconnects;Card Edge Connectors;Edgeboard Connectors_3.json | state: False
In other words, different processes modify the same object at the same time, and each of them believes it is alone while doing so. As far as I know, this is called a critical section. I am writing here because I was fairly sure that using a Manager is one way to deal with it. Here is the part of the code that uses multiprocessing:
def handle_results(
        process_index: int,
        queue: mp.Queue,
        files_state: dict,
        results: list,
        urls: list[str]
) -> None:
    collected_urls = set()
    for url, result in zip(urls, results):
        if isinstance(result, str):
            try:
                page_data = collect_page_data(result, url)
                is_new_file, filepath = get_filepath(page_data['attributes']['Category'], FOLDER_PRODUCT_DATA)
                # a freshly created file is registered as "free"
                if is_new_file and files_state.get(str(filepath)) is None:
                    files_state[str(filepath)] = True
                while True:
                    # wait until the file is marked free...
                    if not files_state[str(filepath)]:
                        logging.info(f'Process i: {process_index} | {str(filepath)} ALREADY LOCKED! |'
                                     f' state: {files_state[str(filepath)]}')
                        time.sleep(0.1)
                        continue
                    # ...then claim it, write, and mark it free again
                    files_state[str(filepath)] = False
                    logging.info(f'Process i: {process_index} | locking: {str(filepath)} |'
                                 f' state: {files_state[str(filepath)]}')
                    append_to_json(filepath, page_data, json_type=dict)
                    files_state[str(filepath)] = True
                    logging.info(f'Process i: {process_index} | unlocking: {str(filepath)} |'
                                 f' state: {files_state[str(filepath)]}')
                    break
                collected_urls.add(url)
            except RequestDenied:
                pass
            except Exception as ex:
                logging.error(ex, exc_info=True)
    queue.put(collected_urls)

async def multihandling_results(
        results: tuple,
        urls: list[str],
        count_processes: int = mp.cpu_count()
) -> set[str]:
    """ Handle results with multiprocessing """
    ### probably not interesting part
    filtered_results = []
    filtered_urls = []
    for res, url in zip(results, urls):
        if isinstance(res, str):
            filtered_results.append(res)
            filtered_urls.append(url)
    if not filtered_results:
        return set()
    elif len(filtered_results) <= count_processes:
        max_to_task = count_processes
    else:
        max_to_task = len(filtered_results) // count_processes
    divided_results = [
        [filtered_results[i:i+max_to_task], filtered_urls[i:i+max_to_task]]
        for i in range(0, len(filtered_results), max_to_task)
    ]
    ### probably not interesting part
    queue = mp.Queue()
    processes = []
    with mp.Manager() as manager:
        files_state = manager.dict()
        for filepath in Path(FOLDER_PRODUCT_DATA).iterdir():
            files_state[str(filepath)] = True
        for process_index, process_args in enumerate(divided_results):
            process = mp.Process(
                target=handle_results,
                args=[process_index, queue, files_state] + process_args
            )
            process.start()
            processes.append(process)
        for process in processes:
            process.join()
    collected_urls = set()
    while not queue.empty():
        collected_urls.update(queue.get())
    return collected_urls
But I am still not sure about this... Does this approach make my goal unreachable? Any advice on how to solve this problem would be appreciated.
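In case it helps to show the direction I was considering: below is a minimal, standalone sketch (the names worker, demo.json and payload are made up for the example, they are not from my real code) of replacing the boolean flags with a manager.Lock(), so that waiting for the file and claiming it happen in a single atomic acquire instead of a check followed by a set. For simplicity it uses one coarse lock shared by all processes; I am not sure whether this is the right direction, or how to extend it safely to one lock per file when new files appear at runtime, which is really what I am asking.

import json
import multiprocessing as mp
from pathlib import Path

def worker(process_index: int, write_lock, filepath: Path, payload: dict) -> None:
    # acquiring the lock blocks until no other process holds it,
    # so there is no gap between "check the state" and "set the state"
    with write_lock:
        data = json.loads(filepath.read_text()) if filepath.exists() else []
        data.append(payload)
        filepath.write_text(json.dumps(data))

if __name__ == '__main__':
    filepath = Path('demo.json')  # stands in for one of the product_data files
    with mp.Manager() as manager:
        write_lock = manager.Lock()  # one shared lock instead of the files_state flags
        processes = [
            mp.Process(target=worker, args=(i, write_lock, filepath, {'i': i}))
            for i in range(4)
        ]
        for p in processes:
            p.start()
        for p in processes:
            p.join()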