我有以下代码，我试图用它并行计算句子嵌入。
import multiprocessing
from tqdm import tqdm
# Define the function to be executed in parallel
def process_data(chunk):
    """Return the dot product of the two title embeddings for each usable row.

    Args:
        chunk: Iterable of rows. ``row[1]`` and ``row[3]`` are used as keys
            into the module-level ``df_text`` mapping.

    Returns:
        A list holding one ``np.matmul`` result per row whose two ids are
        both present in ``df_text``; all other rows are skipped.
    """
    # NOTE(review): presumably ``embedding_model`` is a PyTorch-backed model
    # (e.g. sentence-transformers) -- TODO confirm. If so, running inference
    # in fork-started pool workers after the model was loaded in the parent
    # is a common cause of workers hanging on the first ``encode`` call;
    # encoding all titles in the parent in one batched call, or limiting the
    # worker to a single intra-op thread, are the usual remedies.
    results = []
    for row in tqdm(chunk):
        work_id = row[1]
        mentioning_work_id = row[3]
        # Guard clause: skip rows for which either title is unavailable.
        if work_id not in df_text or mentioning_work_id not in df_text:
            continue
        title1 = df_text[work_id]['title']
        title2 = df_text[mentioning_work_id]['title']
        # Use a real newline escape; the original '/n' printed literally.
        print(title1 + '\n' + title2)
        embeddings_title1 = embedding_model.encode(title1, convert_to_numpy=True)
        print(embeddings_title1)
        embeddings_title2 = embedding_model.encode(title2, convert_to_numpy=True)
        print(embeddings_title2)
        similarity = np.matmul(embeddings_title1, embeddings_title2.T)
        results.append(similarity)
        # Print only the new value: dumping the whole list on every row
        # makes the amount of output grow quadratically with the chunk size.
        print(similarity)
    return results
from multiprocessing import Pool
# Define the data to be processed
data = df_rud_labels
# Split the data into at most num_cores chunks. Ceiling division (clamped to
# at least 1) avoids a zero step in range() when there are fewer rows than
# cores, and avoids an extra tiny remainder chunk.
chunk_size = max(1, (len(data) + num_cores - 1) // num_cores)
chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]
results = []
# Create a pool of worker processes. The context manager terminates the
# workers when the block exits, including on KeyboardInterrupt, so an
# interrupted run does not leave orphaned worker processes behind.
with multiprocessing.Pool(processes=num_cores) as pool, \
        tqdm(total=len(chunks)) as pbar:
    # imap_unordered yields each chunk's result list as soon as it is ready,
    # so the order of results does not follow the order of chunks.
    for result_chunk in pool.imap_unordered(process_data, chunks):
        # Update the progress bar
        pbar.update()
        # Add the results to the list
        results += result_chunk
# Concatenate the results
final_result = results
但是我在中断内核后得到如下错误:
0%| | 0/2500 [00:00<?, ?it/s]
Financialization and Institutional Change in Capitalisms: A Comparison of the US and Germany/nSingle domain antibodies: promising experimental and therapeutic tools in infection and immunity
0%| | 0/2500 [00:00<?, ?it/s]
Encyclopedia of India-China Cultural Contacts, vol I/nToll-like receptors as a key regulator of mesenchymal stem cell function: An up-to-date review
0%| | 0/2500 [00:00<?, ?it/s]
Ioannis ROMANIDES, Dogmatica patristica ortodoxa, traducere de Dragos Dasca, Editura Ecclesiast, editie de protos Vasile Bîrzu, 2011/nANTHROPOMETRIC MEASUREMENTS, SOMATOTYPES AND PHYSICAL ABILITIES AS A FUNCTION TO PREDICT THE SELECTION OF TALENTS JUNIOR WEIGHTLIFTERS
A prophet of old: Jesus the “public theologian”/nCurriculum alignment at undergraduate level: military geography at the South African Military Academy
0%| | 0/4 [00:09<?, ?it/s]Process ForkPoolWorker-72:
Process ForkPoolWorker-71:
Traceback (most recent call last):
Traceback (most recent call last):
File "/opt/conda/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
self.run()
File "/opt/conda/lib/python3.7/multiprocessing/process.py", line 99, in run
self._target(*self._args, **self._kwargs)
File "/opt/conda/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
self.run()
File "/opt/conda/lib/python3.7/multiprocessing/process.py", line 99, in run
self._target(*self._args, **self._kwargs)
File "/opt/conda/lib/python3.7/multiprocessing/pool.py", line 110, in worker
task = get()
File "/opt/conda/lib/python3.7/multiprocessing/queues.py", line 351, in get
with self._rlock:
File "/opt/conda/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
return self._semlock.__enter__()
KeyboardInterrupt
File "/opt/conda/lib/python3.7/multiprocessing/pool.py", line 110, in worker
task = get()
File "/opt/conda/lib/python3.7/multiprocessing/queues.py", line 351, in get
with self._rlock:
File "/opt/conda/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
return self._semlock.__enter__()
KeyboardInterrupt
---------------------------------------------------------------------------
IndexError Traceback (most recent call last)
/opt/conda/lib/python3.7/multiprocessing/pool.py in next(self, timeout)
732 try:
--> 733 item = self._items.popleft()
734 except IndexError:
IndexError: pop from an empty deque
During handling of the above exception, another exception occurred:
KeyboardInterrupt Traceback (most recent call last)
<ipython-input-48-fcb9ab74a032> in <module>
31 results = []
32 with tqdm(total=len(chunks)) as pbar:
---> 33 for i, result_chunk in enumerate(pool.imap_unordered(process_data, chunks)):
34 # Update the progress bar
35 pbar.update()
/opt/conda/lib/python3.7/multiprocessing/pool.py in next(self, timeout)
735 if self._index == self._length:
736 raise StopIteration from None
--> 737 self._cond.wait(timeout)
738 try:
739 item = self._items.popleft()
/opt/conda/lib/python3.7/threading.py in wait(self, timeout)
294 try: # restore state no matter what (e.g., KeyboardInterrupt)
295 if timeout is None:
--> 296 waiter.acquire()
297 gotit = True
298 else:
KeyboardInterrupt:
我尝试直接用输出中的这些标题来计算嵌入，是可以成功的。但是我不确定为什么我上面写的代码却做不到这一点。
任何帮助将不胜感激。