pymongo throws WinError 10048 when used with multiprocessing


I am using pymongo with multiprocessing. I use 6 processes to compute some values and insert the results into MongoDB.

import multiprocessing
import sys
import pymongo
import datetime

from multiprocessing import Pool


def query_records_by_date(ticker, start_of_day):
    """
    This function looks up records from MongoDB using parameters provided and return a list
    """
    mongo = pymongo.MongoClient("mongodb://localhost:27017/")
    ...


def get_all_post_dates(ticker):
    """
    This function uses aggregations to get something from MongoDB and return a list of datetime.datetime objects
    """
    mongo = pymongo.MongoClient("mongodb://localhost:27017/")
    ...


def process_ticker_attention(ticker, finish_num, lock):
    mongo_p = pymongo.MongoClient("mongodb://localhost:27017/", connectTimeoutMS=600000)
    try:
        ticker_dates = get_all_post_dates(ticker)
        for date in ticker_dates:
            data = query_records_by_date(ticker, date)
            mongo_p['stock']['attention'].update_one({'date': date}, {
                '$set': {
                    'post_numbers': {
                        ticker: len(data)
                    }
                }
            })
        lock.acquire()
        finish_num.value += 1
        sys.stdout.write(f'\rfinish_count: {finish_num.value}')
        sys.stdout.flush()
        lock.release()
    except Exception as e:
        print('error!', e.__class__.__name__, e)


if __name__ == '__main__':
    mongo = pymongo.MongoClient("mongodb://localhost:27017/")
    database_name = 'StockForum'
    lock = multiprocessing.Lock()
    finish_num = multiprocessing.Manager().Value('i', 0)
    tickers = mongo[database_name].list_collection_names()
    p = Pool(6)
    for t in tickers:
        p.apply_async(process_ticker_attention, args=(t, finish_num, lock))
    p.close()
    p.join()

I made sure that each child process creates its own mongo_client (as shown in the code snippet).

ticker_dates in the function process_ticker_attention usually has thousands of elements.

The code runs fine at first, but after about 20-30 seconds it throws this error:

error! AutoReconnect localhost:27017: [WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted (configured timeouts: connectTimeoutMS: 20000.0ms)

How can I solve this problem?

python sockets multiprocessing pymongo
1 answer
0 votes

I finally found a solution; I hope it helps someone.

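The problem occurs because get_all_post_dates and query_records_by_date, which are called from process_ticker_attention, each create a new MongoClient, and query_records_by_date runs once per date (thousands of times per ticker). Every new client opens fresh sockets, and WinError 10048 usually means Windows has run out of local ports for outgoing connections. The error also reports connectTimeoutMS: 20000.0ms, which is pymongo's default rather than the 600000 set on mongo_p, so the failing client is most likely one created in the helpers. The solution is to share one mongo_client within each child process (not across processes).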

def init_process(ticker, finish_num, lock):
    """Initialize a MongoDB connection for each worker process."""
    global mongo_p
    mongo_p = pymongo.MongoClient("mongodb://localhost:27017/", connectTimeoutMS=600000)
    process_ticker_attention(ticker, finish_num, lock)
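For the snippet above to help, the helper functions also have to stop creating their own clients. One way to do that, as a minimal sketch rather than the exact code I ran, is a small module-level getter that builds the client on first use and hands back the same object afterwards. The names get_client and factory are hypothetical (they are not in the question); factory is only there so the caching can be checked without a running MongoDB server.

```python
_client = None  # module-level cache: at most one client per process


def get_client(factory=None):
    """Return this process's shared client, creating it on first use.

    `factory` is a hypothetical hook for illustration: when given, it is
    called instead of pymongo.MongoClient to build the client.
    """
    global _client
    if _client is None:
        if factory is None:
            # Imported lazily so the module loads even without pymongo.
            import pymongo
            _client = pymongo.MongoClient("mongodb://localhost:27017/")
        else:
            _client = factory()
    return _client
```

query_records_by_date, get_all_post_dates and process_ticker_attention would then call get_client() in place of pymongo.MongoClient(...), so each worker keeps a single client for its whole lifetime. Only call get_client() inside the workers, not in the parent before the pool starts: on Windows every child process imports the module fresh and gets its own _client, but on platforms that fork, a client created in the parent would be copied into the children. Passing a setup function through Pool's initializer argument should achieve the same per-process setup.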