如何在 Selenium(Python)中使用多进程进行网页抓取,并解决在不同驱动程序中调用同一个函数时产生的冲突问题

问题描述 投票:0回答:1

我在网络抓取中需要执行一系列操作,但这花费了很多时间。因此我不想一个一个地调用单个函数,而是想把它们作为不同的进程来运行;但这样做会遇到多个驱动程序相互冲突的问题,我该如何解决这个问题?

在没有任何冲突的情况下,我希望多个进程在自己的驱动程序中执行任务,并在其中加载先前的步骤。

这是代码:

#importing important libraries
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
import time
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from test_sheet import connection_sheet

# NOTE(review): `Options` is imported from selenium.webdriver.chrome.options, yet
# the driver created below is Firefox and `options` is never handed to it in the
# code shown here, so this object appears to be unused — confirm before removing.
options = Options()
# Chrome-specific tweaks the author left disabled:
# options.add_experimental_option("detach", True) 
# options.add_argument('headless')
# One module-level browser session, created at import time; the functions in this
# script all drive this single `driver` object.
driver = webdriver.Firefox()

# Shared record describing the configuration currently being priced. Every field
# starts out as the '-' placeholder; the crawl functions overwrite individual
# fields as they pick answers and put the placeholder back when they are done.
data = dict.fromkeys(
    (
        'Device',
        'Processor',
        'Memory Capacity',
        'Storage Capacity',
        'Condition',
        'Battery Health',
        'Include Charger',
        'Fully Functional',
        'Price',
    ),
    '-',
)


def find_and_fetch():
    """Handle the page currently shown by the shared ``driver`` and recurse.

    The heading text (``h5.ng-binding``) decides which question is on screen.
    For a recognised question the answers' ``aria-label`` values are collected,
    then each answer is stored in the module-level ``data`` dict, clicked via
    ``find_and_click`` and the function calls itself to handle the next page.
    When the loop is over the corresponding ``data`` field is reset to '-'.
    If no question keyword matches, the offer heading (``h3.your-offer``) is
    inspected and, on the final price page, ``fetch_info`` records the result.

    Returns:
        None. All results are side effects on ``data``, the browser and stdout.
    """
    global data
    try:
        # Use JavaScript to hide the banner
        driver.execute_script("""
            var banner = document.querySelector('a.message-banner');
            if (banner) {
                banner.style.display = 'none';
            }""")
    except:
        print("banner disabled")

    time.sleep(1)
    # NOTE(review): Selenium's find_element normally raises NoSuchElementException
    # when nothing matches instead of returning something falsy, so a page without
    # this heading would leave via an exception rather than fall through — confirm.
    if driver.find_element(By.XPATH,"//h5[@class='ng-binding']").text:
        text = driver.find_element(By.XPATH,"//h5[@class='ng-binding']").text
        # Processor question: try every processor option and recurse into each one.
        # NOTE(review): the keyword checks are case-sensitive ('Processor' is
        # capitalised, the others are lower-case).
        if 'Processor' in text:
            processor = []
            try:
                elem = driver.find_element(By.XPATH,"//div[@class='answers']")
                # NOTE(review): this XPath starts with '//' so it is presumably
                # evaluated against the whole document, not relative to `elem` — confirm.
                elems = elem.find_elements(By.XPATH, "//div[@ng-keydown='selectAnswer($index, $event)']")
                # Labels are copied out first; the loop below clicks by label text.
                for i in elems:
                    processor.append(i.get_attribute('aria-label'))
                for prces in processor:
                    time.sleep(1)
                    data['Processor']=prces
                    print("=> processor : ",prces)
                    find_and_click(prces)
                    find_and_fetch()
            except Exception as e :
                # Unlike the other branches, this one prints the exception itself.
                print(e)
            # Unlike the other branches, no back button is pressed here.
            data['Processor']='-'
            return

        # Memory capacity question: try every option, recurse, then step back.
        if "memory capacity" in text:
            capas = []
            try:
                elem = driver.find_element(By.XPATH,"//div[@class='answers']")
                elems = elem.find_elements(By.XPATH, "//div[@ng-keydown='selectAnswer($index, $event)']")
                for i in elems:
                    capas.append(i.get_attribute('aria-label'))
                for capa in capas:
                    time.sleep(1)
                    data['Memory Capacity']=capa
                    print("=> memory capacity : ",capa)
                    find_and_click(capa)
                    find_and_fetch()
                small_back_button()       
            except :
                # The try wraps the whole loop, so one failure skips the remaining
                # options and the back-button click as well.
                print("Error While passing to memory capacity!")
            data['Memory Capacity']='-'
            return

        # Storage capacity question: times the first option only (see note below).
        if "storage capacity" in text:
            storage = []
            try:
                elem = driver.find_element(By.XPATH,"//div[@class='answers']")
                elems = elem.find_elements(By.XPATH, "//div[@ng-keydown='selectAnswer($index, $event)']")
                for i in elems:
                    storage.append(i.get_attribute('aria-label'))
                for store in storage:
                    time.sleep(1)
                    start_time = time.time()
                    data['Storage Capacity']=store
                    print("=> storage capacity : ",store)
                    find_and_click(store)
                    find_and_fetch()
                    end_time = time.time()
                    time_taken = end_time-start_time
                    print(f"The code block took {time_taken:.4f} seconds to execute.")
                    # NOTE(review): the browser is closed and the loop left after the
                    # FIRST storage option has been timed. This looks like a
                    # benchmarking hook; every later use of `driver` (including the
                    # small_back_button() call right below) will presumably fail and
                    # be swallowed by the bare excepts — confirm this is intended.
                    driver.quit()
                    break
                small_back_button()  

            except :
                print("Error While passing to storage selection!")
            data['Storage Capacity']='-'
            return

        # Condition question: each option additionally needs the "next" button.
        if "condition" in text:
            conditions = []
            try:
                elem = driver.find_element(By.XPATH,"//div[@class='answers']")
                elems = elem.find_elements(By.XPATH, "//div[@ng-keydown='selectAnswer($index, $event)']")
                for i in elems:
                    conditions.append(i.get_attribute('aria-label'))
                for condition in conditions:
                    time.sleep(1)
                    data['Condition']=condition
                    print("=> condition : ",condition)
                    find_and_click(condition)
                    next_button()
                    find_and_fetch()
                small_back_button()           
            except :
                print("Error While passing to condition selection!")
            data['Condition']='-'
            return

        # Battery health question: try every option, recurse, then step back.
        if "battery health" in text:
            health = []
            try:
                elem = driver.find_element(By.XPATH,"//div[@class='answers']")
                elems = elem.find_elements(By.XPATH, "//div[@ng-keydown='selectAnswer($index, $event)']")
                for i in elems:
                    health.append(i.get_attribute('aria-label'))
                for hlth in health:
                    time.sleep(1)
                    data['Battery Health']=hlth
                    print("=> bettery health : ",hlth)
                    find_and_click(hlth)
                    find_and_fetch()
                # The loop variable is cleared; it is not read again afterwards.
                hlth = None
                time.sleep(1)
                small_back_button()            
            except:
                print("Error While passing to battery health selection!")
            data["Battery Health"]='-'
            return

        # Charger question: try every option, recurse, then step back.
        if "charger" in text:
            charger = []
            try:
                elem = driver.find_element(By.XPATH,"//div[@class='answers']")
                elems = elem.find_elements(By.XPATH, "//div[@ng-keydown='selectAnswer($index, $event)']")
                for i in elems:
                    charger.append(i.get_attribute('aria-label'))
                for crgr in charger:
                    time.sleep(1)
                    data['Include Charger'] = crgr
                    print("=> include charger : ",crgr)
                    find_and_click(crgr)
                    find_and_fetch()
                # Two-second pause before going back (the other branches use one or none).
                time.sleep(2)
                small_back_button()            
            except :
                print("Error While passing to including charger selection!")
            data['Include Charger']='-'
            return

        # Fully functional question: only the FIRST answer is followed.
        if "fully functional" in text:
            functional = []
            try:
                elem = driver.find_element(By.XPATH,"//div[@class='answers']")
                elems = elem.find_elements(By.XPATH, "//div[@ng-keydown='selectAnswer($index, $event)']")
                for i in elems:
                    functional.append(i.get_attribute('aria-label'))  
                time.sleep(1)
                # An empty answer list raises IndexError here, which the bare
                # except below turns into the printed error message.
                data['Fully Functional']=functional[0]
                find_and_click(functional[0]) 
                find_and_fetch()
                small_back_button()

            except :
                print("Error While passing to fully functional selection!")
            data['Fully Functional']='-'
            return
    # Reached when the heading is empty or matched none of the keywords above:
    # check for the final price page and record the offer.
    time.sleep(1)
    if driver.find_element(By.XPATH, "//h3[@class='your-offer']").text:
        text = driver.find_element(By.XPATH, "//h3[@class='your-offer']").text
        if "Your device is valued at" in text:
            print('in the final page')
            fetch_info()
            time.sleep(1)
            large_back_button()
            return
        return

    # The offer heading exists but has no text: best-effort step back.
    else:
        try:
            small_back_button()
        except:pass
        return

def find_and_click(elem):
    """Click the answer tile whose ``aria-label`` equals *elem*.

    After a one-second pause the function waits up to five seconds for the
    tile to become clickable, scrolls it into view and clicks it through an
    action chain. The call is best-effort: a failure is reported on stdout
    and the function still returns normally.

    Args:
        elem: ``aria-label`` text of the tile to click.

    Returns:
        None.
    """
    time.sleep(1)
    try:
        # Keep the caller's label and the located element in separate names.
        tile = WebDriverWait(driver, 5).until(
            EC.element_to_be_clickable((By.XPATH, f"//div[@aria-label='{elem}']"))
        )
        driver.execute_script("arguments[0].scrollIntoView(true);", tile)
        ActionChains(driver).move_to_element(tile).click().perform()
    except Exception as exc:
        # `Exception` instead of a bare except: Ctrl-C / SystemExit still stop
        # the crawl, and the reason for the failure is no longer hidden.
        print("could not find any clickable element")
        print(f"  label={elem!r}: {exc!r}")
    return

def next_button():
    """Click the forward button (class ``button success right``).

    Pauses for a second, waits up to five seconds for the button to become
    clickable, scrolls it into view and clicks it through an action chain.
    """
    time.sleep(1)
    locator = (By.XPATH, "//button[@class='button success right']")
    forward = WebDriverWait(driver, 5).until(EC.element_to_be_clickable(locator))
    # Bring the button into the viewport before moving the pointer onto it.
    driver.execute_script("arguments[0].scrollIntoView(true);", forward)
    ActionChains(driver).move_to_element(forward).click().perform()
    
def small_back_button():
    """Click the small back button (class ``button secondary left``).

    Pauses for a second, waits up to five seconds for the button to become
    clickable, scrolls it into view and clicks it through an action chain.
    """
    time.sleep(1)
    locator = (By.XPATH, "//button[@class='button secondary left']")
    button = WebDriverWait(driver, 5).until(EC.element_to_be_clickable(locator))
    # Scroll first so the pointer move below lands on a visible element.
    driver.execute_script("arguments[0].scrollIntoView(true);", button)
    ActionChains(driver).move_to_element(button).click().perform()

def large_back_button():
    """Click the large back button (``button secondary large left no-margin``).

    Pauses for a second, waits up to five seconds for the button to become
    clickable, scrolls it into view and clicks it through an action chain.
    """
    time.sleep(1)
    locator = (
        By.XPATH,
        "//button[@class='button secondary large left no-margin']",
    )
    button = WebDriverWait(driver, 5).until(EC.element_to_be_clickable(locator))
    # Scroll first so the pointer move below lands on a visible element.
    driver.execute_script("arguments[0].scrollIntoView(true);", button)
    ActionChains(driver).move_to_element(button).click().perform()

def fetch_info():
    """Read the final-offer panel, store device and price, and export the row.

    The panel text is split into lines: the fourth line is taken as the price
    and the part of the first line before the first ',' and ':' as the device
    name. Both are written into the module-level ``data`` dict, which is then
    printed and passed to ``connection_sheet``.
    """
    time.sleep(1)
    panel = driver.find_element(By.XPATH ,"//div[@class='pricing-form-final-offer']")
    lines = panel.text.split('\n')
    # The price is looked up before the device name so a panel with fewer than
    # four lines fails before anything is written to `data`.
    offer = lines[3]
    device = lines[0].split(',')[0].split(':')[0]
    data['Device'] = device
    data['Price'] = offer
    print(data)
    connection_sheet(
        spreadsheet_id='1Ze7Uam6GhNGYPXvXYF3TLZPydkZQ6u5l4rmdc7CxLOU',
        data=data,
        user_sheet_name='MacInfo',
    )

def load_page(url):
    """Open *url* in the shared ``driver``, wait two seconds and start the crawl.

    Args:
        url: address of the first page of the quote wizard.
    """
    driver.get(url)
    # Fixed pause after navigation before the first element lookup.
    time.sleep(2)
    find_and_fetch()

# Script entry point: runs at import time, there is no __main__ guard.
load_page(url='https://www.itsworthmore.com/sell/macbook-pro-m1/macbook-pro-16-m4')

因此,我想从内存/存储容量这一步开始,把后续的抓取工作划分到不同的驱动程序中,让它们各自独立抓取,而不会相互冲突。

python algorithm selenium-webdriver web-scraping multiprocessing
1个回答
0
投票

并行处理的简单方法的细节是:
(1)将python的操作限制为第一个属性('Processor')的一个选择。
(2)让python接收选择的索引作为参数。
(3)准备3个批处理文件,其中每个索引都启动python,就像这样。

python py_main_mac_book.py 1
cmd/k

(4)双击运行所有批次。
以下代码可用于此目的。

要自动执行此操作,您可以使用以下代码中用到的 Python subprocess 模块。当您使用参数 0 运行此代码时,它会同时处理“Processor”(处理器)的所有选项;而当参数大于 0 时,它只处理该索引对应的那一个选项。要使用它,请将其另存为“py_main_mac_book.py”。

并行调查 3 类“处理器”需要 19 分钟。
请注意,使用正常查看模式(非无头)并行运行进程,尤其是 3 个或更多进程,可能会导致 PC 热关机。

from selenium import webdriver
from selenium.webdriver.common.by import By
#from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.firefox.firefox_binary import FirefoxBinary
import time
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
#from selenium.webdriver.chrome.options import Options
#from test_sheet import connection_sheet
import sys
import os
import csv
from datetime import  datetime
import subprocess

class MacPrice:
    """Walk the itsworthmore.com MacBook Pro quote wizard and log every offer.

    One instance owns one headless Firefox session. ``survey_start`` picks a
    single 'Processor' option and ``tree_survey`` then recursively tries every
    answer of every following question, writing one CSV row per final offer.
    """

    def __init__(self):
        """Start Firefox, open the quote page and dismiss the cookie banner."""
        os.environ['MOZ_HEADLESS'] = '1'
        # Select your Firefox binary.
        # NOTE(review): FirefoxBinary and the firefox_binary= keyword are
        # presumably deprecated in recent Selenium releases — confirm against
        # the installed version.
        binary = FirefoxBinary('C:\\Program Files\\Mozilla Firefox\\firefox.exe', log_file=sys.stdout)
        # Start selenium with the configured binary.
        self.driver = webdriver.Firefox(firefox_binary=binary)

        url = 'https://www.itsworthmore.com/sell/macbook-pro-m1/macbook-pro-16-m4'
        self.driver.get(url)

        # accept cookie
        self.cookie_accept()

        # Keyword found in a question heading -> column name used in `data`.
        self.at_names = {
            "Processor": "Processor",
            "memory capacity": "Memory Capacity",
            "storage capacity": "Storage Capacity",
            "condition": "Condition",
            "battery health": "Battery Health",
            "charger": "Include Charger",
            "fully functional": "Fully Functional",
        }
        # Current configuration; its keys are the CSV header, '-' means "not set".
        self.data = {
            'Device': '-',
            'Processor': '-',
            'Memory Capacity': '-',
            'Storage Capacity': '-',
            'Condition': '-',
            'Battery Health': '-',
            'Include Charger': '-',
            'Fully Functional': '-',
            'Price': '-',
        }

    def cookie_accept(self):
        """Click the cookie-consent button if it becomes clickable within 20 s."""
        try:
            cookie_button = WebDriverWait(self.driver, 20).until(
                EC.element_to_be_clickable(
                    (By.XPATH, "//button[@class='js-cookie-consent-agree cookie-consent__agree cursor-pointer button small primary']"))
            )
        except Exception:
            # Best effort: no banner (or a timeout) is not an error. `Exception`
            # rather than a bare except so Ctrl-C still interrupts the wait.
            return
        cookie_button.click()

    def num_selection(self):
        """Return the number of answer tiles on the current question page."""
        select_elms = WebDriverWait(self.driver, 20).until(
            EC.presence_of_all_elements_located((By.XPATH, "//div[@class='answers']/div")))
        print('\nnumber of selections: ', len(select_elms))
        return len(select_elms)

    def survey_start(self, index, start_time):
        """Survey everything below one 'Processor' option and write it to CSV.

        Args:
            index: 1-based position of the 'Processor' option to select; the
                rows go to ``mac_data-<index>.csv`` in the working directory.
            start_time: already formatted start timestamp, echoed in the
                closing report.

        The CSV file is closed and the browser is shut down even when the
        survey fails part-way; the exception itself still propagates.
        """
        try:
            with open('mac_data-{}.csv'.format(index), 'w', newline='') as fout:
                self.writer = csv.writer(fout)
                self.writer.writerow(list(self.data.keys()))

                # The original called the non-existent `self.selected`, which
                # raised AttributeError before any data was collected.
                self.select_attrib('Processor', index - 1)
                self.tree_survey()
        finally:
            self.driver.quit()

        print('\nProcess {}'.format(index))
        print ('started at : ', start_time )
        print ('ended at   : ', datetime.today().strftime('%y-%m-%d %H:%M:%S') ,'\n')

    def tree_survey(self):
        """Recursively try every answer of the current question.

        If the answer tiles are displayed, each one is selected in turn and
        the method recurses into the following page; afterwards the field is
        reset and the back button is clicked. Otherwise the page is treated
        as the price page: device and price (or 'not offered') are written as
        one CSV row and the back button is clicked.
        """
        data_key = self.attrib_name()

        select_elms = WebDriverWait(self.driver, 10).until(
            EC.presence_of_all_elements_located((By.XPATH, "//div[@class='answers']/div")))

        if select_elms[0].is_displayed():
            # Only the count is reused: select_attrib looks the tiles up again
            # on every pass because the page changes in between.
            for i in range(len(select_elms)):
                self.select_attrib(data_key, i)
                self.tree_survey()  # recursion for the next attribute
            # data clear
            self.data[data_key] = '-'
            # going back
            back = self.driver.find_elements(
                By.XPATH, "//div[@class='animate-wrap']/button")[0]
            self.driver.execute_script("arguments[0].scrollIntoView(true);", back)
            time.sleep(1)
            back.click()
            return

        # price page
        time.sleep(1)
        try:
            dev_elm = WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located(
                    (By.XPATH, "//div[@class='pricing-form-final-offer']/h3/strong"))
            )
        except Exception:
            # case price not offered
            self.data['Price'] = 'not offered'
            self.data['Device'] = '---'
            print(self.data['Price'])
            back = self.driver.find_element(
                By.XPATH, "//div[@class='pricing-form-final-offer']/button")
        else:
            self.data['Device'] = dev_elm.get_attribute('innerHTML')
            # The WebElement is kept in a local only; the original briefly
            # stored it in data['Price'] through a chained assignment.
            price_elm = self.driver.find_element(
                By.XPATH, "//h3[@class='your-offer']//strong")
            self.data['Price'] = price_elm.get_attribute('innerHTML')
            print(self.data['Price'])
            back = self.driver.find_elements(
                By.XPATH, "//div[@class='offer']//button")[0]

        # output
        self.writer.writerow(list(self.data.values()))
        # back button
        self.driver.execute_script('arguments[0].scrollIntoView({behavior:"auto", block: "center"});', back)
        time.sleep(1)
        back.click()

    def select_attrib(self, data_key, i):
        """Select the *i*-th answer tile and record its text under *data_key*.

        Args:
            data_key: column name in ``self.data`` to fill.
            i: 0-based index of the tile to click.
        """
        select_elms = WebDriverWait(self.driver, 10).until(
            EC.presence_of_all_elements_located((By.XPATH, "//div[@class='answers']/div")))
        self.data[data_key] = select_elms[i].text
        indicator = select_elms[i].find_element(By.XPATH, "./span[@class='selected-indicator']")
        indicator.click()
        if data_key == 'Condition':
            # This question needs an explicit click on the second button of
            # the navigation row (forward button) to advance.
            WebDriverWait(self.driver, 20).until(
                EC.presence_of_all_elements_located((By.XPATH, "//div[@class='animate-wrap']/button"))
            )[1].click()

    def attrib_name(self):
        """Return the ``data`` column matching the current question heading.

        The heading text is searched for each keyword of ``self.at_names`` in
        order and the first hit wins.
        """
        prompt = WebDriverWait(self.driver, 20).until(
            EC.presence_of_element_located((By.XPATH, "//h5[@class='ng-binding']"))).text
        for key, name in self.at_names.items():
            if key in prompt:
                return name
        # No keyword matched: as before, the last table entry is returned.
        # NOTE(review): tree_survey calls this on the price page too, where the
        # result is not used — confirm before changing this fallback.
        return name
    

if __name__ == "__main__":
    # Usage: python py_main_mac_book.py <index>
    #   index 0  -> survey every 'Processor' option in parallel: option 1 in
    #               this process, options 2..N in child processes
    #   index >0 -> survey only the 'Processor' option with that 1-based index

    # Local import: only this entry point needs the thread pool.
    from concurrent.futures import ThreadPoolExecutor

    if len(sys.argv) != 2 or not sys.argv[1].isdigit():
        sys.exit('usage: python {} <index>   (0 = all Processor options in parallel)'
                 .format(os.path.basename(sys.argv[0])))

    index = int(sys.argv[1])
    print('Process {} has started.'.format(index))
    start_time = datetime.now().strftime('%y-%m-%d %H:%M:%S')

    mp = MacPrice()

    if index == 0:
        num = mp.num_selection()
        # Re-run this very script with the interpreter that is running it, so
        # the children do not depend on PATH or on the current directory.
        script = os.path.abspath(__file__)
        processes = [
            subprocess.Popen([sys.executable, script, str(i)],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
            for i in range(2, num + 1)
        ]

        # Drain every child's pipes while this process does its own survey.
        # Reading them only afterwards lets a child that fills its pipe buffer
        # block until the parent is done, which defeats the parallel run.
        with ThreadPoolExecutor(max_workers=max(1, len(processes))) as pool:
            pending = [pool.submit(process.communicate) for process in processes]
            mp.survey_start(1, start_time)
            reports = [future.result() for future in pending]

        print('\nReports from subprocesses follow:')
        for i, (process, (stdout, stderr)) in enumerate(zip(processes, reports)):
            # errors='replace': the child output may not be valid UTF-8.
            out_text = stdout.decode(errors='replace')
            print('\nprocess ', i+2)
            if process.returncode == 0:
                print(f"succeed:\n {out_text}")
            else:
                print(f"error:\n {out_text}\n {stderr.decode(errors='replace')}")
        print('\nTotal Time:')
        print ('started at : ', start_time )
        print ('ended at   : ', datetime.today().strftime('%y-%m-%d %H:%M:%S') ,'\n')

    else:
        mp.survey_start(index,start_time)
        
© www.soinside.com 2019 - 2024. All rights reserved.