#!/usr/bin/env python3
"""
Predator W6x — Advanced Multi-CVE Reconnaissance Scanner
Targets: CVE-49195 (mtk_dut/UCC), CVE-49196 (WiFi block injection), CVE-49199 (MQTT injection)
Military-grade async scanner with service fingerprinting and vulnerability confidence scoring.

Author: Advanced Persistent Security Research
"""

from __future__ import annotations

import argparse
import asyncio
import ipaddress
import json
import re
import socket
import sys
import time
from collections.abc import Sequence
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from enum import IntEnum, auto
from pathlib import Path
from typing import Any
from urllib.parse import urlparse

import aiohttp
from colorama import Fore, Style, init

# scapy is an optional dependency: it is only needed for ARP discovery.
# HAS_SCAPY records whether the import succeeded so callers can skip the
# ARP sweep instead of failing at import time.
try:
    from scapy.all import ARP, Ether, srp
    HAS_SCAPY = True
except ImportError:
    HAS_SCAPY = False

# Initialise colorama with autoreset enabled for all console output below.
init(autoreset=True)


# ---------------------------------------------------------------------------
# Constants & Configuration
# ---------------------------------------------------------------------------

# Default per-probe timeout in seconds (overridable with --timeout).
DEFAULT_TIMEOUT = 8
# Default concurrency: used for the HTTP connection-pool limits and as the
# default value of --threads.
CONCURRENCY = 100
# Ports probed on every host.
MTK_DUT_PORT = 9000
MQTT_PORT = 1883
WEB_PORT = 80
WEB_SSL_PORT = 443

# Known vulnerable firmware signatures
# Keyed by service family. Each value is a list of substrings that the
# assessment step matches case-insensitively against a probe's banner.
VULN_SIGNATURES = {
    "mtk_dut": ["UCC", "MTK", "MediaTek", "DUT"],
    "wifi_block": ["wifi", "block", "mac", "api/wifi"],
    "mqtt": ["mqtt", "predator", "control", "broker"],
}

class Color:
    """Console colour/style prefixes built from colorama's Fore and Style.

    The class is used purely as a namespace; it is never instantiated in
    this file.
    """
    # Status colours: a foreground colour combined with the bright style.
    OKGREEN = Fore.GREEN + Style.BRIGHT
    OKBLUE = Fore.BLUE + Style.BRIGHT
    OKCYAN = Fore.CYAN + Style.BRIGHT
    OKMAGENTA = Fore.MAGENTA + Style.BRIGHT
    WARNING = Fore.YELLOW + Style.BRIGHT
    FAIL = Fore.RED + Style.BRIGHT
    # Style-only codes: reset everything / bright text without a colour.
    RESET = Style.RESET_ALL
    BOLD = Style.BRIGHT


class VulnStatus(IntEnum):
    """Overall verdict assigned to a scanned host.

    Members are numbered 1-5 in declaration order, so a smaller value
    compares as "less than" a larger one.
    """

    VULNERABLE = 1
    LIKELY = 2
    POSSIBLE = 3
    NOT_VULNERABLE = 4
    UNKNOWN = 5


@dataclass(frozen=True, slots=True)
class ServiceProbe:
    """Immutable record of a single probe against one port of a host."""
    # Port number that was probed.
    port: int
    # Label supplied by the prober (this file uses "mtk_dut", "mqtt",
    # "tcp", "http" and "https").
    protocol: str
    # "open"/"closed" for raw TCP probes; for HTTP probes the response
    # status code as text, or "error" when the request failed.
    status: str
    # Text captured from the service; empty when nothing was captured.
    banner: str = ""
    # Wall-clock duration of the probe in milliseconds.
    response_time_ms: float = 0.0


@dataclass(frozen=True, slots=True)
class HostResult:
    """Aggregated scan outcome for one host.

    The dataclass is frozen, so attributes cannot be rebound after
    construction; the list and dict fields can still be mutated in place.
    """

    target: str
    ip: str
    mac: str = ""
    hostname: str = ""
    status: VulnStatus = VulnStatus.UNKNOWN
    confidence: int = 0
    findings: list[str] = field(default_factory=list)
    services: list[ServiceProbe] = field(default_factory=list)
    cve_matches: dict[str, bool] = field(default_factory=dict)
    recommendations: list[str] = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-dict view of this result for JSON output.

        The status is emitted by name and each service probe is expanded
        into its own dict; the remaining fields are passed through as-is.
        """
        serialised_services = [asdict(probe) for probe in self.services]
        payload: dict[str, Any] = dict(
            target=self.target,
            ip=self.ip,
            mac=self.mac,
            hostname=self.hostname,
            status=self.status.name,
            confidence=self.confidence,
            findings=self.findings,
            services=serialised_services,
            cve_matches=self.cve_matches,
            recommendations=self.recommendations,
        )
        return payload


# ---------------------------------------------------------------------------
# Async Probing Engine
# ---------------------------------------------------------------------------

class AsyncProber:
    """Asynchronous TCP/HTTP prober backed by one shared aiohttp session.

    Use it as an async context manager: the HTTP session is created on
    entry and closed on exit. ``probe_tcp_banner`` works without the
    session; ``probe_http`` requires it.
    """

    def __init__(self, timeout: int = DEFAULT_TIMEOUT):
        """Store the probe timeout.

        Args:
            timeout: Overall per-probe budget in seconds. The connect phase
                of HTTP requests is additionally capped at 5 seconds.
        """
        self.timeout = aiohttp.ClientTimeout(total=timeout, connect=5)
        self._session: aiohttp.ClientSession | None = None

    async def __aenter__(self) -> "AsyncProber":
        """Create the pooled HTTP session and return the prober."""
        conn = aiohttp.TCPConnector(
            limit=CONCURRENCY,
            limit_per_host=CONCURRENCY // 2,
            # Certificate validation is skipped: scanned devices are
            # expected to present self-signed or otherwise invalid certs.
            ssl=False,
            enable_cleanup_closed=True,
        )
        self._session = aiohttp.ClientSession(connector=conn, timeout=self.timeout)
        return self

    async def __aexit__(self, *args: object) -> None:
        """Close the HTTP session, if one was opened."""
        if self._session is not None:
            await self._session.close()
            self._session = None

    async def probe_tcp_banner(self, ip: str, port: int, send_bytes: bytes = b"", proto: str = "tcp") -> ServiceProbe:
        """Connect to ``ip:port`` and capture up to 1024 bytes of banner.

        Args:
            ip: Host to connect to.
            port: TCP port to connect to.
            send_bytes: Optional payload written right after connecting to
                provoke a response.
            proto: Label stored in the resulting probe's ``protocol`` field.

        Returns:
            A probe with status "closed" when the connection could not be
            established, otherwise "open" with whatever banner was read
            within 2 seconds (possibly empty).
        """
        start = time.perf_counter()

        def elapsed_ms() -> float:
            return (time.perf_counter() - start) * 1000

        try:
            reader, writer = await asyncio.wait_for(
                asyncio.open_connection(ip, port), timeout=self.timeout.total
            )
        except (asyncio.TimeoutError, OSError):
            # ConnectionRefusedError is an OSError; any failure to connect
            # is reported as a closed port.
            return ServiceProbe(port=port, protocol=proto, status="closed", response_time_ms=elapsed_ms())

        # From here on the connection succeeded, so the port is open no
        # matter what happens while talking to the peer.
        banner = ""
        try:
            if send_bytes:
                writer.write(send_bytes)
                await writer.drain()
            banner_bytes = await asyncio.wait_for(reader.read(1024), timeout=2)
            banner = banner_bytes.decode("utf-8", errors="replace").strip()
        except (asyncio.TimeoutError, OSError):
            # Silent service or a peer that reset the connection: keep the
            # empty banner rather than misreporting the port as closed.
            pass
        finally:
            # Always release the socket, including on the error paths.
            writer.close()
            try:
                await writer.wait_closed()
            except OSError:
                # A reset during shutdown does not change the probe result.
                pass
        return ServiceProbe(port=port, protocol=proto, status="open", banner=banner, response_time_ms=elapsed_ms())

    async def probe_http(self, ip: str, port: int, path: str = "/", ssl: bool = False) -> ServiceProbe:
        """Issue a GET request and summarise the response.

        Args:
            ip: Host to query; IPv6 literals are bracketed automatically.
            port: TCP port of the web service.
            path: Request path, starting with "/".
            ssl: Use https instead of http. Certificates are not verified.

        Returns:
            A probe whose status is the HTTP status code as text and whose
            banner is "<status> <n> bytes", or a probe with status "error"
            when the request failed for any reason.

        Raises:
            RuntimeError: If called before the prober has been entered with
                ``async with``.
        """
        # Fail loudly on misuse; an assert here would be stripped under -O
        # and, inside the try below, would be swallowed as a probe error.
        if self._session is None:
            raise RuntimeError("AsyncProber must be entered with 'async with' before probing HTTP")

        start = time.perf_counter()
        scheme = "https" if ssl else "http"
        # An IPv6 literal must be wrapped in brackets inside a URL.
        host = f"[{ip}]" if ":" in ip and not ip.startswith("[") else ip
        url = f"{scheme}://{host}:{port}{path}"
        try:
            async with self._session.get(url, ssl=False, allow_redirects=True) as resp:
                # Read raw bytes: decoding can fail on binary or mislabelled
                # bodies, and the banner reports a byte count anyway.
                body = await resp.read()
                elapsed = (time.perf_counter() - start) * 1000
                return ServiceProbe(
                    port=port,
                    protocol=scheme,
                    status=str(resp.status),
                    banner=f"{resp.status} {len(body)} bytes",
                    response_time_ms=elapsed,
                )
        except Exception:
            # Deliberately broad: one misbehaving host must not abort the
            # whole scan, so every request failure becomes an "error" probe.
            elapsed = (time.perf_counter() - start) * 1000
            return ServiceProbe(port=port, protocol=scheme, status="error", response_time_ms=elapsed)


# ---------------------------------------------------------------------------
# Vulnerability Assessment
# ---------------------------------------------------------------------------

def assess_predator_vulns(result: HostResult) -> tuple[VulnStatus, int, dict[str, bool]]:
    """Score a host's probed services against the three tracked CVEs.

    Each open service and each banner signature hit adds to a score that is
    then mapped onto a status: >=70 VULNERABLE, >=45 LIKELY, >=20 POSSIBLE,
    >0 NOT_VULNERABLE, otherwise UNKNOWN.

    Args:
        result: Host record whose ``services`` are evaluated. The textual
            evidence gathered here is appended to ``result.findings`` in
            place (entries already present are not duplicated).

    Returns:
        ``(status, confidence, cve_matches)`` where confidence is the score
        capped at 100 and cve_matches maps each CVE id to whether its
        signature check matched.
    """
    score = 0
    findings: list[str] = []
    cve_matches = {"CVE-49195": False, "CVE-49196": False, "CVE-49199": False}

    # Lower-case the signatures once instead of once per service.
    signatures = {family: [sig.lower() for sig in sigs] for family, sigs in VULN_SIGNATURES.items()}

    def banner_has(banner_lower: str, family: str) -> bool:
        return any(sig in banner_lower for sig in signatures[family])

    for svc in result.services:
        banner_lower = svc.banner.lower()

        # CVE-49195: mtk_dut/UCC on port 9000
        if svc.port == MTK_DUT_PORT and svc.status == "open":
            score += 25
            findings.append(f"mtk_dut service on port {MTK_DUT_PORT} (banner: {svc.banner[:80]})")
            if banner_has(banner_lower, "mtk_dut"):
                score += 15
                cve_matches["CVE-49195"] = True
                findings.append("CVE-49195: mtk_dut/UCC command injection confirmed")

        # CVE-49199: MQTT on 1883
        if svc.port == MQTT_PORT and svc.status == "open":
            score += 20
            findings.append(f"MQTT broker on port {MQTT_PORT}")
            if banner_has(banner_lower, "mqtt"):
                score += 15
                cve_matches["CVE-49199"] = True
                findings.append("CVE-49199: Predator MQTT injection vector confirmed")

        # CVE-49196: WiFi block endpoint (web)
        # NOTE(review): AsyncProber.probe_http sets the banner to
        # "<status> <n> bytes", which contains none of the wifi_block
        # signatures, so this branch looks unreachable for HTTP probes as
        # they are produced today — confirm the intended evidence source.
        if svc.port in (WEB_PORT, WEB_SSL_PORT) and svc.status != "error":
            if banner_has(banner_lower, "wifi_block"):
                score += 15
                cve_matches["CVE-49196"] = True
                findings.append("CVE-49196: WiFi block injection endpoint detected")

    # Record the evidence on the result instead of discarding it; the
    # membership test keeps a repeated assessment from duplicating entries.
    for note in findings:
        if note not in result.findings:
            result.findings.append(note)

    confidence = min(score, 100)

    # Determine overall status
    # NOTE(review): a single wifi_block hit scores 15, which lands in the
    # NOT_VULNERABLE band even though a CVE flag is set, while a host with
    # no evidence at all is UNKNOWN. Thresholds are kept as authored.
    if score >= 70:
        status = VulnStatus.VULNERABLE
    elif score >= 45:
        status = VulnStatus.LIKELY
    elif score >= 20:
        status = VulnStatus.POSSIBLE
    elif score > 0:
        status = VulnStatus.NOT_VULNERABLE
    else:
        status = VulnStatus.UNKNOWN
    return status, confidence, cve_matches


# ---------------------------------------------------------------------------
# ARP Discovery (scapy-based)
# ---------------------------------------------------------------------------

def arp_discover(network: str) -> list[tuple[str, str]]:
    """Return ``(ip, mac)`` pairs for hosts that answer an ARP sweep.

    Args:
        network: Target passed to scapy's ARP ``pdst`` field (the caller
            supplies a CIDR range).

    Returns:
        One ``(ip, mac)`` tuple per answering host. An empty list when
        scapy is not installed or the sweep fails; a failure is reported on
        stderr so it is not mistaken for "no live hosts".
    """
    if not HAS_SCAPY:
        return []
    try:
        frame = Ether(dst="ff:ff:ff:ff:ff:ff") / ARP(pdst=network)
        answered, _ = srp(frame, timeout=3, verbose=0)
        return [(reply.psrc, reply.hwsrc) for _, reply in answered]
    except Exception as exc:
        # Discovery is best-effort (e.g. it can fail when raw-socket
        # privileges are missing): report the reason and let the scan
        # continue with the explicit target list.
        print(f"{Color.WARNING}[!] ARP discovery failed: {exc}{Color.RESET}", file=sys.stderr)
        return []


# ---------------------------------------------------------------------------
# Host Analysis Pipeline
# ---------------------------------------------------------------------------

async def _resolve_hostname(ip: str) -> str:
    """Return the reverse-DNS name for *ip*, or "" when the lookup fails."""
    try:
        # gethostbyaddr blocks, so run it in a worker thread to keep the
        # event loop free for the other hosts' probes.
        name, _aliases, _addresses = await asyncio.to_thread(socket.gethostbyaddr, ip)
    except (OSError, ValueError):
        # Resolution is best-effort: lookup errors (socket.herror/gaierror
        # are OSErrors) and malformed names simply yield no hostname.
        return ""
    return name


async def analyze_host(prober: AsyncProber, ip: str, mac: str = "") -> HostResult:
    """Probe one host, assess it, and return the completed result.

    Args:
        prober: An entered ``AsyncProber`` used for all probes.
        ip: Address of the host to analyse.
        mac: MAC address of the host when known, otherwise "".

    Returns:
        A ``HostResult`` with services, status, confidence, CVE flags,
        findings and recommendations filled in.
    """
    # Parallel service probes
    probe_results = await asyncio.gather(
        prober.probe_tcp_banner(ip, MTK_DUT_PORT, b"UCC help\n", "mtk_dut"),
        prober.probe_tcp_banner(ip, MQTT_PORT, b"", "mqtt"),
        prober.probe_http(ip, WEB_PORT, "/"),
        prober.probe_http(ip, WEB_SSL_PORT, "/", ssl=True),
        prober.probe_http(ip, WEB_PORT, "/api/wifi/block"),
        prober.probe_http(ip, WEB_PORT, "/cgi-bin/luci"),
    )
    services: list[ServiceProbe] = list(probe_results)

    hostname = await _resolve_hostname(ip)

    # HostResult is frozen, so its attributes cannot be assigned after
    # construction. Assess a preliminary record, then build the final one
    # with every field supplied up front.
    preliminary = HostResult(target=ip, ip=ip, mac=mac, hostname=hostname, services=services)
    status, confidence, cve_matches = assess_predator_vulns(preliminary)

    # Keep anything recorded on the preliminary result during assessment,
    # then add one line per service that answered.
    findings = list(preliminary.findings)
    findings.extend(
        f"{svc.protocol}/{svc.port}: {svc.banner[:120]}"
        for svc in services
        if svc.status in ("open", "200")
    )

    # Recommendations
    recommendations: list[str] = []
    if status in (VulnStatus.VULNERABLE, VulnStatus.LIKELY):
        recommendations.append("IMMEDIATE: Isolate device from management network")
        if cve_matches["CVE-49195"]:
            recommendations.append("Block port 9000 (mtk_dut) at network perimeter")
        if cve_matches["CVE-49199"]:
            recommendations.append("Block port 1883 (MQTT) or enforce authentication")
        if cve_matches["CVE-49196"]:
            recommendations.append("Patch WiFi block endpoint; restrict API access")
        recommendations.append("Update firmware to latest vendor release")

    return HostResult(
        target=ip,
        ip=ip,
        mac=mac,
        hostname=hostname,
        status=status,
        confidence=confidence,
        findings=findings,
        services=services,
        cve_matches=cve_matches,
        recommendations=recommendations,
    )


# ---------------------------------------------------------------------------
# CLI / Main
# ---------------------------------------------------------------------------

def print_banner() -> None:
    """Print the framed tool banner, coloured with ``Color.FAIL``."""
    print(f"""
{Color.FAIL}
╔══════════════════════════════════════════════════════════════════════════════╗
║  Predator W6x — Multi-CVE Reconnaissance Scanner                              ║
║  CVE-49195 (mtk_dut/UCC) | CVE-49196 (WiFi Block) | CVE-49199 (MQTT)         ║
║  Military-Grade Async Discovery & Vulnerability Assessment                    ║
╚══════════════════════════════════════════════════════════════════════════════╝
{Color.RESET}""")


def _positive_int(value: str) -> int:
    """argparse type: accept only integers >= 1."""
    try:
        number = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid integer value: {value!r}") from None
    if number < 1:
        raise argparse.ArgumentTypeError(f"must be a positive integer, got {number}")
    return number


def parse_args() -> argparse.Namespace:
    """Parse the command line.

    Returns:
        Namespace with ``network``, ``target``, ``file``, ``output``,
        ``timeout``, ``no_arp`` and ``threads``.

    ``--timeout`` and ``--threads`` must be positive: a concurrency of 0
    would create a semaphore that never admits a task, and a negative one
    is rejected by ``asyncio.Semaphore``. Invalid values make argparse
    print a usage error and exit.
    """
    p = argparse.ArgumentParser(
        description="Predator W6x Multi-CVE Scanner",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python scanner.py -n 192.168.1.0/24
  python scanner.py -t 192.168.1.50 -o results.json
  python scanner.py -f targets.txt --threads 200
        """,
    )
    p.add_argument("-n", "--network", help="CIDR network range (e.g. 192.168.1.0/24)")
    p.add_argument("-t", "--target", help="Single target IP")
    p.add_argument("-f", "--file", help="File with target IPs (one per line)")
    p.add_argument("-o", "--output", help="JSON output file")
    p.add_argument("--timeout", type=_positive_int, default=DEFAULT_TIMEOUT, help=f"Timeout per probe (default {DEFAULT_TIMEOUT}s)")
    p.add_argument("--no-arp", action="store_true", help="Skip ARP discovery")
    p.add_argument("--threads", type=_positive_int, default=CONCURRENCY, help=f"Concurrent probes (default {CONCURRENCY})")
    return p.parse_args()


def load_targets(args: argparse.Namespace) -> list[str]:
    """Collect scan targets from ``-t``, ``-f`` and ``-n``.

    Args:
        args: Parsed arguments; reads ``target``, ``file`` and ``network``.

    Returns:
        Targets in the order single target, file entries, network hosts,
        with duplicates removed (first occurrence wins) so overlapping
        inputs are not scanned twice.

    Exits with status 1 after printing a message when the target file
    cannot be read or the CIDR range is invalid.
    """
    targets: list[str] = []
    if args.target:
        targets.append(args.target)
    if args.file:
        try:
            with open(args.file, encoding="utf-8") as fh:
                for raw_line in fh:
                    entry = raw_line.strip()
                    # Skip blank lines and full-line comments.
                    if entry and not entry.startswith("#"):
                        targets.append(entry)
        except (OSError, UnicodeDecodeError) as exc:
            # Same user-facing handling as an invalid CIDR below, instead
            # of an unhandled traceback.
            print(f"{Color.FAIL}[!] Cannot read target file: {exc}{Color.RESET}")
            sys.exit(1)
    if args.network:
        try:
            net = ipaddress.ip_network(args.network, strict=False)
        except ValueError as exc:
            print(f"{Color.FAIL}[!] Invalid CIDR: {exc}{Color.RESET}")
            sys.exit(1)
        targets.extend(str(host) for host in net.hosts())
    # dict.fromkeys de-duplicates while preserving insertion order.
    return list(dict.fromkeys(targets))


async def main_async() -> None:
    """Run the scan: parse arguments, discover, probe, print and save.

    Exits with status 1 when no targets were given. Per-host results are
    printed as they complete and, when ``-o`` is set, written to a JSON
    file once all hosts are done.
    """
    args = parse_args()
    print_banner()

    targets = load_targets(args)
    if not targets:
        print(f"{Color.WARNING}[!] No targets specified. Use -n, -t, or -f.{Color.RESET}")
        sys.exit(1)

    # ARP discovery for CIDR targets
    # MAC addresses learned here are kept so they can be attached to the
    # matching hosts' results instead of being dropped.
    mac_by_ip: dict[str, str] = {}
    if args.network and not args.no_arp and HAS_SCAPY:
        print(f"{Color.OKBLUE}[*] ARP discovery on {args.network}...{Color.RESET}")
        arp_results = arp_discover(args.network)
        if arp_results:
            mac_by_ip = dict(arp_results)
            # Hosts that answered ARP go first; dict.fromkeys removes the
            # duplicates while keeping that order.
            targets = list(dict.fromkeys([*mac_by_ip, *targets]))
            print(f"{Color.OKGREEN}[+] Discovered {len(arp_results)} live hosts{Color.RESET}")

    print(f"{Color.OKBLUE}[*] Loaded {len(targets)} target(s){Color.RESET}")
    print(f"{Color.OKBLUE}[*] Timeout: {args.timeout}s | Concurrency: {args.threads}{Color.RESET}")

    # Built once rather than for every completed host.
    status_colors = {
        VulnStatus.VULNERABLE: Color.FAIL,
        VulnStatus.LIKELY: Color.WARNING,
        VulnStatus.POSSIBLE: Color.OKCYAN,
        VulnStatus.NOT_VULNERABLE: Color.OKGREEN,
        VulnStatus.UNKNOWN: Color.OKMAGENTA,
    }

    results: list[HostResult] = []

    async with AsyncProber(timeout=args.timeout) as prober:
        # Bounds how many hosts are analysed at the same time.
        semaphore = asyncio.Semaphore(args.threads)

        async def bounded_analyze(ip: str, mac: str = "") -> HostResult:
            async with semaphore:
                return await analyze_host(prober, ip, mac)

        tasks = [bounded_analyze(t, mac_by_ip.get(t, "")) for t in targets]
        for coro in asyncio.as_completed(tasks):
            result = await coro
            results.append(result)

            status_color = status_colors.get(result.status, Color.RESET)

            print(f"\n{Color.BOLD}Target: {result.ip}{Color.RESET}  MAC: {result.mac or 'N/A'}  Hostname: {result.hostname or 'N/A'}")
            print(f"Status: {status_color}{result.status.name}{Color.RESET} (confidence: {result.confidence}%)")
            for cve, match in result.cve_matches.items():
                if match:
                    print(f"  {Color.FAIL}>>> {cve} CONFIRMED{Color.RESET}")
            # Only the first five findings are shown on the console; the
            # JSON output contains all of them.
            for finding in result.findings[:5]:
                print(f"  → {finding}")

    if args.output:
        report = json.dumps([r.to_dict() for r in results], indent=2)
        # Explicit encoding so the file does not depend on the locale.
        Path(args.output).write_text(report, encoding="utf-8")
        print(f"\n{Color.OKGREEN}[*] Results written to {args.output}{Color.RESET}")


def main() -> None:
    """Synchronous entry point: run the async scan to completion.

    A Ctrl-C is reported to the user and turned into exit status 130.
    """
    try:
        asyncio.run(main_async())
    except KeyboardInterrupt:
        notice = f"\n{Color.WARNING}[!] Interrupted by user{Color.RESET}"
        print(notice)
        raise SystemExit(130)


# Run the scanner only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
