作为我的程序的一部分,我尝试使用Python中的
pdfminer
第三方库来打开和阅读PDF页面,然后使用正则表达式来搜索特定模式。我还使用 multiprocessing
来并行化,因为我有大量 PDF 需要分析。每个进程应该处理一个 PDF。
我有这个代码来设置多处理:
def process_theme_files(theme_dir: Path, processed_files: Optional[Set[str]] = None, theme_processed_files: Optional[Set[str]] = None) -> Tuple[List[Tuple[str, int, str, str, str, str, str, str]], List[Exception]]:
"""
Processes files of a specific theme in a multiprocessed manner.
Parameters:
- theme_dir (Path): Path object pointing to the theme directory.
- processed_files (set, optional): Set of globally processed files. Defaults to None.
- theme_processed_files (set, optional): Set of files processed in the current theme. Defaults to None.
Returns:
- Tuple[List[Tuple[str, int, str, str, str, str, str, str]], List[Exception]]: A tuple containing a list of tuples
containing extracted information from the theme files and a list of exceptions encountered during the processing.
"""
results = []
exceptions = []
# Number of processes to be used (can be adjusted as needed)
num_processes = multiprocessing.cpu_count()
# Initialize processed_files and theme_processed_files as empty sets if not provided
if processed_files is None:
processed_files = set()
if theme_processed_files is None:
theme_processed_files = set()
# Get PDF files in the theme directory
pdf_files = list(theme_dir.glob('**/*.pdf'))
# Create progress bar
with tqdm(total=len(pdf_files), desc='Processing PDFs', unit='file') as pbar:
# Process PDF files in parallel using ProcessPoolExecutor
with concurrent.futures.ProcessPoolExecutor(max_workers=num_processes) as executor:
# Map the process_file function to each PDF file in the list
future_to_file = {executor.submit(process_file, pdf_file): pdf_file for pdf_file in pdf_files}
# Iterate over results as they become available
for future in concurrent.futures.as_completed(future_to_file):
pdf_file = future_to_file[future]
try:
# Get the result of the task
file_results, file_exceptions = future.result()
# Extend the results list
results.extend(file_results)
# Append specific exceptions to the exceptions list
exceptions.extend(file_exceptions)
except FileNotFoundError as fnfe:
exceptions.append(f"File not found: {fnfe.filename}")
except Exception as e:
# Capture and log the generic exception
exceptions.append(f"Error processing file '{pdf_file}': {e}")
# Update the progress bar
pbar.update(1)
return results, exceptions
以及用于处理单个文件的代码:
def process_file(file_path: Path):
"""
Process a PDF file to extract text and information.
Args:
- file_path (Path): Path object representing the location of the PDF file.
Returns:
- Tuple[List, List]: A tuple containing two lists:
1. List of extracted results.
2. List of encountered exceptions during processing.
Raises:
- FileNotFoundError: If the specified file_path does not exist.
- Exception: For any other unexpected errors during processing.
"""
results = [] # List to store extracted information from each page
exceptions = [] # List to store exceptions encountered during processing
try:
# Check the size of the PDF file
pdf_size_bytes = os.path.getsize(file_path)
pdf_size_mb = pdf_size_bytes / (1024 * 1024)
# Check if the PDF file size exceeds the maximum allowed size
if pdf_size_mb > MAX_PDF_SIZE_MB:
exceptions.append((file_path, f"The file exceeds the maximum allowed size of {MAX_PDF_SIZE_MB} MB."))
print(f'{file_path} - Exceeds maximum allowed size of {MAX_PDF_SIZE_MB} MB.')
return results, exceptions
# Open the PDF file and read its content into a BytesIO buffer
with file_path.open('rb') as file:
pdf_data_buffer = BytesIO(file.read())
# Iterate through each page of the PDF
for page_number, page in enumerate(PDFPage.get_pages(pdf_data_buffer, check_extractable=True)):
# Extract text from the current page
page_text = extract_text(pdf_data_buffer, page_numbers=[page_number])
# Process the extracted text to extract information
page_results, page_exceptions = extract_information(page_text, file_path.name, page_number + 1) # Page numbers are 1-based
# Extend results and exceptions lists with page-specific results and exceptions
results.extend(page_results)
exceptions.extend(page_exceptions)
except FileNotFoundError as e:
# Handle case where the file does not exist
exceptions.append(e)
print(f"FileNotFoundError: {e}")
raise
except Exception as e:
# Handle any other unexpected exceptions
exceptions.append(e)
print(f"Exception: {e}")
raise
return results, exceptions
问题是我的 RAM 不足,即使安装了 32 GB:
通过研究,我了解到PDF不能随意阅读;它们必须从文件的开头到结尾按顺序读取,这就是我的实现方式。
我的一些 PDF 大小约为 100 MB,从未超过 200 MB,有些则相当长(1000 页)且包含许多图像。由于我在处理 PDF 时必须阅读所有页面,因此我能找到的唯一解决方法是将我阅读的 PDF 的大小限制为小于 100 MB。我也想不出限制页数的方法 - 因为要确定页数,我需要打开并读取文件。
如何限制此程序中的 RAM 使用量?
您似乎正在使用 pdfminer.
我的内存不足了
如何限制该程序中的 RAM 使用量?
您抱怨您选择的多道程序设计程度太高。 所以减少它。 运行更少的并发实例。
您需要一些测量和估计来帮助实现这一点。
您可能会发现检查 PDF 样本很有趣, 根据文件大小估计页数, 提出打印页数/兆字节转换系数。 当然,它会不准确,但仍然提供信息。
同样,经过几次运行后,您可以根据文件大小估计 RAM 消耗。 使用 psutil 了解操作系统报告的虚拟内存大小(“malloc 大小”) 处理单个 PDF 文件的 pdfminer 子进程的视图。 用它来将文件大小与内存消耗联系起来。 这将是一个嘈杂的预测。 如果你要犯错误,那就犯保守的错误, 预测相对较大的足迹。
>>> p = psutil.Process(child_pid)
>>>
>>> p.memory_info()
pmem(rss= ..., vms=67608576, ... )
现在按文件大小降序对输入的 PDF 进行排序, 创建一个池 与一个或几个插槽,并分析所有大型 PDF。 理想情况下,您将把所有 foo.pdf 文本写入 foo.txt 文件, 后续分析步骤可以廉价地处理。
处理完大的,继续处理中型的 池大小可能为 4 的文档。
最后,使用更大的池来处理所有小文件。
这里的想法是永远不要安排超出 RAM 处理能力的任务。 从只有一名工人的池开始,并进行基线测量。 然后验证两个工作人员的池实际上运行得更快,然后是三个。 当您发现额外的并发性没有改善时停止 处理时间。
我们先安排大工作,避免出现 一个核心正在做一项长期落后的工作 在所有其他核心都空闲之后的最后阶段。 当短暂的“轻松”工作成为最后剩下的一切时 在一次运行中,所有核心很可能几乎同时空闲。
GNU make 的
-j
开关提供了一种方便的方法来调度 N 个并发
任务,类似于Python的多处理池大小。
您可能想在交互式测试中利用它,
或在生产运行期间。