Unable to retrieve data for companies with turnover over £2 million from the Companies House and HMRC APIs

Question · votes: 0 · answers: 1

I have been working on this for a day and I'm having trouble retrieving company data from the HMRC and Companies House APIs. I mainly need companies with a turnover above £1 million, so in my code I set

MIN_TURNOVER = 1_000_000

to filter out companies whose turnover is below £1 million.

Here is my code:

import os
import requests
import logging
import urllib.parse
import json
import csv
from datetime import datetime
import time
from hmrc_client import HMRCClient
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Configure logging
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Constants
COMPANIES_API_KEY = os.getenv('COMPANIES_API_KEY')
BASE_URL = 'https://api.companieshouse.gov.uk'
RATE_LIMIT_DELAY = 0.6  # Minimum delay between requests (in seconds)
MIN_DIRECTOR_AGE = 50  # Minimum age for directors
MIN_TURNOVER = 1_000_000  # £1 million minimum turnover

class CompaniesHouseClient:
    def __init__(self):
        """Initialize the Companies House client."""
        self.api_key = os.getenv('COMPANIES_API_KEY')
        if not self.api_key:
            raise ValueError("COMPANIES_API_KEY not found in environment variables")
            
        # Set up session with authentication
        self.session = requests.Session()
        self.session.auth = (self.api_key, '')
        self.session.headers.update({
            'Accept': 'application/json',
            'User-Agent': 'CompanyDataRetrieval/1.0'
        })
        
        # Configure rate limiting
        self.last_request_time = time.time()
        self.request_times = []  # Keep track of request timestamps
        self.max_requests_per_minute = 500  # Conservative limit
        self.min_request_interval = 0.15  # Minimum time between requests in seconds
        
        self.hmrc_client = HMRCClient()
        logger.info("Initialized Companies House client")

    def _rate_limit(self):
        """Implement rate limiting for API requests."""
        current_time = time.time()
        
        # Remove request timestamps older than 1 minute
        self.request_times = [t for t in self.request_times if current_time - t <= 60]
        
        # If we've made too many requests in the last minute, wait
        if len(self.request_times) >= self.max_requests_per_minute:
            sleep_time = 60 - (current_time - self.request_times[0])
            if sleep_time > 0:
                logger.info(f"Rate limit approaching, waiting {sleep_time:.2f} seconds...")
                time.sleep(sleep_time)
                self.request_times = []  # Reset after waiting
        
        # Ensure minimum interval between requests
        time_since_last_request = current_time - self.last_request_time
        if time_since_last_request < self.min_request_interval:
            time.sleep(self.min_request_interval - time_since_last_request)
        
        self.last_request_time = time.time()
        self.request_times.append(self.last_request_time)

    def make_request(self, url, params=None):
        """Make a request to the Companies House API with retry logic"""
        max_retries = 3
        base_delay = 2  # Base delay for exponential backoff
        
        for attempt in range(max_retries):
            try:
                self._rate_limit()  # Apply rate limiting
                
                logger.debug(f"Making request to {url}")
                response = self.session.get(
                    url,
                    params=params,
                    timeout=30
                )
                
                if response.status_code == 429:  # Rate limit exceeded
                    retry_after = int(response.headers.get('Retry-After', base_delay * (2 ** attempt)))
                    logger.warning(f"Rate limit exceeded. Waiting {retry_after} seconds...")
                    time.sleep(retry_after)
                    continue
                
                response.raise_for_status()
                return response.json()
                
            except requests.exceptions.RequestException as e:
                logger.error(f"Request failed: {str(e)}")
                if attempt < max_retries - 1:
                    sleep_time = base_delay * (2 ** attempt)
                    logger.info(f"Retrying in {sleep_time} seconds...")
                    time.sleep(sleep_time)
                else:
                    return None
        
        return None

    def search_companies(self, sic_code):
        """Search for companies with specific SIC code"""
        companies = []
        items_per_page = 100
        max_results = 20000
        processed_companies = set()
        
        # Search terms optimized for each SIC code
        search_terms = {
            # General cleaning
            '81210': [f'"{sic_code}" cleaning'],
            '81200': [f'"{sic_code}" cleaning'],
            
            # Specialized cleaning
            '81220': [f'"{sic_code}" cleaning'],
            '81221': [f'"{sic_code}" window cleaning'],
            '81222': [f'"{sic_code}" specialized cleaning'],
            '81223': [f'"{sic_code}" chimney cleaning'],
            '81229': [f'"{sic_code}" specialized cleaning'],
            
            # Other cleaning
            '81290': [f'"{sic_code}" cleaning'],
            '81291': [f'"{sic_code}" disinfecting'],
            '81299': [f'"{sic_code}" cleaning'],
            
            # Additional services
            '81300': [f'"{sic_code}" landscaping'],
            '82990': [f'"{sic_code}" cleaning'],
            
            # Waste management
            '38110': [f'"{sic_code}" waste'],
            '38210': [f'"{sic_code}" waste treatment'],
            '38220': [f'"{sic_code}" hazardous waste'],
            '38320': [f'"{sic_code}" recycling']
        }
        
        terms = search_terms.get(sic_code, [f'"{sic_code}"'])
        
        for term in terms:
            logger.info(f"Searching with term: {term}")
            start_index = 0
            
            while start_index < max_results:
                try:
                    params = {
                        'q': term,
                        'items_per_page': items_per_page,
                        'start_index': start_index,
                        'restrictions': 'active'
                    }
                    
                    response_data = self.make_request(f"{BASE_URL}/search/companies", params)
                    
                    if not response_data or 'items' not in response_data:
                        break
                    
                    items = response_data['items']
                    if not items:
                        break
                    
                    total_items = response_data.get('total_results', 0)
                    logger.info(f"Processing {len(items)} companies from index {start_index}. Total available: {total_items}")
                    
                    # Process companies in batches
                    for company in items:
                        company_number = company.get('company_number')
                        
                        if not company_number or company_number in processed_companies:
                            continue
                        
                        # Get basic company details first
                        company_details = {
                            'company_number': company_number,
                            'company_name': company.get('company_name', ''),
                            'company_status': company.get('company_status', ''),
                            'date_of_creation': company.get('date_of_creation', ''),
                            'company_type': company.get('type', '')
                        }
                        
                        # Only get full details if basic criteria are met
                        if company_details['company_status'].lower() == 'active':
                            full_details = self.get_company_details(company_number)
                            if full_details:
                                company_details.update(full_details)
                                companies.append(company_details)
                                processed_companies.add(company_number)
                                logger.debug(f"Found matching company: {company_details['company_name']}")
                    
                    start_index += len(items)
                    if start_index >= min(total_items, max_results):
                        break
                    
                except Exception as e:
                    logger.error(f"Error processing search term {term} at index {start_index}: {str(e)}")
                    break
        
        logger.info(f"Found {len(companies)} unique companies for SIC code {sic_code}")
        return companies

    def get_company_details(self, company_number):
        """Get detailed information about a company"""
        if not company_number:
            return None
            
        url = f"{BASE_URL}/company/{company_number}"
        data = self.make_request(url)
        
        if data:
            # Add the company number to the data if not present
            data['company_number'] = company_number
            
            # Clean up the company name
            if 'company_name' not in data and 'title' in data:
                data['company_name'] = data['title']
            
            # Ensure SIC codes are present
            if 'sic_codes' not in data:
                data['sic_codes'] = []
                
        return data

    def get_company_officers(self, company_number):
        """Get officers of a company"""
        if not company_number:
            return None
            
        url = f"{BASE_URL}/company/{company_number}/officers"
        params = {
            'items_per_page': 100,
            'status': 'active'  # Only get active officers
        }
        return self.make_request(url, params)

    def get_company_accounts(self, company_number):
        """Get company accounts information"""
        if not company_number:
            return None
            
        url = f"{BASE_URL}/company/{company_number}/filing-history"
        data = self.make_request(url)
        
        if not data or 'items' not in data:
            logger.warning(f"No filing history found for company {company_number}")
            return None
            
        # Look for the most recent full accounts
        for filing in data.get('items', []):
            if filing.get('category') in ['accounts', 'accounts-with-accounts-type-full', 'accounts-with-accounts-type-small']:
                accounts_data = filing.get('data', {})
                # Try different possible turnover fields
                turnover_fields = ['turnover', 'revenue', 'total_turnover', 'uk_turnover']
                for field in turnover_fields:
                    if field in accounts_data:
                        try:
                            turnover_str = str(accounts_data[field])
                            # Handle different formats
                            if isinstance(turnover_str, (int, float)):
                                return float(turnover_str)
                            
                            # Remove currency symbols and commas
                            turnover_str = turnover_str.replace('£', '').replace(',', '').strip()
                            
                            # Handle ranges like "1000000-5000000"
                            if '-' in turnover_str:
                                lower, upper = map(str.strip, turnover_str.split('-'))
                                try:
                                    # Try to get both bounds
                                    lower_val = float(lower)
                                    upper_val = float(upper)
                                    # Use the higher value if both are valid
                                    return max(lower_val, upper_val)
                                except ValueError:
                                    # If upper bound fails, use lower bound
                                    return float(lower)
                            
                            # Handle text-based ranges
                            turnover_bands = {
                                'over £500m': 500_000_000,
                                'over £100m': 100_000_000,
                                'over £50m': 50_000_000,
                                'over £25m': 25_000_000,
                                'over £10m': 10_000_000,
                                'over £5m': 5_000_000,
                                'over £2m': 2_000_000,
                                'over £1m': 1_000_000,
                            }
                            
                            for band, value in turnover_bands.items():
                                if band.lower() in turnover_str.lower():
                                    return value
                            
                            # Try direct conversion
                            return float(turnover_str)
                            
                        except (ValueError, AttributeError) as e:
                            logger.warning(f"Could not parse turnover value '{accounts_data[field]}' for company {company_number}: {e}")
                            continue
        
        logger.warning(f"No turnover information found in filing history for company {company_number}")
        return None

    def process_companies(self):
        """Process companies and save to CSV"""
        # Define SIC codes for cleaning and waste management
        sic_codes = {
            "Cleaning": [
                '81210',  # General cleaning of buildings
                '81229',  # Other specialized cleaning activities
                '81220',  # Other building and industrial cleaning activities
                '81222',  # Specialized cleaning activities
                '81221',  # Window cleaning services
                '81223',  # Chimney cleaning services
                '81299',  # Other cleaning services n.e.c.
                '81290',  # Other cleaning activities
                '81291',  # Disinfecting and exterminating services
                '81200',  # General cleaning activities
                '81300',  # Landscaping activities
                '82990',  # Other business support activities
            ],
            "Waste Management": [
                '38110',  # Collection of non-hazardous waste
                '38320',  # Recovery of sorted materials
                '38220',  # Treatment and disposal of hazardous waste
                '38210',  # Treatment and disposal of non-hazardous waste
            ]
        }
        
        # Create output file with timestamp
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        output_file = f'filtered_companies_{timestamp}.csv'
        
        # Define CSV fields
        fieldnames = [
            'company_number', 'company_name', 'company_status',
            'incorporation_date', 'sic_codes', 'registered_office_address',
            'active_directors_over_50', 'company_type', 'companies_house_turnover',
            'hmrc_turnover', 'last_accounts_date', 'category', 'vat_number'
        ]
        
        processed_count = 0
        saved_count = 0
        
        with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            
            for category, codes in sic_codes.items():
                logger.info(f"Processing {category} companies...")
                
                for sic_code in codes:
                    logger.info(f"Searching for companies with SIC code {sic_code}")
                    companies = self.search_companies(sic_code)
                    
                    if not companies:
                        logger.warning(f"No companies found for SIC code {sic_code}")
                        continue
                    
                    # Process companies in batches
                    batch_size = 50  # Reduced batch size for better handling
                    for i in range(0, len(companies), batch_size):
                        batch = companies[i:i + batch_size]
                        logger.info(f"Processing batch {i//batch_size + 1} of {len(companies)//batch_size + 1}")
                        
                        for company in batch:
                            processed_count += 1
                            company_number = company.get('company_number')
                            company_name = company.get('company_name', 'Unknown')
                            
                            try:
                                # Get turnover information
                                ch_turnover = self.get_company_accounts(company_number)
                                
                                # Get VAT number and HMRC turnover
                                vat_info = self.hmrc_client.get_vat_info(company_number)
                                hmrc_turnover = None
                                vat_number = None
                                
                                if vat_info:
                                    vat_number = vat_info.get('vatNumber')
                                    if vat_number:
                                        hmrc_turnover = self.hmrc_client.get_company_turnover(vat_number)
                                
                                # Check if either turnover meets our criteria (£1M or more)
                                turnover_ok = False
                                
                                # Check Companies House turnover
                                if ch_turnover and ch_turnover >= MIN_TURNOVER:
                                    turnover_ok = True
                                # Check HMRC turnover if Companies House turnover wasn't sufficient
                                elif hmrc_turnover and hmrc_turnover >= MIN_TURNOVER:
                                    turnover_ok = True
                                
                                # Only proceed if we have a valid turnover of £1M or more
                                if not turnover_ok:
                                    continue
                                
                                # Save companies that have £1M+ turnover
                                company_data = {
                                    'company_number': company_number,
                                    'company_name': company_name,
                                    'company_status': company.get('company_status', ''),
                                    'incorporation_date': company.get('date_of_creation', ''),
                                    'sic_codes': ', '.join(company.get('sic_codes', [])),
                                    'registered_office_address': self._format_address(company.get('registered_office_address', {})),
                                    'active_directors_over_50': '',
                                    'company_type': company.get('type', ''),
                                    'companies_house_turnover': f"£{ch_turnover:,.2f}" if ch_turnover else 'Not available',
                                    'hmrc_turnover': f"£{hmrc_turnover:,.2f}" if hmrc_turnover else 'Not available',
                                    'last_accounts_date': (
                                        company.get('last_accounts', {}).get('made_up_to', 'Not available')
                                    ),
                                    'category': category,
                                    'vat_number': vat_number or 'Not available'
                                }
                                
                                writer.writerow(company_data)
                                csvfile.flush()  # Force write to disk
                                saved_count += 1
                                logger.info(f"Saved data for company {company_name}")
                                
                            except Exception as e:
                                logger.error(f"Error processing company {company_name}: {str(e)}")
                                continue
                            
                            # Add a small delay between companies
                            time.sleep(RATE_LIMIT_DELAY)
                        
                        logger.info(f"Completed batch. Total processed: {processed_count}, Total saved: {saved_count}")
                    
                    logger.info(f"Completed SIC code {sic_code}. Total processed: {processed_count}, Total saved: {saved_count}")
                
                logger.info(f"Completed category {category}. Total processed: {processed_count}, Total saved: {saved_count}")
        
        logger.info(f"Processing complete. Processed {processed_count} companies, saved {saved_count} to CSV")
        return output_file

    def _format_address(self, address_dict):
        """Format address dictionary into a string"""
        if not address_dict:
            return ''
        
        address_parts = [
            address_dict.get('address_line_1', ''),
            address_dict.get('address_line_2', ''),
            address_dict.get('locality', ''),
            address_dict.get('region', ''),
            address_dict.get('postal_code', ''),
            address_dict.get('country', '')
        ]
        return ', '.join(part for part in address_parts if part)

    def calculate_age(self, date_of_birth):
        """Calculate age from date of birth dictionary"""
        if not date_of_birth or 'year' not in date_of_birth:
            return None
            
        try:
            # Create a date object using year and month (if available)
            year = int(date_of_birth['year'])
            month = int(date_of_birth.get('month', 1))
            day = 1  # Default to first of the month
            
            birth_date = datetime(year, month, day)
            today = datetime.now()
            
            age = today.year - birth_date.year
            
            # Adjust age if birthday hasn't occurred this year
            if today.month < birth_date.month:
                age -= 1
                
            return age
        except (ValueError, TypeError):
            logger.error(f"Error calculating age for date of birth: {date_of_birth}")
            return None

def main():
    try:
        logger.info("Starting company data retrieval process")
        client = CompaniesHouseClient()
        output_file = client.process_companies()
        
        if output_file:
            logger.info(f"Data has been saved to {output_file}")
        else:
            logger.error("No data was saved")
            
    except Exception as e:
        logger.error(f"An error occurred: {str(e)}")
        raise

if __name__ == "__main__":
    main()
Tags: python, web-scraping
1 Answer · score: 0

The problem is most likely in how you handle the turnover. Say you have a company whose turnover value is

'over £2m'

In your get_company_accounts() function you first strip all £ symbols (and commas) from the turnover string, which leaves you with 'over 2m':

turnover_str = turnover_str.replace('£', '').replace(',', '').strip()

Later you try to match turnover_str against the bands in your dictionary, but those band keys still contain the £ symbol. The loop over them therefore never returns a value, because none of them can ever be found in 'over 2m':

turnover_bands = {
     'over £500m': 500_000_000,
     ...
     'over £2m': 2_000_000,
     'over £1m': 1_000_000,
 }
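A quick illustrative check of the mismatch (this snippet is not part of the original script):

# After the £ sign has been stripped, a band key that still contains '£'
# can never be a substring of the cleaned string.
cleaned = 'over £2m'.replace('£', '').replace(',', '').strip()  # 'over 2m'
print('over £2m'.lower() in cleaned.lower())  # False -> the band lookup never matches
print('over 2m'.lower() in cleaned.lower())   # True  -> this is the comparison you want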

Finally you fall through to parsing the string directly:

return float(turnover_str)

This raises an exception, because 'over 2m' cannot be parsed as a float, so your code catches the exception and silently skips the company.

Probably the simplest fix is to change the keys of your turnover_bands dictionary:

turnover_bands = {
         'over 500m': 500_000_000,
         ...
         'over 2m': 2_000_000,
         'over 1m': 1_000_000
     }
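Pulled together, the parsing logic could look roughly like the standalone helper below. This is only a sketch of the corrected band matching; the function name parse_turnover and the example values are illustrative, not part of the original code.

# Sketch of the corrected turnover parsing as a standalone helper (illustrative only).
def parse_turnover(raw_value):
    """Parse a turnover value such as 2500000, '£2,500,000' or 'over £2m'."""
    if isinstance(raw_value, (int, float)):
        return float(raw_value)

    # Strip currency symbols and thousands separators once, up front.
    cleaned = str(raw_value).replace('£', '').replace(',', '').strip().lower()

    # Band keys no longer contain '£', so they can actually match the cleaned string.
    turnover_bands = {
        'over 500m': 500_000_000,
        'over 100m': 100_000_000,
        'over 50m': 50_000_000,
        'over 25m': 25_000_000,
        'over 10m': 10_000_000,
        'over 5m': 5_000_000,
        'over 2m': 2_000_000,
        'over 1m': 1_000_000,
    }
    for band, value in turnover_bands.items():
        if band in cleaned:
            return value

    # Fall back to a direct numeric conversion; return None instead of raising
    # so the caller can decide how to handle unparseable values.
    try:
        return float(cleaned)
    except ValueError:
        return None

print(parse_turnover('over £2m'))      # 2000000
print(parse_turnover('£1,250,000'))    # 1250000.0
print(parse_turnover('not reported'))  # None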