How can I scrape a page's internal links with Scrapy?

Problem description

On the page below there are three lists of links, for the victims, the malware, and the threat source. While scraping this page, when I reach those items I want to follow their links and scrape their contents too.

https://icsstrive.com/incident/lockbit-ransomware-attack-significantly-impacts-owens-group-operations/

I wrote the code below for this and have tried various things. The problem is that when it scrapes a victim and moves on to the other pages, the victim is repeated in the output file, creating many duplicates, and sometimes the malware or the threat source is skipped.

# importing the scrapy module
import random
import scrapy
import logging
from scrapy.utils.log import configure_logging
from pycti import OpenCTIApiClient
import stix2
from pycti import (
    Identity,
    ThreatActor,
    Malware,
    Location,
    StixCoreRelationship,
    Report,
)
import json

class icsstriveSpider(scrapy.Spider):

    stix_objects=[]

    name = "icsstrive"
    start_urls = ['https://icsstrive.com/']
    baseUrl="https://icsstrive.com"
    pages = None

    def parse(self, response, **kwargs):
        links = response.css('div.search-r-title a::attr(href)').getall()
        yield from response.follow_all(links, self.parse_icsstrive)
      
        if self.pages is None:
            self.pages=response.xpath('//a[@class="wpv-filter-pagination-link js-wpv-pagination-link page-link"]/@href').getall()
        if len(self.pages) >0:
            url=self.pages[0]
            self.pages.remove(url)
            yield response.follow(self.baseUrl+url, self.parse,dont_filter=True)
       
        
    def parse_icsstrive(self, response, **kwargs):
        title = ""
        published = ""
        type = ""
        summary = ""
        incident_Date = ""
        location = ""
        estimated_Cost = ""
        victims_url = ""
        victim_title = ""
        malwares_urls = ""
        threat_source_urls = ""
        references_name = ""
        references_url = ""
        industries = ""
        impacts = ""
        title=response.xpath('//h1[@class="entry-title"]/text()').get()
        published=response.xpath('//p[@class="et_pb_title_meta_container"]/span/text()').get()
        type=response.xpath('//div[@class="et_pb_text_inner"]/text()').get()
        summary=response.xpath('//div[@class="et_pb_text_inner"]/p/text()').get()
        incident_Date = response.xpath('//h3[text()="Incident Date"]/following-sibling::*//text()').get()
        location = response.xpath('//h3[text()="Location"]/following-sibling::p/a/text()').get()
        estimated_Cost = response.xpath('//h3[text()="Estimated Cost"]/following-sibling::p/text()').get()
        victims_url = response.xpath('//div[@class="et_pb_text_inner"]/h3[text()="Victims"]/following-sibling::div/ul/li/a/@href').getall()
        malwares_urls = response.xpath('//div[@class="et_pb_text_inner"]/h3[text()="Type of Malware"]/following-sibling::div/ul/li/a/@href').getall()
        threat_source_urls = response.xpath('//div[@class="et_pb_text_inner"]/h3[text()="Threat Source"]/following-sibling::div/ul/li/a/@href').getall()
        references_name = response.xpath('//div[@class="et_pb_text_inner"]/h3[text()="References"]/following-sibling::div/ul/li/a/text()').getall()
        references_url = response.xpath('//div[@class="et_pb_text_inner"]/h3[text()="References"]/following-sibling::div/ul/li/a/@href').getall()
        industries = response.xpath('//h3[text()="Industries"]/following-sibling::p/a/text()').get()
        impacts = response.xpath('//h3[text()="Impacts"]/following-sibling::*//text()').get()

        item = {
            "title": title,
            "published": published,
            "type": type,
            "summary": summary,
            "incident_Date": incident_Date,
            "estimated_Cost": estimated_Cost,
            "references": ",".join(references_name),
            "industries": industries,
        }
        if location is not None:
            item["location"]= location.replace("'", '"')
        if impacts is not None:
            item["impacts"]= impacts.replace("'", '"')
       
        # Follow the victim links first; malware and threat-source URLs are carried along in meta
        if len(victims_url) > 0: 
            for url in victims_url:
                request= scrapy.Request(url + "?dummy=" + str(random.random()),callback=self.parse_victims,dont_filter=True,meta={'item': item, 'malwares_urls': malwares_urls, 'threat_source_urls':threat_source_urls})
                request.meta['dont_cache'] = True
                yield request
        else:
            yield item  

    def parse_victims(self, response, **kwargs):
        victim_title = ""
        victim_published = ""
        victim_des = ""
        victim_title=response.xpath('//h1[@class="entry-title"]/text()').get()
        victim_des=response.xpath('//div[@class="et_pb_text_inner"]/p/text()').get()
        victim_published = response.xpath('//div[@class="et_pb_text_inner"]/h3[text()="Incidents"]/following-sibling::div/ul/li/strong/text()').getall()
    
        item = response.meta['item']
       
        malwares_urls = response.meta['malwares_urls']
        threat_source_urls = response.meta['threat_source_urls']
        item["victim_title"] = victim_title
        item["victim_des"] = victim_des
        item["victim_url"] = response.url
        if victim_published:
            item["victim_published"] = victim_published[0]
        if item["title"]=="Chinese Identified Hackers Targeting Hawaii Water Utilities and unidentified Oil & Gas Pipeline in US":
             print(item)
       
        if len(malwares_urls) > 0:
            for malware_url in malwares_urls:
                request= scrapy.Request(malware_url+ "?dummy=" + str(random.random()), callback=self.parse_malware,dont_filter=True, meta={'item': item, 'threat_source_urls':threat_source_urls})
                request.meta['dont_cache'] = True
                yield request
        elif len(threat_source_urls) > 0:
            for threat_source_url in threat_source_urls:
                request= scrapy.Request(threat_source_url+ "?dummy=" + str(random.random()), callback=self.parse_threat_source,dont_filter=True,meta={'item': item})
                request.meta['dont_cache'] = True
                yield request
        else:
            yield item

    def parse_malware(self, response, **kwargs):
        malware_title = ""
        malware_published = ""
        malware_des = ""
        malware_title=response.xpath('//h1[@class="entry-title"]/text()').get()
        malware_des=response.xpath('//div[@class="et_pb_text_inner"]/p/text()').get()
        malware_published = response.xpath('//div[@class="et_pb_text_inner"]/h3[text()="Incidents"]/following-sibling::div/ul/li/strong/text()').getall()
        item = response.meta['item']
        threat_source_urls = response.meta['threat_source_urls']
        item["malware_title"] = malware_title
        item["malware_des"] = malware_des
        
        if malware_published:
            item["malware_published"] = malware_published[0]
        if len(threat_source_urls) > 0:
            for threat_source_url in threat_source_urls:
                request= scrapy.Request(threat_source_url+ "?dummy=" + str(random.random()), callback=self.parse_threat_source,dont_filter=True,meta={'item': item})
                request.meta['dont_cache'] = True
                yield request
        else:
            yield item   

    def parse_threat_source(self, response, **kwargs):
        threat_source_title = ""
        threat_source_published = ""
        threat_source_des = ""
        threat_source_title=response.xpath('//h1[@class="entry-title"]/text()').get()
        threat_source_des=response.xpath('//div[@class="et_pb_text_inner"]/p/text()').get()
        threat_source_published = response.xpath('//div[@class="et_pb_text_inner"]/h3[text()="Incidents"]/following-sibling::div/ul/li/strong/text()').getall()
        item = response.meta['item']
        item["threat_source_title"] = threat_source_title
        item["threat_source_des"] = threat_source_des
        if item["title"]=="Chinese Identified Hackers Targeting Hawaii Water Utilities and unidentified Oil & Gas Pipeline in US":
             print(item)
        if threat_source_published:
            item["threat_source_published"] = threat_source_published[0]
        yield item


python scrapy web-crawler
1 Answer

There are several reasons in your code why items end up duplicated in the output.

Reasons

  1. You use dont_filter=True on almost every request. This disables Scrapy's built-in duplicate filter and lets the same page be parsed multiple times (see the short illustration after this list).
  2. In your parse method you collect all of the pagination links and then loop over them, sending a request for each of them back into parse. So every time any page is parsed, the same requests are issued again for all of the other pages.
  3. Almost every parse method yields a dictionary, then passes that same dictionary on to the next parse method through the request's meta and yields it again. This leads to duplicated and incomplete items, because you yield them before they are finished.
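
For reference, here is a minimal, self-contained illustration of what dont_filter=True changes (the URLs are hypothetical):

import scrapy

class DemoSpider(scrapy.Spider):
    name = "demo"
    start_urls = ["https://example.com/"]  # hypothetical listing page

    def parse(self, response):
        # With the default dupefilter, the second identical request is
        # silently dropped, so parse_page runs only once for this URL.
        yield scrapy.Request("https://example.com/page-1", callback=self.parse_page)
        yield scrapy.Request("https://example.com/page-1", callback=self.parse_page)
        # dont_filter=True bypasses the filter, so the same page is
        # scheduled and parsed a second time.
        yield scrapy.Request("https://example.com/page-1",
                             callback=self.parse_page, dont_filter=True)

    def parse_page(self, response):
        self.logger.info("parsed %s", response.url)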

Possible solutions

  1. Remove the dont_filter argument from your requests.
  2. Instead of sending a request for every page each time the parse method runs, only request the next page.
  3. Don't yield incomplete items; if you are passing a dictionary on to another parse method, wait until it is complete before yielding it (a sketch of this approach follows the example below).

Here is an example of 1 and 2:

import scrapy

class icsstriveSpider(scrapy.Spider):

    name = "icsstrive"
    start_urls = ['https://icsstrive.com/']

    def parse(self, response):
        # Follow each incident link; the default dupefilter drops repeats.
        for link in response.css('div.search-r-title a::attr(href)').getall():
            yield response.follow(link, self.parse_icsstrive)
        # Only request the next page instead of re-requesting every page.
        current_page = response.css('li.wpv_page_current')
        if next_page := current_page.xpath("./following-sibling::li/a/@href").get():
            yield scrapy.Request(response.urljoin(next_page))

    def parse_icsstrive(self, response):
        victims_links = response.xpath("//div[h3[text()='Victims']]//li/a/@href").getall()
        victims = response.xpath("//div[h3[text()='Victims']]//li//text()").getall()
        malware_links = response.xpath("//div[h3[text()='Type of Malware']]//li/a/@href").getall()
        malware = response.xpath("//div[h3[text()='Type of Malware']]//li//text()").getall()
        threat_source_links = response.xpath("//div[h3[text()='Threat Source']]//li/a/@href").getall()
        threat_source = response.xpath("//div[h3[text()='Threat Source']]//li/a/text()").getall()
        title = response.xpath('//h1[@class="entry-title"]/text()').get()
        yield {
            "title": title,
            "victims": victims,
            "victims_links": victims_links,
            "malware": malware,
            "malware_links": malware_links,
            "threat_source_links": threat_source_links,
            "threat_source": threat_source
        }
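
And here is a minimal sketch of solution 3, assuming you still want one combined item per incident: collect the detail URLs up front, visit them one at a time, carry the partial dictionary through cb_kwargs, and only yield once nothing is left. The helper next_or_finish and the pending/kind names are illustrative, not part of the original code:

import scrapy

class IcsstriveDetailSpider(scrapy.Spider):
    name = "icsstrive_details"
    start_urls = ["https://icsstrive.com/"]

    def parse(self, response):
        for link in response.css("div.search-r-title a::attr(href)").getall():
            yield response.follow(link, self.parse_incident)

    def parse_incident(self, response):
        item = {"title": response.xpath('//h1[@class="entry-title"]/text()').get()}
        # Queue every detail URL up front, remembering which field it fills.
        pending = (
            [(u, "victim") for u in response.xpath(
                "//div[h3[text()='Victims']]//li/a/@href").getall()]
            + [(u, "malware") for u in response.xpath(
                "//div[h3[text()='Type of Malware']]//li/a/@href").getall()]
            + [(u, "threat_source") for u in response.xpath(
                "//div[h3[text()='Threat Source']]//li/a/@href").getall()]
        )
        yield from self.next_or_finish(item, pending)

    def parse_detail(self, response, item, pending, kind):
        # Copy before mutating so chained callbacks never share one dict.
        item = dict(item)
        item[f"{kind}_title"] = response.xpath('//h1[@class="entry-title"]/text()').get()
        item[f"{kind}_url"] = response.url
        yield from self.next_or_finish(item, pending)

    def next_or_finish(self, item, pending):
        if pending:
            url, kind = pending[0]
            # dont_filter=True is kept here only because the same victim or
            # malware page can appear under several incidents; filtering the
            # duplicate request would silently drop the whole item.
            yield scrapy.Request(
                url,
                callback=self.parse_detail,
                cb_kwargs={"item": item, "pending": pending[1:], "kind": kind},
                dont_filter=True,
            )
        else:
            # Yielded exactly once, after every detail page has been visited.
            yield item

Because each follow-up request carries its own copy of the partial dictionary, the final item is complete and emitted only once, which addresses both the duplicates and the skipped malware/threat-source fields.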
