如何使用 tqdm 进度条跟踪文本提取的进度？

问题描述 投票:0回答:1

您可以通过这种方式修改脚本:
from tqdm import tqdm
import time
from datetime import datetime, timedelta

def process_documents(file_path, index_name):
    """Extract per-page text from a document and report processing progress.

    The file is read in full, submitted to the "prebuilt-layout" model through
    ``DocumentIntelligenceClient.begin_analyze_document``, and the returned
    paragraphs are grouped by page number. Paragraphs whose role is a page
    header, page footer or page number are dropped.

    The endpoint and key are taken from the ``DOCUMENT_ENDPOINT`` and
    ``DOCUMENT_KEY`` environment variables.

    Args:
        file_path: Path of the document to analyze.
        index_name: Not used by this function; kept so existing callers
            continue to work.

    Returns:
        A tuple ``(all_text, progress)``. ``all_text`` is a list of dicts with
        the keys ``'file'``, ``'doc_num'``, ``'page_num'`` and ``'text'`` (one
        per page). ``progress`` is a dict with the keys ``'status'``,
        ``'percent_complete'``, ``'eta'`` and ``'current_page'``.

    Raises:
        FileNotFoundError: If ``file_path`` does not exist.
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")

    with open(file_path, 'rb') as file:
        file_content = file.read()

    document_ai_client = DocumentIntelligenceClient(
        endpoint=os.getenv("DOCUMENT_ENDPOINT"),
        credential=AzureKeyCredential(os.getenv("DOCUMENT_KEY"))
    )

    model_id = "prebuilt-layout"
    all_text = []

    # Initialize progress tracking variables
    start_time = time.time()
    progress = {
        'status': 'Processing',
        'percent_complete': 0,
        'eta': None,
        'current_page': 0
    }

    poller = document_ai_client.begin_analyze_document(
        model_id,
        {"base64Source": file_content}
    )

    # Wait for the long-running operation. The poller's public API exposes
    # only a status string (no completion fraction), so neither a percentage
    # nor an ETA can be derived here; report status and elapsed time instead.
    # The previous code multiplied the private ``_status`` string by 100 and
    # passed it to int(), which raises ValueError.
    while not poller.done():
        elapsed = timedelta(seconds=int(time.time() - start_time))

        # You can yield progress here if using as a generator
        # yield progress

        print(f"\rAnalyzing document: {poller.status()} | Elapsed: {elapsed}", end='')

        time.sleep(1)  # Avoid polling too frequently

    result = poller.result()
    page_texts = defaultdict(list)

    # ``paragraphs`` may be None when nothing was detected; treat as empty.
    paragraphs = result.paragraphs or []
    total_paragraphs = len(paragraphs)
    skipped_roles = ("pageHeader", "pageFooter", "pageNumber")

    # Group paragraph text by page, with a progress bar
    for i, paragraph in enumerate(tqdm(paragraphs, desc="Processing paragraphs")):
        role = getattr(paragraph, "role", None)
        page_num = paragraph.bounding_regions[0].page_number if paragraph.bounding_regions else "Unknown"

        if role not in skipped_roles:
            page_texts[page_num].append(paragraph.content)

        # From here on the percentage is real: paragraphs handled so far.
        progress['percent_complete'] = int((i + 1) / total_paragraphs * 100)

    file_name = os.path.basename(file_path)
    # Build one record per page, with a progress bar
    for page_num, page_paragraphs in tqdm(page_texts.items(), desc="Processing pages"):
        all_text.append({
            'file': file_name,
            'doc_num': 1,
            'page_num': page_num,
            'text': " ".join(page_paragraphs)
        })

        progress['current_page'] = page_num

    total_words = sum(len(entry["text"].split()) for entry in all_text)
    print(f"\nTotal words processed: {total_words}")

    return all_text, progress
data-science text-extraction tqdm data-engineering azure-document-intelligence
1个回答
0
投票
我不清楚您的 UI 是如何实现的，但可以像下面这样使用 WebSocket 来推送处理进度：

from threading import Thread

from flask import Flask
from flask_socketio import SocketIO, emit

app = Flask(__name__)
socketio = SocketIO(app)


@app.route('/process-document')
def process_document():
    """Start document processing in a background thread and return at once.

    The worker emits a ``'progress_update'`` Socket.IO event carrying the
    progress dict once ``process_documents`` has finished.

    NOTE(review): ``file_path`` and ``index_name`` are not defined in this
    snippet; they must be supplied by the surrounding application.
    """
    def process_with_updates():
        # process_documents returns (all_text, progress) only when it is done,
        # so a single final update is emitted. Iterating over the return value
        # would emit the page list and the progress dict as two "updates".
        # Live updates would require process_documents to yield progress.
        _, progress = process_documents(file_path, index_name)
        socketio.emit('progress_update', progress)

    # Start processing in a background thread
    thread = Thread(target=process_with_updates)
    thread.start()
    return {"status": "processing started"}
    

最新问题
© www.soinside.com 2019 - 2024. All rights reserved.