多处理队列与分叉的 http.server 一起使用时会随机跳过项目

问题描述 投票:0回答:1

我对多处理不是很有经验,但今天我尝试制作一个小型 http+api 服务器,当我注意到我必须多次按登录按钮才能进入我的帐户时,我尝试在没有 http.server 的情况下重新创建问题,但我不能,所以这是我的代码的最小版本。 跳过的项目在队列中累加,但 get() 忽略它们,我尝试将其与 block=False 一起使用,但得到相同的结果。

from multiprocessing import Queue
import time
from uuid import uuid4
import threading
import requests
from http.server import SimpleHTTPRequestHandler
import socketserver
#Simulate request
def test():
    while input()!='q':
        try:requests.get('http://127.0.0.1:55467')
        except:pass
class database:
    def __init__(self):
        self.queue=Queue()
        self.elements={}
        threading.Thread(target=self.db_updater,daemon=True).start()
    #The function called in api calls
    def create(self,name,data):self.queue.put([name,data])
    #The loop that is supposed to update the database
    def db_updater(self):
        def create_element(element_name,data):self.elements[element_name]=data
        while True:
            change=self.queue.get()
            print('update received',change,'\nqueue length',self.queue.qsize())
            create_element(*change)
if __name__ == '__main__':
    db=database()
    threading.Thread(target=test,daemon=True).start()
    class MyHttpRequestHandler(SimpleHTTPRequestHandler):
        def log_message(*args):pass
        def do_POST(self):pass
        def do_GET(self):
            print('request received')
            db.create('testuser',{'token':str(uuid4()),'expire':int(time.time())+7200})
            self.send_response(200)
            self.end_headers()
    with socketserver.ForkingTCPServer(("127.0.0.1",55467),MyHttpRequestHandler) as httpd:httpd.serve_forever()

输出如下所示:

request received
update received ['testuser', {'token': '86520f1e-524f-4587-8e4f-b826a0b14012', 'expire': 1712054294}] 
queue length 1

request received

request received
update received ['testuser', {'token': 'a1c2a5e6-9390-4f31-a25a-df5e36f184ee', 'expire': 1712054295}] 
queue length 2

request received
update received ['testuser', {'token': '08eb6fb4-5f4e-4df3-af48-28e854249c3e', 'expire': 1712054295}] 
queue length 2

request received
update received ['testuser', {'token': 'dc3128ac-98d1-409b-9ca3-8c32f311b180', 'expire': 1712054295}] 
queue length 2

request received
update received ['testuser', {'token': '55cc39a9-74cd-4e8c-b1fd-8db11e0b07c1', 'expire': 1712054296}] 
queue length 2

request received

request received
update received ['testuser', {'token': 'a9779e1c-4a8e-45ee-8223-6022e94f67be', 'expire': 1712054296}] 
queue length 3

request received
update received ['testuser', {'token': '9f8e2ab4-3c2e-4c90-ae94-be0e946e1ca6', 'expire': 1712054296}] 
queue length 3

request received

request received
update received ['testuser', {'token': '6a0835c5-8568-4e53-bb9c-ec8ac9f134c0', 'expire': 1712054297}] 
queue length 4
python multiprocessing http.server
1个回答
0
投票

你刚刚在 Cpython 中发现了一个发现......,连续分叉破坏了队列上的互斥锁......你可以使用 Manager.Queue 来代替吗?它速度较慢,但没有这个问题,否则你需要避免使用

ForkingServer

from multiprocessing import Manager

...

self.manager = Manager()
self.queue = self.manager.Queue()
© www.soinside.com 2019 - 2024. All rights reserved.