我有一个简单的 Python 脚本,用于借助 Scapy 在名为 Mininet 的虚拟网络环境中检测 SYN 洪水攻击。
from scapy.all import *
from collections import defaultdict
import threading
import time
import sys
# --- Detection tuning ---
# Seconds between two threshold checks.
CHECK_INTERVAL = 1
# SYN : SYN-ACK ratio above which a source is flagged (kept low for testing).
SYN_SYNACK_RATIO_THRESHOLD = 3

# --- Shared runtime state ---
# Source IPs that have been flagged as offenders.
blocked_ips = set()
# Per-flow counters: an (IP address, TCP port) tuple maps to a two-element
# list [SYN count, SYN-ACK count]; unseen keys start at [0, 0].
syn_to_synack = defaultdict(lambda: [0, 0])
def monitor_packets(packet):
    """Update the SYN / SYN-ACK counters from one sniffed packet.

    Used as the ``prn`` callback of ``sniff``, so it runs once per captured
    packet. Counters live in the module-level ``syn_to_synack`` mapping and
    are keyed by ``(client IP, server port)``:

    * a pure SYN increments slot 0 under ``(ip.src, tcp.dport)``;
    * a SYN-ACK increments slot 1 under ``(ip.dst, tcp.sport)``, i.e. the
      same key as the SYN it answers, so the two counts can be compared.

    Packets whose source is in ``blocked_ips`` are logged and not counted.
    Nothing in this function removes a packet from the network; "dropping"
    only means the tracker ignores it.

    Args:
        packet: A scapy packet as delivered by ``sniff``.
    """
    # The "tcp" capture filter also matches TCP over IPv6, which carries no
    # IP (IPv4) layer; indexing packet[IP] on such a packet would raise.
    if not (packet.haslayer(TCP) and packet.haslayer(IP)):
        return

    tcp = packet[TCP]
    ip = packet[IP]

    # Already-flagged sources are ignored so they stop inflating the counters.
    if ip.src in blocked_ips:
        print(f"[BLOCKED] Dropping packet from {ip.src}", flush=True)
        return

    if tcp.flags == "S":
        # Connection request: client -> server.
        syn_to_synack[(ip.src, tcp.dport)][0] += 1
        print(f"[info] SYN packet tracked: {ip.src} -> {ip.dst}:{tcp.dport}", flush=True)
        print(f"[info] Updated syn_to_synack: {dict(syn_to_synack)}", file=sys.stderr)
    elif tcp.flags == "SA":
        # Reply: server -> client. The client is ip.dst and the server port
        # is tcp.sport, which reproduces the key used for the matching SYN.
        syn_to_synack[(ip.dst, tcp.sport)][1] += 1
        print(f"[info] SYN-ACK packet tracked: {ip.src} -> {ip.dst}:{tcp.dport}", flush=True)
        print(f"[info] Updated syn_to_synack: {dict(syn_to_synack)}", file=sys.stderr)
def sniff_packets():
    """Capture TCP packets and hand each one to ``monitor_packets``.

    The call to ``sniff`` blocks for as long as capture is running, so this
    function is meant to be used as the target of a background thread.
    Captured packets are not stored (``store=False``); they are only passed
    to the callback.
    """
    print("[*] Starting packet sniffing...", flush=True)
    # No timeout: with a timeout, sniff() returns after that many seconds and
    # the capture thread would end almost immediately.
    sniff(filter="tcp", prn=monitor_packets, store=False)
def check_thresholds(counts=None, blocked=None, ratio_threshold=None):
    """Flag sources whose SYN count far exceeds their SYN-ACK count.

    A key is flagged when ``syn_count > ratio_threshold * synack_count``.
    A SYN-ACK count of zero is treated as one, so a single unanswered SYN
    (normal at the start of every handshake) is not enough to flag a source,
    and no division by zero can occur.

    Args:
        counts: Mapping of ``(ip, port)`` to ``[syn_count, synack_count]``.
            Defaults to the module-level ``syn_to_synack``.
        blocked: Set of blocked IPs, updated in place. Defaults to the
            module-level ``blocked_ips``.
        ratio_threshold: Maximum tolerated SYN / SYN-ACK ratio. Defaults to
            ``SYN_SYNACK_RATIO_THRESHOLD``.

    Returns:
        The set of IPs that were newly added to ``blocked`` by this call.
    """
    if counts is None:
        counts = syn_to_synack
    if blocked is None:
        blocked = blocked_ips
    if ratio_threshold is None:
        ratio_threshold = SYN_SYNACK_RATIO_THRESHOLD

    print("[info] Checking thresholds...", flush=True)
    print(f"[info] Current syn_to_synack state: {dict(counts)}", file=sys.stderr)
    print(f"[info] Current blocked IPs: {blocked}", flush=True)

    newly_blocked = set()
    # Iterate over a snapshot: the sniffing thread may insert new keys while
    # this loop runs, and iterating the live dict would then raise
    # "dictionary changed size during iteration".
    for key, (syn_count, synack_count) in list(counts.items()):
        print(f"[info] Evaluating key: {key}, SYN count: {syn_count}, SYN-ACK count: {synack_count}", flush=True)
        if syn_count <= ratio_threshold * max(synack_count, 1):
            continue

        print(f"[ALERT] High SYN/SYN-ACK ratio for {key}: {syn_count}/{synack_count}", flush=True)
        src_ip = key[0]  # first element of the key is the source IP
        print(f"[info] src_ip extracted: {src_ip}", flush=True)
        print(f"[info] Checking if {src_ip} is already blocked...", flush=True)
        if src_ip in blocked:
            print(f"[info] {src_ip} is already blocked.", flush=True)
        else:
            print(f"[ACTION] Blocking IP: {src_ip}", flush=True)
            blocked.add(src_ip)
            newly_blocked.add(src_ip)
    return newly_blocked
if __name__ == "__main__":
    print("[*] Starting SYN flood detection with enhanced debugging...", flush=True)

    # Sniff in a daemon thread so it does not keep the process alive once the
    # main thread exits.
    sniff_thread = threading.Thread(target=sniff_packets, daemon=True)
    sniff_thread.start()

    # Evaluate the counters every CHECK_INTERVAL seconds until Ctrl-C.
    try:
        while True:
            print("[info] running threshold loop...", flush=True)
            check_thresholds()
            time.sleep(CHECK_INTERVAL)
    except KeyboardInterrupt:
        print("\n[!] Stopping SYN flood detection.", flush=True)
此脚本应该在路由器中运行,嗅探 tcp 数据包并监控 syn/syn-ack 比率以标记这些攻击,然后阻止不良源 ip。我试过了:
[*] 开始使用双标志逻辑进行 SYN 泛洪检测...
[信息] SYN 从 到
[警报] 检测到 的高 SYN 速率(29 SYN/秒)
[!] 停止 SYN 泛洪检测。
把 `while True:` 循环放入单独的线程中,并把对 print() 的调用替换为向队列添加内容的调用,然后让主线程从队列中读取并打印这些内容。
import queue
OUTPUT = queue.Queue()
def log(x):
[... Your code replacing print() by log()]
if __name__ == "__main__":
[Start thread 1]
[Start thread 2]
while True:
msg = OUTPUT.get()
os.write(1, b"The data you want to print")