如果提供的名称服务器无效,Python dns.resolver 是否使用后备名称服务器?

问题描述 投票:0回答:1

我正在尝试使用 Python 检查 DNS 记录。我知道我正在检查的 IP 地址不是有效的解析器,但它却返回了正确的 NS 记录。那么,dns.resolver 是否执行了某种后备(fallback)?还是它使用了之前调用的函数中的值?

我对该 IP 执行了 dig 查询:

dig @1.3.66.109 cloudflare.com NS

它失败了,因为该 IP 不是有效的解析器,但我的 Python 脚本说它是有效的。

我调用这个函数来从Google获取Google NS记录:

def get_google_nameservers(retries=3):
    """Fetch the NS records of google.com from the resolver at 8.8.8.8.

    Args:
        retries: Maximum number of lookup attempts.

    Returns:
        Sorted list of nameserver host names with surrounding dots stripped.

    Raises:
        Exception: If every attempt ended in a lifetime timeout or a
            NoNameservers error. The last DNS error is attached as the cause.
    """
    last_error = None
    for attempt in range(1, retries + 1):
        try:
            resolver = dns.resolver.Resolver()
            resolver.nameservers = ['8.8.8.8']
            resolver.timeout = 20
            resolver.lifetime = 20
            response = resolver.resolve('google.com', 'NS')
            return sorted(r.to_text().strip('.') for r in response)
        except (dns.resolver.LifetimeTimeout, dns.resolver.NoNameservers) as e:
            last_error = e
            print(f"Error fetching Google nameservers, retrying... Error: {e}")
            # Only pause when another attempt will follow; sleeping after the
            # final failure would just delay the exception below.
            if attempt < retries:
                time.sleep(5)
    raise Exception("Failed to get Google nameservers after multiple attempts") from last_error

然后我调用这个函数来检查 NS 记录并与 Google 的响应进行比较:

def check_ns_record(ip, google_nameservers):
    """Query a single IP for google.com's NS records and compare the answer.

    Args:
        ip: Address of the candidate resolver to query.
        google_nameservers: Reference nameserver names; all of them must
            appear in the answer for the resolver to count as valid.

    Returns:
        On a successful query, a dict with 'ip', 'status' ('valid'),
        'is_valid' and 'response_time' (seconds). On any error, a dict with
        'status' set to 'error', 'is_valid' False, 'response_time' None and
        the error text under 'error'.
    """
    try:
        resolver = dns.resolver.Resolver()
        resolver.nameservers = [ip]
        resolver.timeout = 20
        resolver.lifetime = 20

        start_time = time.time()
        response = resolver.resolve('google.com', 'NS', raise_on_no_answer=True)
        response_time = time.time() - start_time

        resolved_ns = sorted([r.to_text().strip('.') for r in response])
        # Valid when no reference nameserver is missing from the answer.
        is_valid = not set(google_nameservers).difference(resolved_ns)

        print(f"Checked IP: {ip}, Resolved NS: {resolved_ns}, Google NS: {google_nameservers}, Is Valid: {is_valid}")

        return {
            'ip': ip,
            'status': 'valid',
            'is_valid': is_valid,
            'response_time': response_time
        }
    except (dns.resolver.NoNameservers, dns.resolver.Timeout, dns.resolver.NXDOMAIN, dns.resolver.NoAnswer) as e:
        print(f"Error for IP: {ip}, Error: {e}")
        # Python unbinds `e` when the except clause ends, so keep the text
        # in a separate name for the return value below.
        error_message = str(e)
    except Exception as e:
        print(f"General error for IP: {ip}, Error: {e}")
        error_message = str(e)

    return {
        'ip': ip,
        'status': 'error',
        'is_valid': False,
        'response_time': None,
        'error': error_message
    }

它还会立即返回 NS 记录,所以根本不可能是等到了超时。我完全不明白这些记录是从哪里来的,因为我确定它们不是来自该 IP 地址。

有人遇到过这种情况吗?可以分享一下吗?我们将不胜感激。

这是上下文的完整脚本:

import concurrent.futures
import pymysql
import dns.resolver
import socket
import time

# Database connection function
def get_database_connection():
    """Open a PyMySQL connection to the local database.

    Returns:
        The connection object, configured to use DictCursor.

    Raises:
        pymysql.MySQLError: Re-raised after being reported on stdout.
    """
    settings = {
        'host': 'localhost',
        'user': 'user',
        'password': 'pass',
        'db': 'db',
        'charset': 'utf8mb4',
        'cursorclass': pymysql.cursors.DictCursor,
    }
    try:
        connection = pymysql.connect(**settings)
    except pymysql.MySQLError as exc:
        print(f"Error connecting to database: {exc}")
        raise
    print("Database connection established.")
    return connection

# Function to generate IPv4 addresses
def generate_ipv4_addresses(start, limit):
    """Return up to ``limit`` consecutive dotted-quad addresses from ``start``.

    The sequence begins at ``start`` itself and stops early once
    255.255.255.255 has been emitted.

    Args:
        start: First address, as a dotted-decimal string.
        limit: Maximum number of addresses to generate.

    Returns:
        List of dotted-decimal address strings in ascending order.
    """
    max_address = 0xFFFFFFFF  # 255.255.255.255, computed once instead of per iteration
    first = int.from_bytes(map(int, start.split('.')), byteorder='big')
    # Exclusive upper bound: either `limit` addresses or the end of the space.
    stop = min(first + limit, max_address + 1)

    return [
        '.'.join(map(str, value.to_bytes(4, byteorder='big')))
        for value in range(first, stop)
    ]

# Function to check NS record
def check_ns_record(ip, google_nameservers):
    """Query a single IP for google.com's NS records and compare the answer.

    Args:
        ip: Address of the candidate resolver to query.
        google_nameservers: Reference nameserver names; all of them must
            appear in the answer for the resolver to count as valid.

    Returns:
        On a successful query, a dict with 'ip', 'status' ('valid'),
        'is_valid' and 'response_time' (seconds). On any error, a dict with
        'status' set to 'error', 'is_valid' False, 'response_time' None and
        the error text under 'error'.
    """
    try:
        resolver = dns.resolver.Resolver()
        resolver.nameservers = [ip]
        resolver.timeout = 20
        resolver.lifetime = 20

        start_time = time.time()
        response = resolver.resolve('google.com', 'NS', raise_on_no_answer=True)
        response_time = time.time() - start_time

        resolved_ns = sorted([r.to_text().strip('.') for r in response])
        # Valid when no reference nameserver is missing from the answer.
        is_valid = not set(google_nameservers).difference(resolved_ns)

        print(f"Checked IP: {ip}, Resolved NS: {resolved_ns}, Google NS: {google_nameservers}, Is Valid: {is_valid}")

        return {
            'ip': ip,
            'status': 'valid',
            'is_valid': is_valid,
            'response_time': response_time
        }
    except (dns.resolver.NoNameservers, dns.resolver.Timeout, dns.resolver.NXDOMAIN, dns.resolver.NoAnswer) as e:
        print(f"Error for IP: {ip}, Error: {e}")
        # Python unbinds `e` when the except clause ends, so keep the text
        # in a separate name for the return value below.
        error_message = str(e)
    except Exception as e:
        print(f"General error for IP: {ip}, Error: {e}")
        error_message = str(e)

    return {
        'ip': ip,
        'status': 'error',
        'is_valid': False,
        'response_time': None,
        'error': error_message
    }

# Function to normalize DNS records
def normalize_dns_record(record):
    """Return ``record`` with any trailing dots removed."""
    without_trailing_dots = record.rstrip('.')
    return without_trailing_dots

# Get current Google nameservers from Google for testing DNS Resolvers
def get_google_nameservers(retries=3):
    """Fetch the NS records of google.com from the resolver at 8.8.8.8.

    Args:
        retries: Maximum number of lookup attempts.

    Returns:
        Sorted list of nameserver host names with surrounding dots stripped.

    Raises:
        Exception: If every attempt ended in a lifetime timeout or a
            NoNameservers error. The last DNS error is attached as the cause.
    """
    last_error = None
    for attempt in range(1, retries + 1):
        try:
            resolver = dns.resolver.Resolver()
            resolver.nameservers = ['8.8.8.8']
            resolver.timeout = 20
            resolver.lifetime = 20
            response = resolver.resolve('google.com', 'NS')
            return sorted(r.to_text().strip('.') for r in response)
        except (dns.resolver.LifetimeTimeout, dns.resolver.NoNameservers) as e:
            last_error = e
            print(f"Error fetching Google nameservers, retrying... Error: {e}")
            # Only pause when another attempt will follow; sleeping after the
            # final failure would just delay the exception below.
            if attempt < retries:
                time.sleep(5)
    raise Exception("Failed to get Google nameservers after multiple attempts") from last_error

# Main function to execute DNS checks and update the database
def main():
    """Run one batch of resolver checks and store the results in MySQL.

    Fetches the reference google.com NS set, reads the most recent address
    from ``last_ipv4``, generates the batch of addresses to test, checks them
    concurrently with ``check_ns_record``, records every valid resolver in
    ``resolvers`` and ``valid_ipv4``, and finally stores the last generated
    address back into ``last_ipv4``.

    Raises:
        pymysql.MySQLError: Re-raised after being reported if the update
            phase fails.
    """
    start_time = time.time()  # Start the timer

    google_nameservers = get_google_nameservers()
    print("Google nameservers obtained.")

    with get_database_connection() as conn:
        with conn.cursor() as cursor:
            print("Fetching the last IPv4 address.")
            cursor.execute("SELECT ipv4_address FROM last_ipv4 ORDER BY id DESC LIMIT 1")
            result = cursor.fetchone()
            last_ipv4 = result['ipv4_address'] if result else '0.255.255.255'
            print(f"Last IPv4 address: {last_ipv4}")

    # NOTE(review): generate_ipv4_addresses starts at the address it is given,
    # so with a limit of 1 the batch is just `last_ipv4` and the value written
    # back below is unchanged. Confirm whether the batch should start at the
    # next address instead.
    ipv4_addresses = generate_ipv4_addresses(last_ipv4, 1)
    print(f"Generated {len(ipv4_addresses)} IPv4 addresses.")

    # Use ThreadPoolExecutor for concurrent DNS checks
    max_workers = min(1000, 14 * 4)  # Using 4 times the number of cores for max workers
    total_ips = len(ipv4_addresses)
    completed_ips = 0

    print("Starting DNS checks.")
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(check_ns_record, ip, google_nameservers): ip for ip in ipv4_addresses}

        for future in concurrent.futures.as_completed(futures):
            # Called only so that an exception raised in a worker surfaces
            # here, before any database work starts; the value is read later.
            future.result()
            completed_ips += 1
            if completed_ips % 100 == 0:
                print(f"Completed {completed_ips} of {total_ips} IPs")

    print("DNS checks completed. Updating the database.")
    try:
        with get_database_connection() as conn:
            with conn.cursor() as cursor:
                for future in futures:
                    result = future.result()
                    if result['is_valid']:
                        ip_binary = result['ip'].encode('utf-8')
                        response_time = result['response_time']
                        print(f"Checking for duplicate {result['ip']}")
                        cursor.execute("SELECT * FROM resolvers WHERE ip = %s", (ip_binary,))
                        check_row = cursor.fetchone()

                        if not check_row:
                            cursor.execute("""
                                INSERT INTO resolvers (ip, is_valid, successes, reliability, last_response, all_responses, avg_response)
                                VALUES (%s, 1, 1, 100, %s, %s, %s)
                                ON DUPLICATE KEY UPDATE ip = ip, is_valid = is_valid, successes = successes, reliability = reliability, last_response = last_response, all_responses = all_responses, avg_response = avg_response
                            """, (ip_binary, response_time, response_time, response_time))

                        cursor.execute("INSERT INTO valid_ipv4 (ip) VALUES (%s) ON DUPLICATE KEY UPDATE ip = ip", (ip_binary,))

                new_last_ipv4 = ipv4_addresses[-1]
                cursor.execute("INSERT INTO last_ipv4 (id, ipv4_address) VALUES (1, %s) ON DUPLICATE KEY UPDATE ipv4_address = %s", (new_last_ipv4, new_last_ipv4))
            # Commit first so success is only reported once the changes have
            # actually been committed.
            conn.commit()
            print("Database update completed.")
    except pymysql.MySQLError as e:
        print(f"Error updating database: {e}")
        raise

    end_time = time.time()  # End the timer
    total_time = end_time - start_time
    print(f"Total execution time: {total_time:.2f} seconds")

# Run the scan only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()

python dns resolver dnspython
1个回答
0
投票

dns.resolver 似乎默认使用缓存,必须使用

resolver.cache = None
来禁用缓存。感谢 Barmar 的帮助。

© www.soinside.com 2019 - 2024. All rights reserved.