How do I navigate between pages using Selenium?

I am trying to scrape car listings from cargurus.com, specifically the details of the Ferraris that are up for sale. I have been able to get it working, at least for the first page. However, even though it tells me it has moved on to the next page of entries, it keeps giving me duplicate results.

I have tried a) updating the URL of the page being viewed:

def __init__(self, car_make: str) -> None:
    self.car_make = car_make
    self.base_url = f"https://www.cargurus.com/Cars/l-Used-{car_make}-m25"

def load_page(self, page_number: int):
    url = f"{self.base_url}?resultsPage={page_number}"
    self.driver.get(url)
    print(f"\nAccessing URL: {url}")  # Print the URL at the top of each page scan

    # Wait for the page to load completely
    WebDriverWait(self.driver, 10).until(
        EC.presence_of_element_located((By.ID, 'cargurus-listing-search'))
    )

and b) using Selenium to simulate clicking the next-page button once all entries on the current page have been scraped:

def click_next_page(self):
    try:
        # Wait for the "Next Page" button to be clickable
        next_button = WebDriverWait(self.driver, 10).until(
            EC.element_to_be_clickable((By.CSS_SELECTOR, 'button[aria-labelledby="bottomPaginationNext"]'))
        )
        next_button.click()
        print("Clicked the 'Next Page' button.")
    except Exception as e:
        print(f"Failed to click 'Next Page' button. Msg: {e}")
        logging.error(f"Failed to click 'Next Page' button. Msg: {e}")

However, when I run the code (which I wrote to stop automatically as soon as it detects duplicate entries, to save time), it still sees the same entries (car listings).
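
One way to narrow this down is to check whether the resultsPage query parameter has any effect at all: load two different page numbers and compare the first listing link. A minimal diagnostic sketch (the container id comes from the code above; everything else is illustrative):

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

def first_listing_href(driver, url: str) -> str:
    # Load the URL and return the href of the first listing link.
    driver.get(url)
    WebDriverWait(driver, 10).until(
        EC.presence_of_element_located((By.ID, 'cargurus-listing-search'))
    )
    first = driver.find_element(By.CSS_SELECTOR, '#cargurus-listing-search a[href]')
    return first.get_attribute('href')

base = "https://www.cargurus.com/Cars/l-Used-Ferrari-m25"
driver = webdriver.Chrome()
href1 = first_listing_href(driver, f"{base}?resultsPage=1")
href2 = first_listing_href(driver, f"{base}?resultsPage=2")
print("parameter ignored" if href1 == href2 else "pagination works")
driver.quit()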

I am not sure how to proceed.

Here is my full code:

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from dataclasses import dataclass, asdict
from typing import List
import csv
import logging
from bs4 import BeautifulSoup
import time

from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

@dataclass
class Car:
    link: str
    full_name: str
    hp: str
    engine: str
    year: int
    mileage: str
    price: int

class CarScraper:
    def __init__(self, car_make: str) -> None:
        self.car_make = car_make
        self.base_url = f"https://www.cargurus.com/Cars/l-Used-{car_make}-m25"

        # Set up Chrome options
        options = Options()
        options.add_argument("--headless")  # Run Chrome in headless mode
        options.add_argument("--no-sandbox")
        options.add_argument("--disable-dev-shm-usage")

        # Path to chromedriver
        self.driver = webdriver.Chrome(service=Service('/usr/local/bin/chromedriver'), options=options)
        self.processed_links = set()  # Set to keep track of processed car links
        self.processed_pages = set()  # Set to keep track of processed page numbers

    def load_page(self, page_number: int):
        url = f"{self.base_url}?resultsPage={page_number}"
        self.driver.get(url)
        print(f"\nAccessing URL: {url}")  # Print the URL at the top of each page scan

        # Wait for the page to load completely
        WebDriverWait(self.driver, 10).until(
            EC.presence_of_element_located((By.ID, 'cargurus-listing-search'))
        )

    def get_page_source(self) -> str:
        return self.driver.page_source

    def extract_cars_from_page(self, soup: BeautifulSoup) -> List[Car]:
        offers_table = soup.find('div', id='cargurus-listing-search')

        if offers_table is None:
            print("Failed to find the offers table on the page.")
            logging.error("Failed to find the offers table on the page.")
            print(soup.prettify())  # Print the prettified HTML for debugging
            return []

        print("Found offers table.")

        # Navigate through nested divs to find car entries
        nested_container = offers_table.find('div', class_='pecvNo')
        if nested_container is None:
            print("Failed to find the nested container.")
            logging.error("Failed to find the nested container.")
            return []

        car_entries_container = nested_container.find('div', class_='Km1Vfz')
        if car_entries_container is None:
            print("Failed to find the car entries container.")
            logging.error("Failed to find the car entries container.")
            return []

        car_entries = car_entries_container.find_all('div', class_='pazLTN')
        print(f"Found {len(car_entries)} car entries.")  # Debugging line

        list_of_cars = []
        for car in car_entries:
            try:
                link_element = car.find('a', href=True)
                link = link_element.get('href', 'N/A') if link_element else 'N/A'
                
                # Skip if this link has already been processed
                if link in self.processed_links:
                    continue
                
                self.processed_links.add(link)

                full_name_element = car.find('h4')
                full_name = full_name_element.get('title', 'N/A') if full_name_element else 'N/A'
                
                # Safely split full_name to get the year and the model
                split_name = full_name.split()
                year = int(split_name[0]) if split_name and split_name[0].isdigit() else 0
                model_name = " ".join(split_name[1:]) if len(split_name) > 1 else 'N/A'

                mileage_element = car.find('p', class_='us1dS iK3Zj _erOpv kIL3VY', attrs={'data-testid': 'srp-tile-mileage'})
                mileage_text = mileage_element.text.strip() if mileage_element else 'N/A'

                price_element = car.find('h4', class_='us1dS i5dPf SOf0Fe')
                price_text = price_element.text.strip().replace("$", "").replace(",", "") if price_element else '0'

                description_element = car.find('p', class_='us1dS iK3Zj _erOpv kIL3VY', attrs={'data-testid': 'seo-srp-tile-engine-display-name'})
                description_text = description_element.text.strip() if description_element else 'N/A'

                # Split the description into hp and engine
                if 'hp' in description_text:
                    hp, engine = description_text.split(' hp', 1)
                    hp = hp.strip()  # Ensure there are no leading/trailing spaces
                    engine = engine.strip()  # Remove any leading/trailing spaces from the engine part
                else:
                    hp = 'N/A'
                    engine = 'N/A'

                car_data = Car(
                    link="https://www.cargurus.com" + link,
                    full_name=model_name,
                    hp=hp,
                    engine=engine,
                    year=year,
                    mileage=mileage_text,
                    price=int(price_text)
                )
                list_of_cars.append(car_data)
                print(f"Extracted car: {car_data}")  # Debugging line
                logging.info(f"Extracted car: {car_data}")
            except Exception as e:
                logging.error(f"Failed to gather car. Msg: {e}")
                print(f"Failed to gather car. Msg: {e}")
        return list_of_cars

    def click_next_page(self):
        try:
            # Wait for the "Next Page" button to be clickable
            next_button = WebDriverWait(self.driver, 10).until(
                EC.element_to_be_clickable((By.CSS_SELECTOR, 'button[aria-labelledby="bottomPaginationNext"]'))
            )
            next_button.click()
            print("Clicked the 'Next Page' button.")
        except Exception as e:
            print(f"Failed to click 'Next Page' button. Msg: {e}")
            logging.error(f"Failed to click 'Next Page' button. Msg: {e}")

    def quit_driver(self):
        if self.driver:
            self.driver.quit()

def write_to_csv(cars: List[Car]) -> None:
    try:
        with open("cargurus_ferrari.csv", mode="w", newline='') as f:
            fieldnames = [
                "link",
                "full_name",
                "hp",
                "engine",
                "year",
                "mileage",
                "price",
            ]
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            for car in cars:
                writer.writerow(asdict(car))
    except IOError as e:
        print(f"Failed to write to CSV file. Msg: {e}")

def scrape_all_pages():
    make = "Ferrari"  # Adjust this as needed
    scraper = CarScraper(make)
    all_cars = []

    page_number = 1
    while True:
        print(f"Scraping page {page_number}...")
        scraper.load_page(page_number)
        page_source = scraper.get_page_source()
        soup = BeautifulSoup(page_source, "html.parser")

        # Extract cars from the current page
        cars = scraper.extract_cars_from_page(soup)

        if not cars:
            print("No more new cars found or failed to extract cars. Ending scraping.")
            logging.info("No more new cars found or failed to extract cars. Ending scraping.")
            break

        all_cars.extend(cars)
        print(f"Number of cars found on page {page_number}: {len(cars)}")
        logging.info(f"Number of cars found on page {page_number}: {len(cars)}")

        # Track processed pages
        scraper.processed_pages.add(page_number)

        # Click the "Next Page" button
        scraper.click_next_page()
        page_number += 1
        time.sleep(5)  # Be polite and avoid hitting the server too hard

    # Save all collected cars to CSV
    write_to_csv(all_cars)
    print("All cars saved to cargurus_ferrari.csv")
    logging.info("All cars saved to cargurus_ferrari.csv")

    # Clean up
    scraper.quit_driver()

if __name__ == '__main__':
    scrape_all_pages()

Tags: python, selenium-webdriver, web-scraping
1 Answer

You can keep incrementing the resultsPage value, starting from 1, until the pagination button becomes disabled, like this:

from selenium import webdriver
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.webdriver import ChromeOptions


# Note the '#' fragment rather than a '?' query string.
URL = "https://www.cargurus.com/Cars/l-Used-Ferrari-m25#resultsPage={}"
SELECTOR = By.CSS_SELECTOR, "button[aria-labelledby='bottomPaginationNext']"

options = ChromeOptions()
options.add_argument('--headless=new')
options.add_argument("--start-maximized")

with webdriver.Chrome(options=options) as driver:
    page = 1
    wait = WebDriverWait(driver, 10)
    while True:
        url = URL.format(page)
        print(url)
        try:
            driver.get(url)
            # process page content here
            # Stop once the "Next" button is rendered in its disabled state.
            button = wait.until(EC.presence_of_element_located(SELECTOR))
            if button.get_attribute("disabled") is not None:
                break
        except Exception:
            # Navigation failed or the button never appeared; stop scraping.
            break
        page += 1
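
The key difference from load_page in the question is the # in the URL: pagination is driven through the URL fragment rather than a ?resultsPage query string, which the client-side listing page appears to respond to; that is likely why the question's ?resultsPage= reloads kept returning the same first page. To plug in the extraction logic from the question, the "# process page content here" placeholder can parse the rendered HTML with BeautifulSoup. A minimal sketch; the container id and the pazLTN class are copied from the question's code and may change:

from bs4 import BeautifulSoup

# Inside the while loop, immediately after driver.get(url):
soup = BeautifulSoup(driver.page_source, "html.parser")
entries = soup.select('#cargurus-listing-search div.pazLTN')
print(f"page {page}: {len(entries)} listings")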