I'm using pymongo with multiprocessing. I use 6 processes to compute some results and insert them into MongoDB.

import multiprocessing
import sys
import pymongo
import datetime
from multiprocessing import Pool


def query_records_by_date(ticker, start_of_day):
    """
    This function looks up records from MongoDB using the parameters provided and returns a list
    """
    mongo = pymongo.MongoClient("mongodb://localhost:27017/")
    ...


def get_all_post_dates(ticker):
    """
    This function uses aggregations to get something from MongoDB and returns a list of datetime.datetime objects
    """
    mongo = pymongo.MongoClient("mongodb://localhost:27017/")
    ...


def process_ticker_attention(ticker, finish_num, lock):
    mongo_p = pymongo.MongoClient("mongodb://localhost:27017/", connectTimeoutMS=600000)
    try:
        ticker_dates = get_all_post_dates(ticker)
        for date in ticker_dates:
            data = query_records_by_date(ticker, date)
            mongo_p['stock']['attention'].update_one({'date': date}, {
                '$set': {
                    'post_numbers': {
                        ticker: len(data)
                    }
                }
            })
        lock.acquire()
        finish_num.value += 1
        sys.stdout.write(f'\rfinish_count: {finish_num.value}')
        sys.stdout.flush()
        lock.release()
    except Exception as e:
        print('error!', e.__class__.__name__, e)


if __name__ == '__main__':
    mongo = pymongo.MongoClient("mongodb://localhost:27017/")
    database_name = 'StockForum'
    lock = multiprocessing.Lock()
    finish_num = multiprocessing.Manager().Value('i', 0)
    tickers = mongo[database_name].list_collection_names()
    p = Pool(6)
    for t in tickers:
        p.apply_async(process_ticker_attention, args=(t, finish_num, lock))
    p.close()
    p.join()

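I made sure that each child process creates its own mongo_client (as shown in the snippet).
The ticker_dates list in process_ticker_attention typically has thousands of elements.
The code runs fine at first, but after about 20-30 seconds it throws this error: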
error! AutoReconnect localhost:27017: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted (configured timeouts: connectTimeoutMS: 20000.0ms)
How can I fix this?

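I finally found a solution; hopefully it helps someone.
The problem is that get_all_post_dates and query_records_by_date each create a new MongoClient on every call, and process_ticker_attention calls query_records_by_date once per date, which is thousands of times per ticker. Every new client opens its own sockets, and on Windows this most likely exhausts the available local ports, which is what WinError 10048 reports. The error message also shows connectTimeoutMS: 20000.0ms, PyMongo's default, rather than the 600000 set on mongo_p, which suggests the failing connection came from one of the clients created inside the helpers. The fix is to share a single mongo_client within each child process (not across processes):
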
def init_process(ticker, finish_num, lock):
    """Initialize a MongoDB connection for each worker process."""
    global mongo_p
    mongo_p = pymongo.MongoClient("mongodb://localhost:27017/", connectTimeoutMS=600000)
    process_ticker_attention(ticker, finish_num, lock)
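
For anyone who wants the client created exactly once per worker process rather than once per task, here is a minimal sketch of the same idea using the initializer argument of multiprocessing.Pool. It is not the code I actually ran: init_worker, get_client, process_ticker and FakeClient are hypothetical names, and FakeClient only stands in for pymongo.MongoClient so the sketch runs without a MongoDB server.

```python
import multiprocessing
import os

# Per-process client handle. Every worker process has its own copy of this
# module-level variable, so the client is never shared across processes.
_client = None


def init_worker(client_factory):
    """Pool initializer: runs once in each worker process and builds its client."""
    global _client
    _client = client_factory()


def get_client():
    """Return the client that init_worker() created for the current process."""
    if _client is None:
        raise RuntimeError("init_worker() has not run in this process")
    return _client


class FakeClient:
    """Hypothetical stand-in for pymongo.MongoClient (assumption, demo only)."""
    instances_created = 0

    def __init__(self):
        FakeClient.instances_created += 1


def process_ticker(ticker, lookups=1000):
    """Simulate many helper calls that all reuse the process-wide client."""
    distinct_clients = {id(get_client()) for _ in range(lookups)}
    return ticker, os.getpid(), len(distinct_clients)


def main():
    tickers = ["AAA", "BBB", "CCC", "DDD"]  # made-up ticker names
    with multiprocessing.Pool(2, initializer=init_worker,
                              initargs=(FakeClient,)) as pool:
        for ticker, pid, distinct in pool.map(process_ticker, tickers):
            print(ticker, pid, distinct)
```

To try it, call main() under an if __name__ == '__main__': guard. With real data, the factory could be something like functools.partial(pymongo.MongoClient, "mongodb://localhost:27017/"), and query_records_by_date and get_all_post_dates would call get_client() instead of constructing their own MongoClient.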