我在做网络抓取时需要执行一系列操作,但整个过程非常耗时。因此我不想逐个调用函数,而是想把这些操作交给多个进程并行执行。但这样做会遇到多个驱动程序(driver)相互冲突的问题,我该如何解决?
我希望各个进程在各自的驱动程序中执行任务、互不冲突,并且每个驱动程序都已经加载好之前的步骤。
这是代码:
#importing important libraries
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.action_chains import ActionChains
import time
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from test_sheet import connection_sheet
# Options object built from the imported (Chrome) Options class; it is created
# here but is not handed to the Firefox driver constructed below.
options = Options()

# Single browser session, shared as a module global by every helper below.
driver = webdriver.Firefox()

# Row template for one price quote.  '-' marks a field that has not been
# filled in yet (or has been reset while backtracking).
data = dict.fromkeys(
    (
        'Device',
        'Processor',
        'Memory Capacity',
        'Storage Capacity',
        'Condition',
        'Battery Health',
        'Include Charger',
        'Fully Functional',
        'Price',
    ),
    '-',
)
def find_and_fetch():
    """Explore the questionnaire page currently shown in the shared ``driver``.

    The question heading (``h5`` with class ``ng-binding``) decides which
    branch runs.  For a recognised question the ``aria-label`` of every answer
    is collected, each answer is clicked in turn, and this function calls
    itself for the page that follows.  When the offer heading contains
    "Your device is valued at", ``fetch_info()`` is called and the large back
    button is clicked.

    The answers chosen along the way are written into the module-level
    ``data`` dict; each branch resets its own field to '-' before returning.

    NOTE(review): this code reached review with its indentation stripped.  The
    nesting below is the most plausible reconstruction — confirm it against
    the author's copy, in particular where each ``small_back_button()``,
    ``time.sleep()`` and ``return`` sits relative to its loop / ``try``.
    """
    global data
    try:
        # Use JavaScript to hide the banner
        driver.execute_script("""
var banner = document.querySelector('a.message-banner');
if (banner) {
banner.style.display = 'none';
}""")
    except:
        # Reached only when the script call itself raises.
        print("banner disabled")
    time.sleep(1)
    # The heading is looked up twice: once for the truthiness test, once to keep its text.
    if driver.find_element(By.XPATH,"//h5[@class='ng-binding']").text:
        text = driver.find_element(By.XPATH,"//h5[@class='ng-binding']").text
        # find processor elements and do forloop for it and call this function recursively
        if 'Processor' in text:
            processor = []
            try:
                elem = driver.find_element(By.XPATH,"//div[@class='answers']")
                # NOTE(review): an XPath beginning with "//" is evaluated against the
                # whole document, not just `elem` — presumably harmless here; confirm.
                elems = elem.find_elements(By.XPATH, "//div[@ng-keydown='selectAnswer($index, $event)']")
                # Labels are copied out first so the loop below does not hold on to
                # elements from a page that is about to change.
                for i in elems:
                    processor.append(i.get_attribute('aria-label'))
                for prces in processor:
                    time.sleep(1)
                    data['Processor']=prces
                    print("=> processor : ",prces)
                    find_and_click(prces)
                    find_and_fetch()
                    # break
            except Exception as e :
                # print("Error While passing to processor selection!")
                print(e)
            data['Processor']='-'
            return
        # find memory capacity elements and do forloop on it and call this function recursively
        if "memory capacity" in text:
            capas = []
            try:
                elem = driver.find_element(By.XPATH,"//div[@class='answers']")
                elems = elem.find_elements(By.XPATH, "//div[@ng-keydown='selectAnswer($index, $event)']")
                for i in elems:
                    capas.append(i.get_attribute('aria-label'))
                for capa in capas:
                    time.sleep(1)
                    data['Memory Capacity']=capa
                    print("=> memory capacity : ",capa)
                    find_and_click(capa)
                    find_and_fetch()
                    # break
                # All options tried: step back to the previous question.
                small_back_button()
            except :
                print("Error While passing to memory capacity!")
            data['Memory Capacity']='-'
            return
        # find storage capacity elements and do forloop on it and call this function recursively
        if "storage capacity" in text:
            storage = []
            try:
                elem = driver.find_element(By.XPATH,"//div[@class='answers']")
                elems = elem.find_elements(By.XPATH, "//div[@ng-keydown='selectAnswer($index, $event)']")
                for i in elems:
                    storage.append(i.get_attribute('aria-label'))
                for store in storage:
                    time.sleep(1)
                    # Times how long the sub-tree under one storage option takes.
                    start_time = time.time()
                    data['Storage Capacity']=store
                    print("=> storage capacity : ",store)
                    find_and_click(store)
                    find_and_fetch()
                    # break
                    end_time = time.time()
                    time_taken = end_time-start_time
                    print(f"The code block took {time_taken:.4f} seconds to execute.")
                    # NOTE(review): the browser session is closed and the loop left after
                    # the first storage option, so the small_back_button() below runs
                    # against a closed session.  Looks like benchmarking leftovers — confirm
                    # before relying on this branch for a full crawl.
                    driver.quit()
                    break
                small_back_button()
            except :
                print("Error While passing to storage selection!")
            data['Storage Capacity']='-'
            return
        # find condition elements and do forloop on it and call this function recursively
        if "condition" in text:
            conditions = []
            try:
                elem = driver.find_element(By.XPATH,"//div[@class='answers']")
                elems = elem.find_elements(By.XPATH, "//div[@ng-keydown='selectAnswer($index, $event)']")
                for i in elems:
                    conditions.append(i.get_attribute('aria-label'))
                for condition in conditions:
                    time.sleep(1)
                    data['Condition']=condition
                    print("=> condition : ",condition)
                    find_and_click(condition)
                    # Unlike the other questions, this one is followed by an explicit
                    # click on the "next" button before recursing.
                    next_button()
                    find_and_fetch()
                    # break
                small_back_button()
            except :
                print("Error While passing to condition selection!")
            data['Condition']='-'
            return
        # find battery health elements and do forloop on it and call this function recursively
        if "battery health" in text:
            health = []
            try:
                elem = driver.find_element(By.XPATH,"//div[@class='answers']")
                elems = elem.find_elements(By.XPATH, "//div[@ng-keydown='selectAnswer($index, $event)']")
                for i in elems:
                    health.append(i.get_attribute('aria-label'))
                for hlth in health:
                    time.sleep(1)
                    data['Battery Health']=hlth
                    print("=> bettery health : ",hlth)
                    find_and_click(hlth)
                    find_and_fetch()
                    hlth = None
                    time.sleep(1)
                small_back_button()
            except:
                print("Error While passing to battery health selection!")
            data["Battery Health"]='-'
            return
        # find including charger elements and do forloop on it and call this function recursively
        if "charger" in text:
            charger = []
            try:
                elem = driver.find_element(By.XPATH,"//div[@class='answers']")
                elems = elem.find_elements(By.XPATH, "//div[@ng-keydown='selectAnswer($index, $event)']")
                for i in elems:
                    charger.append(i.get_attribute('aria-label'))
                for crgr in charger:
                    time.sleep(1)
                    data['Include Charger'] = crgr
                    print("=> include charger : ",crgr)
                    find_and_click(crgr)
                    find_and_fetch()
                    time.sleep(2)
                small_back_button()
            except :
                print("Error While passing to including charger selection!")
            data['Include Charger']='-'
            return
        # find fully functional elements and do forloop on it and call this function recursively
        if "fully functional" in text:
            functional = []
            try:
                elem = driver.find_element(By.XPATH,"//div[@class='answers']")
                elems = elem.find_elements(By.XPATH, "//div[@ng-keydown='selectAnswer($index, $event)']")
                for i in elems:
                    functional.append(i.get_attribute('aria-label'))
                time.sleep(1)
                # Only the first listed answer is explored for this question.
                data['Fully Functional']=functional[0]
                find_and_click(functional[0])
                find_and_fetch()
                small_back_button()
            except :
                print("Error While passing to fully functional selection!")
            data['Fully Functional']='-'
            return
        # find final price page and call fetch details function and return all details
        time.sleep(1)
        if driver.find_element(By.XPATH, "//h3[@class='your-offer']").text:
            text = driver.find_element(By.XPATH, "//h3[@class='your-offer']").text
            if "Your device is valued at" in text:
                print('in the final page')
                fetch_info()
                time.sleep(1)
                large_back_button()
                return
            return
    # if nothing is found on page
    else:
        # Best effort: try to step back; any failure while doing so is ignored.
        try:
            small_back_button()
        except:pass
        return
def _xpath_literal(text):
    """Return *text* as an XPath 1.0 string literal.

    XPath 1.0 has no escape character, so a value containing both quote
    kinds has to be assembled with ``concat()``.
    """
    text = str(text)
    if "'" not in text:
        return f"'{text}'"
    if '"' not in text:
        return f'"{text}"'
    pieces = (f"'{piece}'" for piece in text.split("'"))
    return "concat(" + ", \"'\", ".join(pieces) + ")"


def find_and_click(elem):
    """Click the answer ``div`` whose ``aria-label`` equals *elem*.

    Waits one second, then up to five more for the element to become
    clickable, scrolls it into view and clicks it through an ActionChains
    sequence on the shared ``driver``.

    Args:
        elem: the ``aria-label`` text of the answer to click.

    Returns:
        None.  Failures are reported on stdout instead of being raised, so a
        missing answer does not abort the surrounding crawl.
    """
    time.sleep(1)
    # Quote the label safely: labels containing an apostrophe would otherwise
    # produce an invalid XPath expression.
    locator = (By.XPATH, f"//div[@aria-label={_xpath_literal(elem)}]")
    try:
        target = WebDriverWait(driver, 5).until(
            EC.element_to_be_clickable(locator)
        )
        driver.execute_script("arguments[0].scrollIntoView(true);", target)
        ActionChains(driver).move_to_element(target).click().perform()
    except Exception:
        # Deliberately best-effort, but no longer swallows KeyboardInterrupt /
        # SystemExit the way a bare ``except:`` does.
        print("could not find any clickable element")
def next_button():
    """Click the questionnaire's "next" button (``button success right``).

    Pauses one second, waits up to five seconds for the button to become
    clickable, scrolls it into view and clicks it via ActionChains.
    """
    time.sleep(1)
    locator = (By.XPATH, "//button[@class='button success right']")
    forward = WebDriverWait(driver, 5).until(EC.element_to_be_clickable(locator))
    driver.execute_script("arguments[0].scrollIntoView(true);", forward)
    ActionChains(driver).move_to_element(forward).click().perform()
def small_back_button():
    """Click the small "back" button (``button secondary left``).

    Pauses one second, waits up to five seconds for the button to become
    clickable, then scrolls to it and clicks it.
    """
    time.sleep(1)
    locator = (By.XPATH, "//button[@class='button secondary left']")
    button = WebDriverWait(driver, 5).until(EC.element_to_be_clickable(locator))
    # Bring the button into the viewport before the pointer moves to it.
    driver.execute_script("arguments[0].scrollIntoView(true);", button)
    ActionChains(driver).move_to_element(button).click().perform()
def large_back_button():
    """Click the large "back" button (``button secondary large left no-margin``).

    Pauses one second, waits up to five seconds for the button to become
    clickable, then scrolls to it and clicks it.
    """
    time.sleep(1)
    locator = (
        By.XPATH,
        "//button[@class='button secondary large left no-margin']",
    )
    button = WebDriverWait(driver, 5).until(EC.element_to_be_clickable(locator))
    # Bring the button into the viewport before the pointer moves to it.
    driver.execute_script("arguments[0].scrollIntoView(true);", button)
    ActionChains(driver).move_to_element(button).click().perform()
def fetch_info():
    """Read the final-offer panel, complete the ``data`` row and upload it.

    The panel text is split into lines: the fourth line is stored as the
    price, and the part of the first line before its first ',' and ':' is
    stored as the device name.  The row is printed and then passed to
    ``connection_sheet``.
    """
    global data
    time.sleep(1)
    offer_lines = driver.find_element(
        By.XPATH, "//div[@class='pricing-form-final-offer']"
    ).text.split('\n')
    # Price is read first so a short panel fails before ``data`` is touched.
    price = offer_lines[3]
    device = offer_lines[0].split(',')[0].split(':')[0]
    data['Device'] = device
    data['Price'] = price
    print(data)
    connection_sheet(
        spreadsheet_id='1Ze7Uam6GhNGYPXvXYF3TLZPydkZQ6u5l4rmdc7CxLOU',
        data=data,
        user_sheet_name='MacInfo',
    )
def load_page(url):
    """Open *url* in the shared driver, pause briefly, then start the crawl."""
    settle_seconds = 2
    driver.get(url)
    time.sleep(settle_seconds)
    find_and_fetch()
# Script entry point: runs at import time and starts the crawl on this product page.
load_page(url='https://www.itsworthmore.com/sell/macbook-pro-m1/macbook-pro-16-m4')
因此,我想从内存(memory)和存储(storage)选项这一步开始,把任务拆分到不同的驱动程序中,让它们各自抓取而不相互冲突。
并行处理的简单方法的细节是:
(1)将python的操作限制为第一个属性('Processor')的一个选择。
(2)让python接收选择的索引作为参数。
(3)准备 3 个批处理文件,每个文件用一个不同的索引启动 python,例如:
python py_main_mac_book.py 1
cmd /k
(4)双击运行所有批次。
以下代码可用于此目的。
若想自动完成上述步骤,可以使用下面代码中用到的 python subprocess 模块。以参数 0 运行这段代码时,它会同时处理“处理器”的所有选项;以大于 0 的参数运行时,它只处理该索引对应的那一个选项。使用前请将其另存为“py_main_mac_book.py”。
并行调查 3 类“处理器”需要 19 分钟。
请注意,以正常显示模式(非无头模式)并行运行多个进程,尤其是 3 个或更多时,可能导致电脑因过热而关机。
from selenium import webdriver
from selenium.webdriver.common.by import By
#from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.firefox.firefox_binary import FirefoxBinary
import time
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
#from selenium.webdriver.chrome.options import Options
#from test_sheet import connection_sheet
import sys
import os
import csv
from datetime import datetime
import subprocess
class MacPrice:
    """Crawl one 'Processor' branch of the trade-in questionnaire into a CSV file.

    Each instance owns its own headless Firefox session, so several instances
    (one per OS process) can run side by side without sharing a driver.

    Attributes:
        driver: the Selenium Firefox session owned by this instance.
        at_names: maps a fragment of the question prompt to the column name
            used in ``data``.
        data: the row currently being assembled; '-' means "not set".
        writer: ``csv.writer`` for the output file (set by ``survey_start``).
    """

    DEFAULT_URL = 'https://www.itsworthmore.com/sell/macbook-pro-m1/macbook-pro-16-m4'
    DEFAULT_FIREFOX_PATH = 'C:\\Program Files\\Mozilla Firefox\\firefox.exe'
    # Every answer option of the question currently shown.
    ANSWERS_XPATH = "//div[@class='answers']/div"

    def __init__(self, url=DEFAULT_URL, firefox_path=DEFAULT_FIREFOX_PATH):
        """Start headless Firefox, open *url* and dismiss the cookie banner.

        Args:
            url: questionnaire page to crawl.
            firefox_path: location of the Firefox executable.
        """
        self.at_names = {
            "Processor": "Processor",
            "memory capacity": "Memory Capacity",
            "storage capacity": "Storage Capacity",
            "condition": "Condition",
            "battery health": "Battery Health",
            "charger": "Include Charger",
            "fully functional": "Fully Functional",
        }
        self.data = {
            'Device': '-',
            'Processor': '-',
            'Memory Capacity': '-',
            'Storage Capacity': '-',
            'Condition': '-',
            'Battery Health': '-',
            'Include Charger': '-',
            'Fully Functional': '-',
            'Price': '-',
        }
        os.environ['MOZ_HEADLESS'] = '1'
        # NOTE(review): FirefoxBinary / firefox_binary= are deprecated in
        # Selenium 4 — confirm the installed version still accepts them.
        binary = FirefoxBinary(firefox_path, log_file=sys.stdout)
        self.driver = webdriver.Firefox(firefox_binary=binary)
        try:
            self.driver.get(url)
            self.cookie_accept()
        except BaseException:
            # Do not leave a headless browser running if start-up fails.
            self.driver.quit()
            raise

    def cookie_accept(self):
        """Click the cookie-consent button if it becomes clickable within 20 s."""
        try:
            cookie_button = WebDriverWait(self.driver, 20).until(
                EC.element_to_be_clickable(
                    (By.XPATH, "//button[@class='js-cookie-consent-agree cookie-consent__agree cursor-pointer button small primary']"))
            )
        except Exception:
            # Best effort: no banner (or it never became clickable) is fine.
            return
        cookie_button.click()

    def num_selection(self):
        """Return the number of answer options on the current question."""
        select_elms = WebDriverWait(self.driver, 20).until(
            EC.presence_of_all_elements_located((By.XPATH, self.ANSWERS_XPATH)))
        print('\nnumber of selections: ', len(select_elms))
        return len(select_elms)

    def survey_start(self, index, start_time):
        """Crawl everything below the *index*-th 'Processor' option.

        Writes one row per reached price page to ``mac_data-<index>.csv`` and
        always closes the file and the browser, even if the crawl fails.

        Args:
            index: 1-based position of the 'Processor' option to select.
            start_time: preformatted start timestamp, echoed in the report.
        """
        try:
            with open('mac_data-{}.csv'.format(index), 'w', newline='') as fout:
                self.writer = csv.writer(fout)
                self.writer.writerow(list(self.data.keys()))
                # The original called a non-existent ``self.selected``.
                self.select_attrib('Processor', index - 1)
                self.tree_survey()
        finally:
            self.driver.quit()
        print('\nProcess {}'.format(index))
        print('started at : ', start_time)
        print('ended at : ', datetime.today().strftime('%y-%m-%d %H:%M:%S'), '\n')

    def tree_survey(self):
        """Depth-first walk of the questionnaire from the page currently shown.

        On a question page every option is selected in turn and the walk
        recurses into the page that follows; afterwards the field is reset
        and the back button is clicked.  When the answer options are present
        but not displayed, the page is treated as the price page: device and
        price are read (or 'not offered' is recorded), the row is written
        and the back button is clicked.
        """
        data_key = self.attrib_name()
        select_elms = WebDriverWait(self.driver, 10).until(
            EC.presence_of_all_elements_located((By.XPATH, self.ANSWERS_XPATH)))
        if select_elms[0].is_displayed():
            # Iterate by position: the elements are located again inside
            # select_attrib() each time the walk returns to this page.
            for i in range(len(select_elms)):
                self.select_attrib(data_key, i)
                self.tree_survey()  # recursion for the next attribute
            # Done with this question: clear its field and go back one page.
            self.data[data_key] = '-'
            back = self.driver.find_elements(
                By.XPATH, "//div[@class='animate-wrap']/button")[0]
            self.driver.execute_script("arguments[0].scrollIntoView(true);", back)
            time.sleep(1)
            back.click()
        else:  # price page
            time.sleep(1)
            try:
                dev_elm = WebDriverWait(self.driver, 10).until(
                    EC.presence_of_element_located(
                        (By.XPATH, "//div[@class='pricing-form-final-offer']/h3/strong"))
                )
            except Exception:
                # No device heading appeared: record that no price was offered.
                self.data['Price'] = 'not offered'
                self.data['Device'] = '---'
                print(self.data['Price'])
                back = self.driver.find_element(
                    By.XPATH, "//div[@class='pricing-form-final-offer']/button")
            else:
                self.data['Device'] = dev_elm.get_attribute('innerHTML')
                price_elm = self.driver.find_element(
                    By.XPATH, "//h3[@class='your-offer']//strong")
                self.data['Price'] = price_elm.get_attribute('innerHTML')
                print(self.data['Price'])
                back = self.driver.find_elements(
                    By.XPATH, "//div[@class='offer']//button")[0]
            # Emit the finished row, then return to the last question.
            self.writer.writerow(list(self.data.values()))
            self.driver.execute_script(
                'arguments[0].scrollIntoView({behavior:"auto", block: "center"});', back)
            time.sleep(1)
            back.click()

    def select_attrib(self, data_key, i):
        """Select the *i*-th answer of the current question and record its text.

        Args:
            data_key: column of ``data`` that receives the answer text.
            i: 0-based position of the answer to click.
        """
        select_elms = WebDriverWait(self.driver, 10).until(
            EC.presence_of_all_elements_located((By.XPATH, self.ANSWERS_XPATH)))
        self.data[data_key] = select_elms[i].text
        indicator = select_elms[i].find_element(
            By.XPATH, "./span[@class='selected-indicator']")
        indicator.click()
        if data_key == 'Condition':
            # This question additionally needs a click on the forward button
            # (the second button inside the animate-wrap container).
            WebDriverWait(self.driver, 20).until(
                EC.presence_of_all_elements_located(
                    (By.XPATH, "//div[@class='animate-wrap']/button"))
            )[1].click()

    def attrib_name(self):
        """Return the ``data`` column that matches the current question prompt."""
        prompt = WebDriverWait(self.driver, 20).until(
            EC.presence_of_element_located((By.XPATH, "//h5[@class='ng-binding']"))).text
        return self._data_key_for_prompt(prompt)

    def _data_key_for_prompt(self, prompt):
        """Map prompt text to a ``data`` column via ``at_names``.

        The first fragment contained in *prompt* wins (case-sensitive).  If
        none matches, the last entry of ``at_names`` is returned; this keeps
        the original fall-through behaviour, which ``tree_survey`` depends on
        because it asks for the name before it knows it is on the price page.
        """
        data_key = None
        for fragment, data_key in self.at_names.items():
            if fragment in prompt:
                break
        return data_key
if __name__ == "__main__":
    # Usage: <script> INDEX
    #   INDEX == 0 : survey 'Processor' option 1 in this process and start one
    #                extra process per remaining option.
    #   INDEX  > 0 : survey only the INDEX-th 'Processor' option.
    try:
        index = int(sys.argv[1])
    except (IndexError, ValueError):
        sys.exit('usage: python {} INDEX  (0 = all processors in parallel, '
                 'N = only the N-th processor)'.format(os.path.basename(sys.argv[0])))
    print('Process {} has started.'.format(index))
    start_time = datetime.now().strftime('%y-%m-%d %H:%M:%S')
    mp = MacPrice()
    if index == 0:
        num = mp.num_selection()
        # Re-run this very file with the same interpreter; passing the command
        # as a list keeps paths containing spaces intact.
        script = os.path.abspath(__file__)
        # NOTE(review): the workers' output is only read after this process has
        # finished its own survey; a very chatty worker could fill its pipe and
        # stall until then — confirm output volume stays small.
        processes = [
            subprocess.Popen([sys.executable, script, str(i)],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
            for i in range(2, num + 1)
        ]
        mp.survey_start(1, start_time)
        print('\nReports from subprocesses follow:')
        for i, process in enumerate(processes):
            stdout, stderr = process.communicate()
            print('\nprocess ', i+2)
            if process.returncode == 0:
                print(f"succeed:\n {stdout.decode()}")
            else:
                print(f"error:\n {stdout.decode()}\n {stderr.decode()}")
        print('\nTotal Time:')
        print('started at : ', start_time)
        print('ended at : ', datetime.today().strftime('%y-%m-%d %H:%M:%S'), '\n')
    else:
        mp.survey_start(index, start_time)