Optimizing a Python web scraping script with concurrent.futures to reduce execution time


I am currently writing a web scraping script in Python that uses urllib, BeautifulSoup, and pandas to extract table data from multiple pages of a website. The script is designed to handle content encodings such as gzip and brotli, and it retries certain HTTP errors, such as 429 (Too Many Requests), with exponential backoff.

I implemented concurrent processing with ProcessPoolExecutor to speed things up. However, the script still takes a significant amount of time to run, roughly 395 seconds, and I believe there is substantial room for optimization.

Here is the complete script I am using:

import urllib.request
import urllib.error  # for HTTPError, which is caught below
from bs4 import BeautifulSoup
import pandas as pd
import gzip
import brotli
import io
import time
import traceback
from concurrent.futures import ProcessPoolExecutor, as_completed
import logging

# Setup logger
logger = logging.getLogger()
logger.setLevel(logging.INFO)
log_stream = io.StringIO()
handler = logging.StreamHandler(log_stream)
formatter = logging.Formatter('%(asctime)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)

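# Fetch a page with browser-like headers and decode gzip/brotli response bodies.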
def get_page_content(url):
    req = urllib.request.Request(url, headers={
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1'
    })
    response = urllib.request.urlopen(req)
    if response.info().get('Content-Encoding') == 'gzip':
        buf = io.BytesIO(response.read())
        data = gzip.GzipFile(fileobj=buf).read()
    elif response.info().get('Content-Encoding') == 'br':
        data = brotli.decompress(response.read())
    else:
        data = response.read()
    return data

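# Parse the page, locate the target div/table, and return it as a DataFrame tagged with the page number.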
def extract_table_data(page_url, page_number):
    try:
        webpage = get_page_content(page_url)
        soup = BeautifulSoup(webpage, 'html.parser')
        div_element = soup.find('div', class_='tw-mb-6 lg:tw-mb-12')
        if div_element:
            html_table = div_element.find('table')
            if html_table:
                df = pd.read_html(io.StringIO(str(html_table)))[0]
                df = df.loc[:, df.columns[1:-1]]
                df['Page Number'] = page_number
                return df
            else:
                logger.info(f"No table found in the specified div for URL: {page_url}")
        else:
            logger.info(f"Specified div element not found for URL: {page_url}")
    except urllib.error.HTTPError as e:
        if e.code == 404:
            logger.info(f"HTTP Error 404 on page {page_number}. Stopping scraping.")
            raise e
        logger.error(f"HTTP Error on page {page_number}: {str(e)}")
        traceback.print_exc()
    except Exception as e:
        logger.error(f"An error occurred for URL {page_url}: {str(e)}")
        traceback.print_exc()
    return None

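# Fetch one page with retries: back off on 429, re-raise 404, and retry other errors up to max_retries.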
def process_page(page):
    logger.info(f"Starting to process page {page}")
    try:
        url = base_url + str(page)
        logger.info(f"Fetching URL: {url}")
        retries = 0
        while retries < max_retries:
            try:
                df = extract_table_data(url, page)
                if df is not None:
                    return df
                else:
                    logger.info(f"No data found on page {page}, stopping.")
                    return None
            except urllib.error.HTTPError as e:
                if e.code == 404:
                    raise e
                elif e.code == 429:
                    logger.warning(f"HTTP Error 429 on page {page}: Too Many Requests. Retrying after delay...")
                    retries += 1
                    time.sleep(retry_delay * retries)
                else:
                    logger.info(f"HTTP Error on page {page}: {e.code}. Retrying...")
                    retries += 1
                    time.sleep(retry_delay)
            except Exception as e:
                logger.error(f"An error occurred on page {page}: {str(e)}. Retrying...")
                traceback.print_exc()
                retries += 1
                time.sleep(retry_delay)
    except Exception as e:
        logger.error(f"Failed to process page {page}: {str(e)}")
        traceback.print_exc()
    logger.info(f"Finished processing page {page}")
    return None

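# Scraping configuration and timing.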
base_url = 'https://www.coingecko.com/en/coins/1/markets/spot?page='
all_data = pd.DataFrame()
start_page = 1
max_retries = 3
retry_delay = 5
max_consecutive_errors = 5
start_time = time.time()

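# Submit pages to a small process pool and collect results until a 404 is seen or too many consecutive errors occur.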
with ProcessPoolExecutor(max_workers=2) as executor:
    futures = {}
    consecutive_errors = 0
    current_page = start_page

    while True:
        try:
            future = executor.submit(process_page, current_page)
            futures[future] = current_page
            current_page += 1
            completed_futures = [future for future in as_completed(futures) if future.done()]
            for future in completed_futures:
                page = futures.pop(future)
                try:
                    df = future.result()
                    if df is not None:
                        all_data = pd.concat([all_data, df], ignore_index=True)
                        consecutive_errors = 0
                    else:
                        consecutive_errors += 1
                except urllib.error.HTTPError as e:
                    if e.code == 404:
                        logger.info("Reached a page that does not exist. Stopping.")
                        break
                    consecutive_errors += 1
                except Exception as e:
                    logger.error(f"An error occurred while processing page {page}: {str(e)}")
                    consecutive_errors += 1
                if consecutive_errors >= max_consecutive_errors:
                    logger.info(f"Stopping due to {max_consecutive_errors} consecutive errors.")
                    break
            if consecutive_errors >= max_consecutive_errors or 'HTTP Error 404' in log_stream.getvalue():
                break
        except Exception as e:
            logger.error(f"Process pool encountered an error: {str(e)}")
            break

end_time = time.time()
duration = end_time - start_time
logger.info(f"Total time taken: {duration:.2f} seconds")
print(f"Total time taken: {duration:.2f} seconds")

save_path = r'C:\Users\hamid\Downloads\Crypto_Data_Table.csv'
all_data.to_csv(save_path, index=False)
logger.info(f"All data saved to '{save_path}'")

Are there any specific adjustments or optimizations that could significantly reduce the execution time? Would switching to another concurrency approach, such as ThreadPoolExecutor (a rough sketch of what I have in mind is below) or a different library, help? Or is there a way to optimize the data fetching and processing itself to cut down the overall time? Any suggestions for reducing the execution time would be greatly appreciated.
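For reference, the thread-based variant I am considering looks roughly like this. It reuses process_page unchanged and only swaps the executor, on the assumption that the work is I/O-bound; the page range and worker count are purely illustrative, and I have not benchmarked it:

from concurrent.futures import ThreadPoolExecutor, as_completed

# Untested sketch: run the same process_page in threads instead of processes.
pages = range(1, 21)  # illustrative page range
results = []
with ThreadPoolExecutor(max_workers=8) as executor:
    thread_futures = {executor.submit(process_page, page): page for page in pages}
    for future in as_completed(thread_futures):
        try:
            df = future.result()
        except urllib.error.HTTPError:
            continue  # e.g. a 404 past the last page
        if df is not None:
            results.append(df)

threaded_data = pd.concat(results, ignore_index=True) if results else pd.DataFrame()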

python web-scraping beautifulsoup python-requests multiprocessing
1 Answer

Don't scrape. (The fact that you are worried about 429 "polling too fast" is telling.)

There is a free API for this. Use it.
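A minimal sketch of that approach, assuming the public CoinGecko endpoint /api/v3/coins/{id}/tickers and its paginated JSON "tickers" field (check the current API documentation for the exact parameters and rate limits):

import time
import pandas as pd
import requests

# Hedged sketch: endpoint, parameters, and response fields are assumptions
# based on the public CoinGecko API docs; verify them before relying on this.
def fetch_bitcoin_tickers(max_pages=5):
    rows = []
    for page in range(1, max_pages + 1):
        resp = requests.get(
            "https://api.coingecko.com/api/v3/coins/bitcoin/tickers",
            params={"page": page},
            timeout=30,
        )
        resp.raise_for_status()
        tickers = resp.json().get("tickers", [])
        if not tickers:
            break  # no more pages
        rows.extend(tickers)
        time.sleep(1.5)  # stay well under the free-tier rate limit
    return pd.json_normalize(rows)

print(fetch_bitcoin_tickers().head())

No HTML parsing, no brotli handling, and no 429 backoff loop needed, as long as the requests stay within the documented rate limit.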
