How to run Python code on a restricted number of CPU cores

Problem description

I have a piece of code that converts data into embeddings in batches. The problem is that when I run it on the server, it uses all 32 available cores. My goal is for the code to use only 16 cores and leave the other 16 idle. Despite trying several solutions, it still uses all 32 cores. I'm looking for guidance on how to solve this and make the code run on only 16 cores. Any insight would be much appreciated.


import multiprocessing
import pandas as pd

def test_function(a):
    batch_num = 10000
    # item_names_clear and model are defined elsewhere in the script
    for i in range(0, len(item_names_clear), batch_num):
        emb = item_names_clear[i: i + batch_num]
        embeddings = model.encode(emb)
        final = pd.DataFrame({"embeddings": embeddings})
        final.index = item_names_clear[i: i + batch_num]
        start_index = i
        end_index = i + batch_num - 1
        filename = f'emb{start_index}_{end_index}.csv'
        final.to_csv(filename)

num_cores_to_use = 16  

pool = multiprocessing.Pool(processes=num_cores_to_use)
pool.map(test_function, [None] * num_cores_to_use) 
pool.close()
pool.join()

I tried multiprocessing and ChatGPT, but nothing helped.

I don't have a GPU.

python algorithm machine-learning deep-learning nlp
1 Answer

Hi Asset Ilyasbekov, welcome to Stack Overflow. You can use the multiprocessing module to solve your problem:

import multiprocessing
import pandas as pd

def test_function(args):
    offset, data_batch = args
    batch_num = 10000
    # `model` and `item_names_clear` come from the question's script
    for i in range(0, len(data_batch), batch_num):
        emb = data_batch[i: i + batch_num]
        embeddings = model.encode(emb)
        final = pd.DataFrame({"embeddings": embeddings})
        final.index = data_batch[i: i + batch_num]
        # Use global positions so files from different workers don't collide
        start_index = offset + i
        end_index = offset + i + batch_num - 1
        filename = f'emb{start_index}_{end_index}.csv'
        final.to_csv(filename)

# Split the data into num_parts chunks, keeping each chunk's global offset
def split_data(data, num_parts):
    # Ceiling division: the remainder is absorbed into the last chunk
    # and the slice step can never be zero.
    chunk_size = -(-len(data) // num_parts)
    return [(i, data[i:i + chunk_size]) for i in range(0, len(data), chunk_size)]

if __name__ == "__main__":
    num_cores_to_use = 16
    data_splits = split_data(item_names_clear, num_cores_to_use)

    # A pool of 16 worker processes handles one data split each
    with multiprocessing.Pool(processes=num_cores_to_use) as pool:
        pool.map(test_function, data_splits)

Process management: the Pool object from the multiprocessing module manages your worker processes; its processes argument caps how many run at the same time.
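
Note that Pool(processes=16) only caps how many worker processes run at once; it does not bind them to particular CPUs. On Linux you can additionally pin each worker to a fixed core set with os.sched_setaffinity in the pool's initializer. A minimal sketch, assuming a Linux host and that cores 0-15 are the ones you want to keep busy (pin_to_cores and work are illustrative names, not from the code above):

import multiprocessing
import os

# Hypothetical core set: the first 16 logical CPUs (0-15).
ALLOWED_CORES = set(range(16))

def pin_to_cores():
    # Restrict the calling worker process to the allowed cores (Linux-only).
    os.sched_setaffinity(0, ALLOWED_CORES)

def work(x):
    # Placeholder for the real per-chunk job (e.g. test_function above).
    return x * x

if __name__ == "__main__":
    with multiprocessing.Pool(processes=16, initializer=pin_to_cores) as pool:
        print(pool.map(work, range(8)))

os.sched_setaffinity is Linux-only; on other systems an OS-specific tool (such as Windows' processor affinity settings) would be needed for the same effect.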

Or you can use this code:

import multiprocessing
import pandas as pd

def process_chunk(chunk):
    # Encode one pre-built chunk and save it under its global index range.
    emb = chunk['emb']
    start_index = chunk['start_index']
    end_index = chunk['end_index']
    embeddings = model.encode(emb)
    final = pd.DataFrame({"embeddings": embeddings})
    final.index = item_names_clear[start_index: end_index + 1]
    filename = f'emb{start_index}_{end_index}.csv'
    final.to_csv(filename)

def test_function():
    batch_num = 10000
    # Build all chunks up front, each carrying its global start/end indices.
    chunks = []
    for i in range(0, len(item_names_clear), batch_num):
        emb = item_names_clear[i: i + batch_num]
        chunk = {'emb': emb, 'start_index': i, 'end_index': i + batch_num - 1}
        chunks.append(chunk)

    # At most num_cores_to_use chunks are processed concurrently.
    pool = multiprocessing.Pool(processes=num_cores_to_use)
    pool.map(process_chunk, chunks)
    pool.close()
    pool.join()

if __name__ == "__main__":
    num_cores_to_use = 16
    test_function()
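
One more thing worth checking, given the goal of leaving 16 cores idle: if model.encode is backed by a threaded numeric library (for example a PyTorch-based sentence-transformers model — an assumption, since the question does not say which model is used), every worker process can spawn its own compute threads, and the job may still spread across all 32 cores no matter how small the pool is. A minimal sketch of capping the per-process thread count, under that assumption:

import os

# Cap the threads used by common numeric backends (OpenMP / MKL / OpenBLAS).
# These must be set before numpy / torch are first imported.
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"
os.environ["OPENBLAS_NUM_THREADS"] = "1"

import torch  # assumption: the embedding model runs on a PyTorch backend

# One compute thread per worker process, so 16 workers stay near 16 cores.
torch.set_num_threads(1)

If you combine this with the Pool examples above, set the environment variables at the top of the script, before the model library is imported, so every worker process inherits them.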