我对多处理不是很有经验,但今天我尝试制作一个小型 http+api 服务器,当我注意到我必须多次按登录按钮才能进入我的帐户时,我尝试在没有 http.server 的情况下重新创建问题,但我不能,所以这是我的代码的最小版本。 跳过的项目在队列中累加,但 get() 忽略它们,我尝试将其与 block=False 一起使用,但得到相同的结果。
from multiprocessing import Queue
import time
from uuid import uuid4
import threading
import requests
from http.server import SimpleHTTPRequestHandler
import socketserver
#Simulate request
def test():
while input()!='q':
try:requests.get('http://127.0.0.1:55467')
except:pass
class database:
def __init__(self):
self.queue=Queue()
self.elements={}
threading.Thread(target=self.db_updater,daemon=True).start()
#The function called in api calls
def create(self,name,data):self.queue.put([name,data])
#The loop that is supposed to update the database
def db_updater(self):
def create_element(element_name,data):self.elements[element_name]=data
while True:
change=self.queue.get()
print('update received',change,'\nqueue length',self.queue.qsize())
create_element(*change)
if __name__ == '__main__':
db=database()
threading.Thread(target=test,daemon=True).start()
class MyHttpRequestHandler(SimpleHTTPRequestHandler):
def log_message(*args):pass
def do_POST(self):pass
def do_GET(self):
print('request received')
db.create('testuser',{'token':str(uuid4()),'expire':int(time.time())+7200})
self.send_response(200)
self.end_headers()
with socketserver.ForkingTCPServer(("127.0.0.1",55467),MyHttpRequestHandler) as httpd:httpd.serve_forever()
输出如下所示:
request received
update received ['testuser', {'token': '86520f1e-524f-4587-8e4f-b826a0b14012', 'expire': 1712054294}]
queue length 1
request received
request received
update received ['testuser', {'token': 'a1c2a5e6-9390-4f31-a25a-df5e36f184ee', 'expire': 1712054295}]
queue length 2
request received
update received ['testuser', {'token': '08eb6fb4-5f4e-4df3-af48-28e854249c3e', 'expire': 1712054295}]
queue length 2
request received
update received ['testuser', {'token': 'dc3128ac-98d1-409b-9ca3-8c32f311b180', 'expire': 1712054295}]
queue length 2
request received
update received ['testuser', {'token': '55cc39a9-74cd-4e8c-b1fd-8db11e0b07c1', 'expire': 1712054296}]
queue length 2
request received
request received
update received ['testuser', {'token': 'a9779e1c-4a8e-45ee-8223-6022e94f67be', 'expire': 1712054296}]
queue length 3
request received
update received ['testuser', {'token': '9f8e2ab4-3c2e-4c90-ae94-be0e946e1ca6', 'expire': 1712054296}]
queue length 3
request received
request received
update received ['testuser', {'token': '6a0835c5-8568-4e53-bb9c-ec8ac9f134c0', 'expire': 1712054297}]
queue length 4
你刚刚在 Cpython 中发现了一个发现......,连续分叉破坏了队列上的互斥锁......你可以使用 Manager.Queue 来代替吗?它速度较慢,但没有这个问题,否则你需要避免使用
ForkingServer
。
from multiprocessing import Manager
...
self.manager = Manager()
self.queue = self.manager.Queue()