所以,我有一个简单的 python 脚本,旨在使用 scapy 检测名为 mininet 的虚拟网络环境中的 syn 洪水攻击。
from scapy.all import *
from collections import defaultdict
import threading
import time
import sys
# Data structures
syn_to_synack = defaultdict(lambda: [0, 0]) # SYN to SYN-ACK ratio tracker
blocked_ips = set() # Blocked IPs
# Thresholds
SYN_SYNACK_RATIO_THRESHOLD = 3 # Lower ratio for easier detection during testing
CHECK_INTERVAL = 1 # Interval in seconds to check thresholds
def monitor_packets(packet):
"""
Function to process sniffed packets and update tracking data.
"""
global blocked_ips
if packet.haslayer(TCP):
tcp = packet[TCP]
ip = packet[IP]
# If the source IP is blocked, drop the packet
if ip.src in blocked_ips:
print(f"[BLOCKED] Dropping packet from {ip.src}", flush=True)
return
# Track SYN packets
if tcp.flags == "S":
syn_to_synack[(ip.src, tcp.dport)][0] += 1 # Increment SYN count
time.sleep(0.01)
print(f"[info] SYN packet tracked: {ip.src} -> {ip.dst}:{tcp.dport}", flush=True)
print(f"[info] Updated syn_to_synack: {dict(syn_to_synack)}", file=sys.stderr)
# Track SYN-ACK packets
if tcp.flags == "SA":
syn_to_synack[(ip.src, tcp.sport)][1] += 1 # Increment SYN-ACK count
time.sleep(0.01) # Artificial delay for debugging
print(f"[info] SYN-ACK packet tracked: {ip.src} -> {ip.dst}:{tcp.dport}", flush=True)
print(f"[info] Updated syn_to_synack: {dict(syn_to_synack)}", file=sys.stderr)
def sniff_packets():
"""
Function to run sniffing in a separate thread.
"""
print("[*] Starting packet sniffing...", flush=True)
sniff(filter="tcp", prn=monitor_packets, store=False, timeout=0.1)
def check_thresholds():
"""
Function to check thresholds for SYN flood detection and block offending IPs.
"""
global blocked_ips
print(f"[info] Checking thresholds...", flush=True)
print(f"[info] Current syn_to_synack state: {dict(syn_to_synack)}", file=sys.stderr)
print(f"[info] Current blocked IPs: {blocked_ips}", flush=True)
# Check SYN to SYN-ACK ratio
for key, counts in syn_to_synack.items():
syn_count, synack_count = counts
print(f"[info] Evaluating key: {key}, SYN count: {syn_count}, SYN-ACK count: {synack_count}", flush=True)
if synack_count == 0 or (syn_count / synack_count) > SYN_SYNACK_RATIO_THRESHOLD:
print(f"[ALERT] High SYN/SYN-ACK ratio for {key}: {syn_count}/{synack_count}", flush=True)
src_ip = key[0] # Extract the source IP from the key
print(f"[info] src_ip extracted: {src_ip}", flush=True)
print(f"[info] Checking if {src_ip} is already blocked...", flush=True)
if src_ip not in blocked_ips: # Block the source IP based on the ratio
print(f"[ACTION] Blocking IP: {src_ip}", flush=True)
blocked_ips.add(src_ip) # Block the IP
time.sleep(0.1)
else:
print(f"[info] {src_ip} is already blocked.", flush=True)
if __name__ == "__main__":
print("[*] Starting SYN flood detection with enhanced debugging...", flush=True)
try:
# Start sniffing in a separate thread
sniff_thread = threading.Thread(target=sniff_packets, daemon=True)
sniff_thread.start()
# Run the threshold checking loop
while True:
time.sleep(CHECK_INTERVAL)
print("[info] running threshold loop...", flush=True)
check_thresholds()
except KeyboardInterrupt:
print("\n[!] Stopping SYN flood detection.", flush=True)
此脚本应该在路由器中运行,嗅探 tcp 数据包并监控 syn/syn-ack 比率以标记这些攻击,然后阻止不良源 ip。我试过了:
[*] 开始使用双标志逻辑进行 SYN 泛洪检测...
[信息] SYN 从 192.168.1.2 到 192.168.2.2:12345
...消息不断显示,直到此消息开始显示
[警报] 检测到 192.168.1.2 的高 SYN 速率(29 SYN/秒)
...此消息一直显示,直到我终止程序
[!] 停止 SYN 泛洪检测。
这是一个相当简单的脚本,但我一直无法理解为什么有些消息会被打印而有些则不会。我知道这可能是一个逻辑错误,但是为什么在这些消息之前的其他一些消息不会以任何方式打印出来?
问题是您有两个线程都写入相同的输出(stdout)。这会导致消息混合在一起的竞争条件。
解决此问题的方法是仅让一个线程写入标准输出。您可以将
while True:
放入单独的线程中,并将对 print() 的调用替换为将内容添加到队列中的内容。然后让主线程从队列中读取并打印它。
import queue
OUTOUT = queue.Queue()
def log(x):
OUTPUT.put(x)
[... Your code replacing print() by log()]
if __name__ == "__main__":
[Start thread 1]
[Start thread 2]
while True:
msg = OUTPUT.get()
print(msg)
另一个选项(更脏)是使用:
os.write(1, b"The data you want to print")
根据我的经验,这在多线程环境中效果更好。