我目前正在使用 Python 编写一个网页抓取脚本,该脚本使用 urllib、BeautifulSoup 和 pandas 从网站的多个页面中提取表数据。该脚本旨在处理 gzip 和 brotli 等内容编码,并且它会重试某些 HTTP 错误,例如具有指数退避的 429(请求过多)。
我使用 ProcessPoolExecutor 实现了并发处理,以加快处理速度。然而,该脚本仍然需要大量的时间来运行,大约 395 秒。我相信还有巨大的优化空间。
下面是我正在使用的完整脚本:
import urllib.request
from bs4 import BeautifulSoup
import pandas as pd
import gzip
import brotli
import io
import time
import traceback
from concurrent.futures import ProcessPoolExecutor, as_completed
import logging
# Setup logger
logger = logging.getLogger()
logger.setLevel(logging.INFO)
log_stream = io.StringIO()
handler = logging.StreamHandler(log_stream)
formatter = logging.Formatter('%(asctime)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
def get_page_content(url):
req = urllib.request.Request(url, headers={
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1'
})
response = urllib.request.urlopen(req)
if response.info().get('Content-Encoding') == 'gzip':
buf = io.BytesIO(response.read())
data = gzip.GzipFile(fileobj=buf).read()
elif response.info().get('Content-Encoding') == 'br':
data = brotli.decompress(response.read())
else:
data = response.read()
return data
def extract_table_data(page_url, page_number):
try:
webpage = get_page_content(page_url)
soup = BeautifulSoup(webpage, 'html.parser')
div_element = soup.find('div', class_='tw-mb-6 lg:tw-mb-12')
if div_element:
html_table = div_element.find('table')
if html_table:
df = pd.read_html(io.StringIO(str(html_table)))[0]
df = df.loc[:, df.columns[1:-1]]
df['Page Number'] = page_number
return df
else:
logger.info(f"No table found in the specified div for URL: {page_url}")
else:
logger.info(f"Specified div element not found for URL: {page_url}")
except urllib.error.HTTPError as e:
if e.code == 404:
logger.info(f"HTTP Error 404 on page {page_number}. Stopping scraping.")
raise e
logger.error(f"HTTP Error on page {page_number}: {str(e)}")
traceback.print_exc()
except Exception as e:
logger.error(f"An error occurred for URL {page_url}: {str(e)}")
traceback.print_exc()
return None
def process_page(page):
logger.info(f"Starting to process page {page}")
try:
url = base_url + str(page)
logger.info(f"Fetching URL: {url}")
retries = 0
while retries < max_retries:
try:
df = extract_table_data(url, page)
if df is not None:
return df
else:
logger.info(f"No data found on page {page}, stopping.")
return None
except urllib.error.HTTPError as e:
if e.code == 404:
raise e
elif e.code == 429:
logger.warning(f"HTTP Error 429 on page {page}: Too Many Requests. Retrying after delay...")
retries += 1
time.sleep(retry_delay * retries)
else:
logger.info(f"HTTP Error on page {page}: {e.code}. Retrying...")
retries += 1
time.sleep(retry_delay)
except Exception as e:
logger.error(f"An error occurred on page {page}: {str(e)}. Retrying...")
traceback.print_exc()
retries += 1
time.sleep(retry_delay)
except Exception as e:
logger.error(f"Failed to process page {page}: {str(e)}")
traceback.print_exc()
logger.info(f"Finished processing page {page}")
return None
base_url = 'https://www.coingecko.com/en/coins/1/markets/spot?page='
all_data = pd.DataFrame()
start_page = 1
max_retries = 3
retry_delay = 5
max_consecutive_errors = 5
start_time = time.time()
with ProcessPoolExecutor(max_workers=2) as executor:
futures = {}
consecutive_errors = 0
current_page = start_page
while True:
try:
future = executor.submit(process_page, current_page)
futures[future] = current_page
current_page += 1
completed_futures = [future for future in as_completed(futures) if future.done()]
for future in completed_futures:
page = futures.pop(future)
try:
df = future.result()
if df is not None:
all_data = pd.concat([all_data, df], ignore_index=True)
consecutive_errors = 0
else:
consecutive_errors += 1
except urllib.error.HTTPError as e:
if e.code == 404:
logger.info("Reached a page that does not exist. Stopping.")
break
consecutive_errors += 1
except Exception as e:
logger.error(f"An error occurred while processing page {page}: {str(e)}")
consecutive_errors += 1
if consecutive_errors >= max_consecutive_errors:
logger.info(f"Stopping due to {max_consecutive_errors} consecutive errors.")
break
if consecutive_errors >= max_consecutive_errors or 'HTTP Error 404' in log_stream.getvalue():
break
except Exception as e:
logger.error(f"Process pool encountered an error: {str(e)}")
break
end_time = time.time()
duration = end_time - start_time
logger.info(f"Total time taken: {duration:.2f} seconds")
print(f"Total time taken: {duration:.2f} seconds")
save_path = r'C:\Users\hamid\Downloads\Crypto_Data_Table.csv'
all_data.to_csv(save_path, index=False)
logger.info(f"All data saved to '{save_path}'")
是否有任何具体的调整或优化可以显着加快执行时间?使用另一种并发方法(例如 ThreadPoolExecutor 或其他库)会有帮助吗?或者有没有一种方法可以优化数据获取和处理,从而减少总体时间?任何有关减少执行时间的建议将不胜感激。
不要刮擦。 (您担心“429 轮询太快”这一事实是有启发性的。)
有免费的 API 为了这。使用它。