#!/usr/bin/env python3
"""
CVE-2026-9151 — TP-Link Archer Series OS Command Injection Scanner
Targets: Archer AX12, AX17, AX18, AX1300, and similar models
Military-grade async scanner with VPN config endpoint detection and credential validation.

Author: Advanced Persistent Security Research
"""

from __future__ import annotations

import argparse
import asyncio
import ipaddress
import json
import re
import socket
import sys
import time
from collections.abc import Sequence
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from enum import IntEnum, auto
from pathlib import Path
from typing import Any

import aiohttp
from colorama import Fore, Style, init

# scapy is optional: it is only used by arp_discover(), so a missing install
# is recorded in HAS_SCAPY instead of aborting the import of this module.
try:
    from scapy.all import ARP, Ether, srp
    HAS_SCAPY = True
except ImportError:
    HAS_SCAPY = False

# Initialise colorama; autoreset=True resets the style after each print call.
init(autoreset=True)


# ---------------------------------------------------------------------------
# Constants & Configuration
# ---------------------------------------------------------------------------

# Default total timeout per probe in seconds; also the --timeout CLI default.
DEFAULT_TIMEOUT = 10
# Default concurrency; also the --threads CLI default.
CONCURRENCY = 80
# Ports probed for the plain-HTTP and HTTPS web interfaces.
WEB_PORT = 80
WEB_SSL_PORT = 443

# Substrings treated as evidence of a TP-Link Archer device. They are matched
# case-insensitively against probe banners by assess_archer_vulns().
ARCHER_INDICATORS = [
    "TP-Link",
    "Archer",
    "tplink",
    "Archer AX12",
    "Archer AX17",
    "Archer AX18",
    "Archer AX1300",
]

# Candidate VPN-related URL paths. analyze_host() probes only the first four
# entries (VPN_ENDPOINTS[:4]).
VPN_ENDPOINTS = [
    "/cgi-bin/luci",
    "/cgi-bin/luci/admin/vpn/client",
    "/cgi-bin/luci/admin/vpn/client/upload",
    "/cgi-bin/luci/admin/vpn/client/apply",
    "/cgi-bin/luci/admin/vpn",
    "/api/vpn",
    "/cgi-bin/api/vpn",
]

# (username, password) pairs.
# NOTE(review): nothing in this file as shown references this list.
DEFAULT_CREDS = [
    ("admin", "admin"),
    ("admin", ""),
    ("admin", "admin123"),
]

# Keyword groups for heuristic banner matching (these are keywords, not
# firmware versions). Only the "vpn_module" group is read in this file as
# shown; the other groups appear unused.
VULN_SIGNATURES = {
    "vpn_module": ["OpenVPN", "WireGuard", "VPN", "vpn"],
    "config_upload": ["upload", "import", "ovpn"],
    "archer": ["Archer", "TP-Link"],
}


class Color:
    """Colorama style prefixes used for console output.

    Every colour constant is the foreground colour combined with the bright
    style; RESET restores the terminal defaults.
    """

    BOLD = Style.BRIGHT
    RESET = Style.RESET_ALL

    OKGREEN = Fore.GREEN + BOLD
    OKBLUE = Fore.BLUE + BOLD
    OKCYAN = Fore.CYAN + BOLD
    OKMAGENTA = Fore.MAGENTA + BOLD
    WARNING = Fore.YELLOW + BOLD
    FAIL = Fore.RED + BOLD


class VulnStatus(IntEnum):
    """Assessment verdict for a host, numbered 1-5 in declaration order."""

    VULNERABLE = 1
    LIKELY = 2
    POSSIBLE = 3
    NOT_VULNERABLE = 4
    UNKNOWN = 5


@dataclass(frozen=True, slots=True)
class CredentialResult:
    """Immutable outcome of one credential check.

    NOTE(review): nothing in this file as shown constructs this type, so the
    field meanings below are inferred from their names — confirm against the
    code that performs the check.
    """

    valid: bool  # read by assess_archer_vulns() when scoring a service
    username: str = ""  # presumably the username that was tried
    password: str = ""  # presumably the password that was tried
    session_cookie: str = ""  # presumably the cookie obtained on success — TODO confirm


@dataclass(frozen=True, slots=True)
class ServiceProbe:
    """Immutable record of a single HTTP(S) probe against one port and path."""

    port: int  # TCP port that was probed
    protocol: str  # "http" or "https", as set by AsyncProber.probe_http
    status: str  # HTTP status code as a string, or "error" when the request failed
    banner: str = ""  # short response summary, or the exception text on failure
    response_time_ms: float = 0.0  # elapsed time of the probe in milliseconds
    requires_auth: bool = False  # True when the body mentions "login" or the status is 401/403
    credential_check: CredentialResult | None = None  # NOTE(review): never populated in this file as shown


@dataclass(frozen=True, slots=True)
class HostResult:
    """Aggregated scan outcome for one host.

    The dataclass is frozen: attributes cannot be rebound after construction,
    although the list-valued fields can still be mutated in place.
    """

    target: str
    ip: str
    mac: str = ""
    hostname: str = ""
    status: VulnStatus = VulnStatus.UNKNOWN
    confidence: int = 0
    findings: list[str] = field(default_factory=list)
    services: list[ServiceProbe] = field(default_factory=list)
    working_credentials: list[CredentialResult] = field(default_factory=list)
    vpn_endpoints: list[str] = field(default_factory=list)
    recommendations: list[str] = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        """Return a JSON-serialisable mapping of this result.

        The status is emitted by enum name, and the nested probe and
        credential dataclasses are expanded into plain dicts.
        """
        serialised_services = [asdict(probe) for probe in self.services]
        serialised_creds = [asdict(cred) for cred in self.working_credentials]
        return dict(
            target=self.target,
            ip=self.ip,
            mac=self.mac,
            hostname=self.hostname,
            status=self.status.name,
            confidence=self.confidence,
            findings=self.findings,
            services=serialised_services,
            working_credentials=serialised_creds,
            vpn_endpoints=self.vpn_endpoints,
            recommendations=self.recommendations,
        )


# ---------------------------------------------------------------------------
# Async Probing Engine
# ---------------------------------------------------------------------------

class AsyncProber:
    """Issue HTTP(S) probes through one shared aiohttp session.

    Use as an async context manager: the session is opened on entry and
    closed on exit. Certificate verification is disabled for every request
    (``ssl=False``) — presumably because the target devices present
    self-signed certificates.
    """

    # Extracts the HTML <title> so the banner carries fingerprintable text.
    _TITLE_RE = re.compile(r"<title[^>]*>(.*?)</title>", re.IGNORECASE | re.DOTALL)
    # Only the start of the body is searched for a title; keeps the scan cheap.
    _TITLE_SCAN_LIMIT = 65536
    _TITLE_MAX_LEN = 80

    def __init__(self, timeout: int = DEFAULT_TIMEOUT, concurrency: int = CONCURRENCY):
        """Store the probe configuration; no network resources are opened yet.

        Args:
            timeout: Total time budget per request, in seconds.
            concurrency: Upper bound on simultaneously open connections.
        """
        self.timeout = aiohttp.ClientTimeout(total=timeout, connect=5)
        self._concurrency = max(1, concurrency)
        self._session: aiohttp.ClientSession | None = None

    async def __aenter__(self) -> "AsyncProber":
        """Create the shared session and return this prober."""
        conn = aiohttp.TCPConnector(
            limit=self._concurrency,
            # aiohttp treats 0 as "unlimited", so never let the halved value reach 0.
            limit_per_host=max(1, self._concurrency // 2),
            ssl=False,
            enable_cleanup_closed=True,
        )
        self._session = aiohttp.ClientSession(connector=conn, timeout=self.timeout)
        return self

    async def __aexit__(self, *args: object) -> None:
        """Close the shared session, if one was opened."""
        if self._session:
            await self._session.close()
            self._session = None

    @classmethod
    def _build_banner(cls, status: int, server: str, body: str) -> str:
        """Summarise a response as ``"<status> <n> bytes[ | Server: ..][ | Title: ..]"``."""
        parts = [f"{status} {len(body)} bytes"]
        if server:
            parts.append(f"Server: {server}")
        match = cls._TITLE_RE.search(body[:cls._TITLE_SCAN_LIMIT])
        if match:
            # Collapse runs of whitespace/newlines so the banner stays on one line.
            title = " ".join(match.group(1).split())[:cls._TITLE_MAX_LEN]
            if title:
                parts.append(f"Title: {title}")
        return " | ".join(parts)

    async def probe_http(self, ip: str, port: int, path: str = "/", ssl: bool = False) -> ServiceProbe:
        """GET ``path`` on ``ip:port`` and describe the outcome.

        Args:
            ip: Target address; IPv6 literals are bracketed automatically.
            port: TCP port to connect to.
            path: URL path, starting with ``/``.
            ssl: Use ``https`` instead of ``http``.

        Returns:
            A ServiceProbe. Request failures are reported as
            ``status="error"`` with the exception type and text in the banner
            rather than being raised.

        Raises:
            RuntimeError: If called outside the ``async with`` block.
        """
        if self._session is None:
            raise RuntimeError("AsyncProber.probe_http() requires an active 'async with' block")

        protocol = "https" if ssl else "http"
        host = f"[{ip}]" if ":" in ip else ip
        url = f"{protocol}://{host}:{port}{path}"
        start = time.perf_counter()
        try:
            async with self._session.get(url, ssl=False, allow_redirects=True) as resp:
                # errors="replace": a mis-encoded page must not turn a live host into "error".
                body = await resp.text(errors="replace")
                elapsed = (time.perf_counter() - start) * 1000

                # Check if auth required
                requires_auth = "login" in body.lower() or resp.status in (401, 403)

                return ServiceProbe(
                    port=port,
                    protocol=protocol,
                    status=str(resp.status),
                    banner=self._build_banner(resp.status, resp.headers.get("Server", ""), body),
                    response_time_ms=elapsed,
                    requires_auth=requires_auth,
                )
        except Exception as exc:
            # Deliberately broad: any per-target failure becomes an "error" probe
            # so one bad host cannot abort the whole scan.
            elapsed = (time.perf_counter() - start) * 1000
            # str() of a timeout error is empty, so always include the type name.
            detail = f"{type(exc).__name__}: {exc}" if str(exc) else type(exc).__name__
            return ServiceProbe(port=port, protocol=protocol,
                                status="error", banner=detail, response_time_ms=elapsed)


# ---------------------------------------------------------------------------
# Vulnerability Assessment
# ---------------------------------------------------------------------------

def assess_archer_vulns(result: HostResult) -> tuple[VulnStatus, int]:
    """Score a host heuristically and map the score to a verdict.

    Per successfully answered service:
      * +15 when the banner contains a TP-Link/Archer indicator,
      * +20 when the banner contains a VPN keyword.
    Per service with a confirmed credential check: +25.
    Once per host: +20 when a VPN client/upload endpoint was reachable.

    Args:
        result: Host record; only ``services`` and ``vpn_endpoints`` are read.

    Returns:
        ``(status, confidence)`` where confidence is the score capped at 100.
        Thresholds: >=70 VULNERABLE, >=45 LIKELY, >=20 POSSIBLE,
        >0 NOT_VULNERABLE, otherwise UNKNOWN.
    """
    # Banners are lowercased before matching, so the needles must be too;
    # otherwise mixed-case entries such as "WireGuard" can never match.
    indicators = [ind.lower() for ind in ARCHER_INDICATORS]
    vpn_keywords = [kw.lower() for kw in VULN_SIGNATURES["vpn_module"]]

    score = 0
    for svc in result.services:
        # A failed probe's banner is exception text, not device content.
        if svc.status != "error":
            banner_lower = svc.banner.lower()

            # TP-Link/Archer fingerprint
            if any(ind in banner_lower for ind in indicators):
                score += 15

            # VPN service module mentioned in the response
            if any(kw in banner_lower for kw in vpn_keywords):
                score += 20

        # Working credentials confirmed
        if svc.credential_check is not None and svc.credential_check.valid:
            score += 25

    # Host-level signal: counted once, not once per probed service.
    upload_endpoints = (
        "/cgi-bin/luci/admin/vpn/client/upload",
        "/cgi-bin/luci/admin/vpn/client",
    )
    if any(ep in result.vpn_endpoints for ep in upload_endpoints):
        score += 20

    confidence = min(score, 100)
    if score >= 70:
        return VulnStatus.VULNERABLE, confidence
    if score >= 45:
        return VulnStatus.LIKELY, confidence
    if score >= 20:
        return VulnStatus.POSSIBLE, confidence
    if score > 0:
        return VulnStatus.NOT_VULNERABLE, confidence
    return VulnStatus.UNKNOWN, confidence


# ---------------------------------------------------------------------------
# ARP Discovery
# ---------------------------------------------------------------------------

def arp_discover(network: str) -> list[tuple[str, str]]:
    """Broadcast an ARP request over ``network`` and collect the responders.

    Discovery is best-effort: when scapy is unavailable, or the sweep fails
    for any reason, an empty list is returned so the scan can continue. A
    failure is reported on stderr instead of being silently dropped.

    Args:
        network: Destination passed to scapy's ``ARP(pdst=...)``; the caller
            in this file supplies the CIDR string from ``--network``.

    Returns:
        ``(ip, mac)`` pairs, one per answer received within the 3 s window.
    """
    if not HAS_SCAPY:
        return []
    try:
        request = Ether(dst="ff:ff:ff:ff:ff:ff") / ARP(pdst=network)
        answered, _ = srp(request, timeout=3, verbose=0)
        return [(reply.psrc, reply.hwsrc) for _, reply in answered]
    except Exception as exc:
        # Broad on purpose: discovery must never abort the scan.
        print(
            f"[!] ARP discovery failed ({type(exc).__name__}: {exc}); continuing without it",
            file=sys.stderr,
        )
        return []


# ---------------------------------------------------------------------------
# Host Analysis Pipeline
# ---------------------------------------------------------------------------

async def analyze_host(prober: AsyncProber, ip: str, mac: str = "") -> HostResult:
    """Probe one host's web interface and return its assessed result.

    Steps: probe ``/`` over HTTP and HTTPS, probe the first four
    VPN_ENDPOINTS on whichever transports answered, reverse-resolve the
    address, score the findings, and attach recommendations.

    Args:
        prober: Entered AsyncProber used for every request.
        ip: Address to scan.
        mac: Hardware address, if already known.

    Returns:
        A fully populated HostResult.
    """
    # Probe web interface (both transports at once; order of results is fixed).
    http_result, https_result = await asyncio.gather(
        prober.probe_http(ip, WEB_PORT, "/"),
        prober.probe_http(ip, WEB_SSL_PORT, "/", ssl=True),
    )
    services: list[ServiceProbe] = [http_result, https_result]

    # Only transports that answered are worth probing further. HTTPS is tried
    # first; HTTP is the fallback for devices without a TLS listener.
    transports: list[tuple[int, bool]] = []
    if https_result.status != "error":
        transports.append((WEB_SSL_PORT, True))
    if http_result.status != "error":
        transports.append((WEB_PORT, False))

    # Probe VPN endpoints
    vpn_endpoints: list[str] = []
    for endpoint in VPN_ENDPOINTS[:4]:
        for port, use_ssl in transports:
            vpn_result = await prober.probe_http(ip, port, endpoint, ssl=use_ssl)
            if vpn_result.status in ("200", "301", "302"):
                vpn_endpoints.append(endpoint)
                services.append(vpn_result)
                break  # one hit per endpoint is enough

    # Hostname resolution: gethostbyaddr blocks, so keep it off the event loop.
    try:
        hostname = (await asyncio.to_thread(socket.gethostbyaddr, ip))[0]
    except (OSError, UnicodeError):
        # No PTR record or an unresolvable target string: not an error for the scan.
        hostname = ""

    # HostResult is frozen, so score a draft and then build the final record
    # in one go instead of assigning attributes afterwards.
    draft = HostResult(target=ip, ip=ip, mac=mac, hostname=hostname,
                       services=services, vpn_endpoints=vpn_endpoints)
    status, confidence = assess_archer_vulns(draft)

    findings = [
        f"{svc.protocol}/{svc.port}: {svc.banner[:120]}"
        for svc in services
        if svc.status == "200"
    ]

    # Recommendations
    recommendations: list[str] = []
    if status in (VulnStatus.VULNERABLE, VulnStatus.LIKELY):
        recommendations = [
            "IMMEDIATE: Change default admin credentials",
            "Apply latest TP-Link firmware update",
            "Disable VPN service if not required",
            "Restrict management interface to LAN only",
        ]

    return HostResult(
        target=ip,
        ip=ip,
        mac=mac,
        hostname=hostname,
        status=status,
        confidence=confidence,
        findings=findings,
        services=services,
        vpn_endpoints=vpn_endpoints,
        recommendations=recommendations,
    )


# ---------------------------------------------------------------------------
# CLI / Main
# ---------------------------------------------------------------------------

def print_banner() -> None:
    """Print the boxed tool banner to stdout in the FAIL colour."""
    box = (
        "╔══════════════════════════════════════════════════════════════════════════════╗",
        "║  CVE-2026-9151 — TP-Link Archer Series OS Command Injection Scanner          ║",
        "║  Military-Grade Async Discovery & Vulnerability Assessment                   ║",
        "╚══════════════════════════════════════════════════════════════════════════════╝",
    )
    # The leading empty element yields the blank line that precedes the colour code.
    pieces = ("", f"{Color.FAIL}", *box, f"{Color.RESET}")
    print("\n".join(pieces))


def _positive_int(raw: str) -> int:
    """argparse ``type=`` callable: parse ``raw`` as an integer >= 1."""
    try:
        value = int(raw)
    except ValueError:
        raise argparse.ArgumentTypeError(f"expected an integer, got {raw!r}") from None
    if value < 1:
        raise argparse.ArgumentTypeError(f"must be at least 1, got {value}")
    return value


def parse_args() -> argparse.Namespace:
    """Parse the command line (``sys.argv``) into a namespace.

    Returns:
        Namespace with ``network``, ``target``, ``file``, ``output``,
        ``timeout``, ``no_arp`` and ``threads``.

    Invalid arguments make argparse print usage and exit with status 2.
    """
    p = argparse.ArgumentParser(
        description="CVE-2026-9151 TP-Link Archer Scanner",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python scanner.py -n 192.168.1.0/24
  python scanner.py -t 192.168.1.50 -o results.json
  python scanner.py -f targets.txt --threads 200
        """,
    )
    p.add_argument("-n", "--network", help="CIDR network range (e.g. 192.168.1.0/24)")
    p.add_argument("-t", "--target", help="Single target IP")
    p.add_argument("-f", "--file", help="File with target IPs (one per line)")
    p.add_argument("-o", "--output", help="JSON output file")
    p.add_argument("--timeout", type=int, default=DEFAULT_TIMEOUT, help=f"Timeout per probe (default {DEFAULT_TIMEOUT}s)")
    p.add_argument("--no-arp", action="store_true", help="Skip ARP discovery")
    # A value below 1 would size the semaphore so that no probe can ever start
    # (0) or raise a ValueError (negative), so reject it at parse time.
    p.add_argument("--threads", type=_positive_int, default=CONCURRENCY, help=f"Concurrent probes (default {CONCURRENCY})")
    return p.parse_args()


def load_targets(args: argparse.Namespace) -> list[str]:
    """Collect scan targets from ``--target``, ``--file`` and ``--network``.

    Sources are concatenated in that order and de-duplicated while keeping
    first-seen order, so a host named by several options is scanned once.
    In the target file, blank lines are skipped and anything from ``#`` to
    the end of a line is treated as a comment.

    Args:
        args: Parsed CLI namespace; reads ``target``, ``file``, ``network``.

    Returns:
        Ordered list of unique target strings (possibly empty).

    Exits with status 1 (after printing a message) when the target file
    cannot be read or the CIDR range is invalid.
    """
    targets: list[str] = []
    if args.target:
        targets.append(args.target)
    if args.file:
        try:
            with open(args.file, encoding="utf-8") as fh:
                for line in fh:
                    # Drop full-line and trailing comments alike.
                    entry = line.split("#", 1)[0].strip()
                    if entry:
                        targets.append(entry)
        except (OSError, UnicodeDecodeError) as exc:
            print(f"{Color.FAIL}[!] Cannot read target file: {exc}{Color.RESET}")
            sys.exit(1)
    if args.network:
        try:
            net = ipaddress.ip_network(args.network, strict=False)
        except ValueError as exc:
            print(f"{Color.FAIL}[!] Invalid CIDR: {exc}{Color.RESET}")
            sys.exit(1)
        targets.extend(str(h) for h in net.hosts())
    # dict.fromkeys de-duplicates while preserving insertion order.
    return list(dict.fromkeys(targets))


async def main_async() -> None:
    """Run the scan end to end: parse arguments, discover, probe, report.

    Targets come from load_targets(); when a network was given and scapy is
    available, an ARP sweep puts responding hosts first and supplies their
    MAC addresses. Each host is analysed under a semaphore sized by
    ``--threads``; results are printed as they complete and, if ``--output``
    was given, written as a JSON array.

    Exits with status 1 when no targets were specified.
    """
    args = parse_args()
    print_banner()

    targets = load_targets(args)
    if not targets:
        print(f"{Color.WARNING}[!] No targets specified. Use -n, -t, or -f.{Color.RESET}")
        sys.exit(1)

    # MAC addresses learned from ARP, keyed by IP, so results can report them.
    mac_by_ip: dict[str, str] = {}
    if args.network and not args.no_arp and HAS_SCAPY:
        print(f"{Color.OKBLUE}[*] ARP discovery on {args.network}...{Color.RESET}")
        arp_results = arp_discover(args.network)
        if arp_results:
            mac_by_ip = dict(arp_results)
            # Responding hosts first, then the remaining targets, without duplicates.
            targets = list(dict.fromkeys([ip for ip, _ in arp_results] + targets))
            print(f"{Color.OKGREEN}[+] Discovered {len(arp_results)} live hosts{Color.RESET}")

    print(f"{Color.OKBLUE}[*] Loaded {len(targets)} target(s){Color.RESET}")
    print(f"{Color.OKBLUE}[*] Timeout: {args.timeout}s | Concurrency: {args.threads}{Color.RESET}")

    status_colors = {
        VulnStatus.VULNERABLE: Color.FAIL,
        VulnStatus.LIKELY: Color.WARNING,
        VulnStatus.POSSIBLE: Color.OKCYAN,
        VulnStatus.NOT_VULNERABLE: Color.OKGREEN,
        VulnStatus.UNKNOWN: Color.OKMAGENTA,
    }
    results: list[HostResult] = []

    async with AsyncProber(timeout=args.timeout) as prober:
        # A semaphore of 0 would block every task forever; clamp to at least 1.
        semaphore = asyncio.Semaphore(max(1, args.threads))

        async def bounded_analyze(ip: str, mac: str = "") -> HostResult:
            async with semaphore:
                return await analyze_host(prober, ip, mac)

        tasks = [bounded_analyze(t, mac_by_ip.get(t, "")) for t in targets]
        for coro in asyncio.as_completed(tasks):
            result = await coro
            results.append(result)

            status_color = status_colors.get(result.status, Color.RESET)

            print(f"\n{Color.BOLD}Target: {result.ip}{Color.RESET}  MAC: {result.mac or 'N/A'}  Hostname: {result.hostname or 'N/A'}")
            print(f"Status: {status_color}{result.status.name}{Color.RESET} (confidence: {result.confidence}%)")
            for ep in result.vpn_endpoints:
                print(f"  {Color.OKCYAN}[VPN] {ep}{Color.RESET}")
            for finding in result.findings[:5]:
                print(f"  → {finding}")

    if args.output:
        report = json.dumps([r.to_dict() for r in results], indent=2)
        Path(args.output).write_text(report, encoding="utf-8")
        print(f"\n{Color.OKGREEN}[*] Results written to {args.output}{Color.RESET}")


def main() -> None:
    """Synchronous entry point: run the async scan, handling Ctrl-C.

    On KeyboardInterrupt a notice is printed and the process exits with
    status 130.
    """
    try:
        asyncio.run(main_async())
    except KeyboardInterrupt:
        # Fall through to the interrupt report below.
        pass
    else:
        return

    print(f"\n{Color.WARNING}[!] Interrupted by user{Color.RESET}")
    sys.exit(130)


# Run the scanner only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
