我已经处理这个问题一天了,并且在从 HMRC 和 Companies House API 获取公司数据时遇到问题。我主要需要营业额超过 100 万英镑的公司。我在代码中设置了
MIN_TURNOVER = 1_000_000
来过滤掉营业额低于 100 万英镑的公司。
这是我的代码
import os
import requests
import logging
import urllib.parse
import json
import csv
from datetime import datetime
import time
from hmrc_client import HMRCClient
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Configure logging
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Constants
COMPANIES_API_KEY = os.getenv('COMPANIES_API_KEY')
BASE_URL = 'https://api.companieshouse.gov.uk'
RATE_LIMIT_DELAY = 0.6 # Minimum delay between requests (in seconds)
MIN_DIRECTOR_AGE = 50 # Minimum age for directors
MIN_TURNOVER = 1_000_000 # £1 million minimum turnover
class CompaniesHouseClient:
def __init__(self):
"""Initialize the Companies House client."""
self.api_key = os.getenv('COMPANIES_API_KEY')
if not self.api_key:
raise ValueError("COMPANIES_API_KEY not found in environment variables")
# Set up session with authentication
self.session = requests.Session()
self.session.auth = (self.api_key, '')
self.session.headers.update({
'Accept': 'application/json',
'User-Agent': 'CompanyDataRetrieval/1.0'
})
# Configure rate limiting
self.last_request_time = time.time()
self.request_times = [] # Keep track of request timestamps
self.max_requests_per_minute = 500 # Conservative limit
self.min_request_interval = 0.15 # Minimum time between requests in seconds
self.hmrc_client = HMRCClient()
logger.info("Initialized Companies House client")
def _rate_limit(self):
"""Implement rate limiting for API requests."""
current_time = time.time()
# Remove request timestamps older than 1 minute
self.request_times = [t for t in self.request_times if current_time - t <= 60]
# If we've made too many requests in the last minute, wait
if len(self.request_times) >= self.max_requests_per_minute:
sleep_time = 60 - (current_time - self.request_times[0])
if sleep_time > 0:
logger.info(f"Rate limit approaching, waiting {sleep_time:.2f} seconds...")
time.sleep(sleep_time)
self.request_times = [] # Reset after waiting
# Ensure minimum interval between requests
time_since_last_request = current_time - self.last_request_time
if time_since_last_request < self.min_request_interval:
time.sleep(self.min_request_interval - time_since_last_request)
self.last_request_time = time.time()
self.request_times.append(self.last_request_time)
def make_request(self, url, params=None):
"""Make a request to the Companies House API with retry logic"""
max_retries = 3
base_delay = 2 # Base delay for exponential backoff
for attempt in range(max_retries):
try:
self._rate_limit() # Apply rate limiting
logger.debug(f"Making request to {url}")
response = self.session.get(
url,
params=params,
timeout=30
)
if response.status_code == 429: # Rate limit exceeded
retry_after = int(response.headers.get('Retry-After', base_delay * (2 ** attempt)))
logger.warning(f"Rate limit exceeded. Waiting {retry_after} seconds...")
time.sleep(retry_after)
continue
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
logger.error(f"Request failed: {str(e)}")
if attempt < max_retries - 1:
sleep_time = base_delay * (2 ** attempt)
logger.info(f"Retrying in {sleep_time} seconds...")
time.sleep(sleep_time)
else:
return None
return None
def search_companies(self, sic_code):
"""Search for companies with specific SIC code"""
companies = []
items_per_page = 100
max_results = 20000
processed_companies = set()
# Search terms optimized for each SIC code
search_terms = {
# General cleaning
'81210': [f'"{sic_code}" cleaning'],
'81200': [f'"{sic_code}" cleaning'],
# Specialized cleaning
'81220': [f'"{sic_code}" cleaning'],
'81221': [f'"{sic_code}" window cleaning'],
'81222': [f'"{sic_code}" specialized cleaning'],
'81223': [f'"{sic_code}" chimney cleaning'],
'81229': [f'"{sic_code}" specialized cleaning'],
# Other cleaning
'81290': [f'"{sic_code}" cleaning'],
'81291': [f'"{sic_code}" disinfecting'],
'81299': [f'"{sic_code}" cleaning'],
# Additional services
'81300': [f'"{sic_code}" landscaping'],
'82990': [f'"{sic_code}" cleaning'],
# Waste management
'38110': [f'"{sic_code}" waste'],
'38210': [f'"{sic_code}" waste treatment'],
'38220': [f'"{sic_code}" hazardous waste'],
'38320': [f'"{sic_code}" recycling']
}
terms = search_terms.get(sic_code, [f'"{sic_code}"'])
for term in terms:
logger.info(f"Searching with term: {term}")
start_index = 0
while start_index < max_results:
try:
params = {
'q': term,
'items_per_page': items_per_page,
'start_index': start_index,
'restrictions': 'active'
}
response_data = self.make_request(f"{BASE_URL}/search/companies", params)
if not response_data or 'items' not in response_data:
break
items = response_data['items']
if not items:
break
total_items = response_data.get('total_results', 0)
logger.info(f"Processing {len(items)} companies from index {start_index}. Total available: {total_items}")
# Process companies in batches
for company in items:
company_number = company.get('company_number')
if not company_number or company_number in processed_companies:
continue
# Get basic company details first
company_details = {
'company_number': company_number,
'company_name': company.get('company_name', ''),
'company_status': company.get('company_status', ''),
'date_of_creation': company.get('date_of_creation', ''),
'company_type': company.get('type', '')
}
# Only get full details if basic criteria are met
if company_details['company_status'].lower() == 'active':
full_details = self.get_company_details(company_number)
if full_details:
company_details.update(full_details)
companies.append(company_details)
processed_companies.add(company_number)
logger.debug(f"Found matching company: {company_details['company_name']}")
start_index += len(items)
if start_index >= min(total_items, max_results):
break
except Exception as e:
logger.error(f"Error processing search term {term} at index {start_index}: {str(e)}")
break
logger.info(f"Found {len(companies)} unique companies for SIC code {sic_code}")
return companies
def get_company_details(self, company_number):
"""Get detailed information about a company"""
if not company_number:
return None
url = f"{BASE_URL}/company/{company_number}"
data = self.make_request(url)
if data:
# Add the company number to the data if not present
data['company_number'] = company_number
# Clean up the company name
if 'company_name' not in data and 'title' in data:
data['company_name'] = data['title']
# Ensure SIC codes are present
if 'sic_codes' not in data:
data['sic_codes'] = []
return data
def get_company_officers(self, company_number):
"""Get officers of a company"""
if not company_number:
return None
url = f"{BASE_URL}/company/{company_number}/officers"
params = {
'items_per_page': 100,
'status': 'active' # Only get active officers
}
return self.make_request(url, params)
def get_company_accounts(self, company_number):
"""Get company accounts information"""
if not company_number:
return None
url = f"{BASE_URL}/company/{company_number}/filing-history"
data = self.make_request(url)
if not data or 'items' not in data:
logger.warning(f"No filing history found for company {company_number}")
return None
# Look for the most recent full accounts
for filing in data.get('items', []):
if filing.get('category') in ['accounts', 'accounts-with-accounts-type-full', 'accounts-with-accounts-type-small']:
accounts_data = filing.get('data', {})
# Try different possible turnover fields
turnover_fields = ['turnover', 'revenue', 'total_turnover', 'uk_turnover']
for field in turnover_fields:
if field in accounts_data:
try:
turnover_str = str(accounts_data[field])
# Handle different formats
if isinstance(turnover_str, (int, float)):
return float(turnover_str)
# Remove currency symbols and commas
turnover_str = turnover_str.replace('£', '').replace(',', '').strip()
# Handle ranges like "1000000-5000000"
if '-' in turnover_str:
lower, upper = map(str.strip, turnover_str.split('-'))
try:
# Try to get both bounds
lower_val = float(lower)
upper_val = float(upper)
# Use the higher value if both are valid
return max(lower_val, upper_val)
except ValueError:
# If upper bound fails, use lower bound
return float(lower)
# Handle text-based ranges
turnover_bands = {
'over £500m': 500_000_000,
'over £100m': 100_000_000,
'over £50m': 50_000_000,
'over £25m': 25_000_000,
'over £10m': 10_000_000,
'over £5m': 5_000_000,
'over £2m': 2_000_000,
'over £1m': 1_000_000,
}
for band, value in turnover_bands.items():
if band.lower() in turnover_str.lower():
return value
# Try direct conversion
return float(turnover_str)
except (ValueError, AttributeError) as e:
logger.warning(f"Could not parse turnover value '{accounts_data[field]}' for company {company_number}: {e}")
continue
logger.warning(f"No turnover information found in filing history for company {company_number}")
return None
def process_companies(self):
"""Process companies and save to CSV"""
# Define SIC codes for cleaning and waste management
sic_codes = {
"Cleaning": [
'81210', # General cleaning of buildings
'81229', # Other specialized cleaning activities
'81220', # Other building and industrial cleaning activities
'81222', # Specialized cleaning activities
'81221', # Window cleaning services
'81223', # Chimney cleaning services
'81299', # Other cleaning services n.e.c.
'81290', # Other cleaning activities
'81291', # Disinfecting and exterminating services
'81200', # General cleaning activities
'81300', # Landscaping activities
'82990', # Other business support activities
],
"Waste Management": [
'38110', # Collection of non-hazardous waste
'38320', # Recovery of sorted materials
'38220', # Treatment and disposal of hazardous waste
'38210', # Treatment and disposal of non-hazardous waste
]
}
# Create output file with timestamp
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
output_file = f'filtered_companies_{timestamp}.csv'
# Define CSV fields
fieldnames = [
'company_number', 'company_name', 'company_status',
'incorporation_date', 'sic_codes', 'registered_office_address',
'active_directors_over_50', 'company_type', 'companies_house_turnover',
'hmrc_turnover', 'last_accounts_date', 'category', 'vat_number'
]
processed_count = 0
saved_count = 0
with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for category, codes in sic_codes.items():
logger.info(f"Processing {category} companies...")
for sic_code in codes:
logger.info(f"Searching for companies with SIC code {sic_code}")
companies = self.search_companies(sic_code)
if not companies:
logger.warning(f"No companies found for SIC code {sic_code}")
continue
# Process companies in batches
batch_size = 50 # Reduced batch size for better handling
for i in range(0, len(companies), batch_size):
batch = companies[i:i + batch_size]
logger.info(f"Processing batch {i//batch_size + 1} of {len(companies)//batch_size + 1}")
for company in batch:
processed_count += 1
company_number = company.get('company_number')
company_name = company.get('company_name', 'Unknown')
try:
# Get turnover information
ch_turnover = self.get_company_accounts(company_number)
# Get VAT number and HMRC turnover
vat_info = self.hmrc_client.get_vat_info(company_number)
hmrc_turnover = None
vat_number = None
if vat_info:
vat_number = vat_info.get('vatNumber')
if vat_number:
hmrc_turnover = self.hmrc_client.get_company_turnover(vat_number)
# Check if either turnover meets our criteria (£1M or more)
turnover_ok = False
# Check Companies House turnover
if ch_turnover and ch_turnover >= MIN_TURNOVER:
turnover_ok = True
# Check HMRC turnover if Companies House turnover wasn't sufficient
elif hmrc_turnover and hmrc_turnover >= MIN_TURNOVER:
turnover_ok = True
# Only proceed if we have a valid turnover of £1M or more
if not turnover_ok:
continue
# Save companies that have £1M+ turnover
company_data = {
'company_number': company_number,
'company_name': company_name,
'company_status': company.get('company_status', ''),
'incorporation_date': company.get('date_of_creation', ''),
'sic_codes': ', '.join(company.get('sic_codes', [])),
'registered_office_address': self._format_address(company.get('registered_office_address', {})),
'active_directors_over_50': '',
'company_type': company.get('type', ''),
'companies_house_turnover': f"£{ch_turnover:,.2f}" if ch_turnover else 'Not available',
'hmrc_turnover': f"£{hmrc_turnover:,.2f}" if hmrc_turnover else 'Not available',
'last_accounts_date': (
company.get('last_accounts', {}).get('made_up_to', 'Not available')
),
'category': category,
'vat_number': vat_number or 'Not available'
}
writer.writerow(company_data)
csvfile.flush() # Force write to disk
saved_count += 1
logger.info(f"Saved data for company {company_name}")
except Exception as e:
logger.error(f"Error processing company {company_name}: {str(e)}")
continue
# Add a small delay between companies
time.sleep(RATE_LIMIT_DELAY)
logger.info(f"Completed batch. Total processed: {processed_count}, Total saved: {saved_count}")
logger.info(f"Completed SIC code {sic_code}. Total processed: {processed_count}, Total saved: {saved_count}")
logger.info(f"Completed category {category}. Total processed: {processed_count}, Total saved: {saved_count}")
logger.info(f"Processing complete. Processed {processed_count} companies, saved {saved_count} to CSV")
return output_file
def _format_address(self, address_dict):
"""Format address dictionary into a string"""
if not address_dict:
return ''
address_parts = [
address_dict.get('address_line_1', ''),
address_dict.get('address_line_2', ''),
address_dict.get('locality', ''),
address_dict.get('region', ''),
address_dict.get('postal_code', ''),
address_dict.get('country', '')
]
return ', '.join(part for part in address_parts if part)
def calculate_age(self, date_of_birth):
"""Calculate age from date of birth dictionary"""
if not date_of_birth or 'year' not in date_of_birth:
return None
try:
# Create a date object using year and month (if available)
year = int(date_of_birth['year'])
month = int(date_of_birth.get('month', 1))
day = 1 # Default to first of the month
birth_date = datetime(year, month, day)
today = datetime.now()
age = today.year - birth_date.year
# Adjust age if birthday hasn't occurred this year
if today.month < birth_date.month:
age -= 1
return age
except (ValueError, TypeError):
logger.error(f"Error calculating age for date of birth: {date_of_birth}")
return None
def main():
try:
logger.info("Starting company data retrieval process")
client = CompaniesHouseClient()
output_file = client.process_companies()
if output_file:
logger.info(f"Data has been saved to {output_file}")
else:
logger.error("No data was saved")
except Exception as e:
logger.error(f"An error occurred: {str(e)}")
raise
if __name__ == "__main__":
main()
问题可能出在您对营业额的处理上: 说,您有一家有价值的公司
'over £2m'
在您的
get_company_accounts()
函数中,您首先从翻转字符串中删除所有 £ 符号,从而得到 'over 2m'
:
turnover_str = turnover_str.replace('£', '').replace(',', '').strip()`.
稍后,您尝试将成交量_str 与字典中的波段相匹配,但这些波段中仍然有 £ 符号。导致循环此选项的代码不返回值,因为它们永远不会命中
'over 2m'
。
turnover_bands = {
'over £500m': 500_000_000,
...
'over £2m': 2_000_000,
'over £1m': 1_000_000,
}
最后你尝试返回字符串的解析值
return float(turnover_str)
这会引发异常,因为
'over 2m'
无法解析为浮点数,导致您的代码捕获异常并忽略公司。
可能最简单的解决方案是更改您的
turnover_bands
字典键:
turnover_bands = {
'over 500m': 500_000_000,
...
'over 2m': 2_000_000,
'over 1m': 1_000_000
}