#!/usr/bin/env python3
"""
Langflow Multi-CVE Reconnaissance Scanner
Targets: CVE-2026-7524 (Path Traversal), CVE-2026-7700 (Lambda eval), CVE-2026-7687 (CodeParser)
Async scanner that fingerprints exposed Langflow endpoints and versions and assigns a heuristic exposure score; it sends plain HTTP requests only and does not attempt exploitation.

Author: Advanced Persistent Security Research
"""

from __future__ import annotations

import argparse
import asyncio
import ipaddress
import json
import re
import socket
import sys
import time
from collections.abc import Sequence
from concurrent.futures import ThreadPoolExecutor
from dataclasses import asdict, dataclass, field, replace
from datetime import datetime, timezone
from enum import IntEnum, auto
from pathlib import Path
from typing import Any

import aiohttp
from colorama import Fore, Style, init

# Initialise colorama; autoreset=True resets the terminal style after every
# print so a colour never bleeds into later output.
init(autoreset=True)


# ---------------------------------------------------------------------------
# Constants & Configuration
# ---------------------------------------------------------------------------

# Total time budget, in seconds, for a single HTTP probe.
DEFAULT_TIMEOUT = 10
# Default cap on simultaneous work: connection-pool size and --threads default.
CONCURRENCY = 80
# Port the scanner probes. WEB_SSL_PORT carries the same value and is not
# referenced elsewhere in this file.
WEB_PORT = WEB_SSL_PORT = 7860

# Langflow endpoints: short tag -> request path. Insertion order is the order
# in which the endpoints are probed.
LANGFLOW_ENDPOINTS = dict(
    health="/health",
    api_flows="/api/v1/flows/",
    api_components="/api/v1/components/",
    api_upload="/api/v1/upload/",
    api_process="/api/v1/process/",
    lambda_filter="/api/v1/components/LambdaFilterComponent",
    code_parser="/api/v1/components/CodeParser",
    archive_upload="/api/v1/upload/archive",
    docling="/api/v1/docling/",
    nvidia_retriever="/api/v1/nvidia/",
    video_file="/api/v1/video/",
    unstructured="/api/v1/unstructured/",
    read_file="/api/v1/read-file/",
)

# Vulnerable versions: CVE id -> (lowest, highest) version, both inclusive.
VULN_VERSIONS = {
    cve_id: ("1.0.0", highest_affected)
    for cve_id, highest_affected in (
        ("CVE-2026-7524", "1.9.1"),
        ("CVE-2026-7700", "1.8.4"),
        ("CVE-2026-7687", "1.8.4"),
    )
}

class Color:
    """Named ANSI escape sequences used for console output.

    Every attribute is a plain string; the coloured entries pair a colorama
    foreground colour with the bright style.
    """

    OKGREEN = f"{Fore.GREEN}{Style.BRIGHT}"
    OKBLUE = f"{Fore.BLUE}{Style.BRIGHT}"
    OKCYAN = f"{Fore.CYAN}{Style.BRIGHT}"
    OKMAGENTA = f"{Fore.MAGENTA}{Style.BRIGHT}"
    WARNING = f"{Fore.YELLOW}{Style.BRIGHT}"
    FAIL = f"{Fore.RED}{Style.BRIGHT}"
    # Style-only entries: full reset and plain bright/bold.
    RESET = Style.RESET_ALL
    BOLD = Style.BRIGHT


class VulnStatus(IntEnum):
    """Assessment verdict for a host; lower numbers mean stronger evidence."""

    VULNERABLE = 1
    LIKELY = 2
    POSSIBLE = 3
    NOT_VULNERABLE = 4
    UNKNOWN = 5


@dataclass(frozen=True, slots=True)
class ServiceProbe:
    """Outcome of one HTTP probe against a single port/path.

    The dataclass is frozen, so fields cannot be reassigned after
    construction; use ``dataclasses.replace`` to derive a modified copy.
    The ``vuln_indicators`` dict itself remains mutable.
    """

    # TCP port that was probed.
    port: int
    # URL scheme used for the request ("http" or "https").
    protocol: str
    # HTTP status code as a string, or "error" when the request failed.
    status: str
    # Short response summary, or the exception text when the request failed.
    banner: str = ""
    # Wall-clock duration of the probe in milliseconds.
    response_time_ms: float = 0.0
    # Detected Langflow version; empty when unknown.
    version: str = ""
    # Free-form evidence attached to the probe (e.g. the endpoint tag).
    vuln_indicators: dict[str, Any] = field(default_factory=dict)


@dataclass(frozen=True, slots=True)
class HostResult:
    """Aggregated scan outcome for one target.

    Frozen: fields cannot be reassigned after construction, although the
    list and dict values they hold remain mutable.
    """

    target: str
    ip: str
    hostname: str = ""
    status: VulnStatus = VulnStatus.UNKNOWN
    confidence: int = 0
    findings: list[str] = field(default_factory=list)
    services: list[ServiceProbe] = field(default_factory=list)
    cve_matches: dict[str, bool] = field(default_factory=dict)
    version: str = ""
    recommendations: list[str] = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        """Return a JSON-serialisable view of this result.

        The status is emitted by enum name and each service probe is expanded
        into a plain dict; every other field is passed through unchanged.
        """
        serialised_services = list(map(asdict, self.services))
        return dict(
            target=self.target,
            ip=self.ip,
            hostname=self.hostname,
            status=self.status.name,
            confidence=self.confidence,
            findings=self.findings,
            services=serialised_services,
            cve_matches=self.cve_matches,
            version=self.version,
            recommendations=self.recommendations,
        )


# ---------------------------------------------------------------------------
# Async Probing Engine
# ---------------------------------------------------------------------------

class AsyncProber:
    """HTTP prober that shares one aiohttp session across all probes.

    Use it as an async context manager: the session and its connection pool
    are created on entry and closed on exit. TLS certificate verification is
    disabled on purpose, because scan targets commonly present self-signed or
    mismatched certificates.
    """

    def __init__(self, timeout: int = DEFAULT_TIMEOUT):
        """Store the per-request time budget; no network resources are opened.

        Args:
            timeout: Total seconds allowed for one request.
        """
        self.timeout = aiohttp.ClientTimeout(total=timeout, connect=5)
        self._session: aiohttp.ClientSession | None = None

    async def __aenter__(self) -> "AsyncProber":
        """Open the shared session and return the prober."""
        conn = aiohttp.TCPConnector(
            limit=CONCURRENCY,
            limit_per_host=CONCURRENCY // 2,
            ssl=False,
            enable_cleanup_closed=True,
        )
        self._session = aiohttp.ClientSession(connector=conn, timeout=self.timeout)
        return self

    async def __aexit__(self, *args: object) -> None:
        """Close the shared session, if one is open."""
        if self._session:
            await self._session.close()
            self._session = None

    async def probe_http(self, ip: str, port: int, path: str = "/", ssl: bool = False,
                         method: str = "GET", data: dict | bytes | None = None,
                         headers: dict | None = None) -> ServiceProbe:
        """Send one request and summarise the response as a ServiceProbe.

        Args:
            ip: Target address or hostname; IPv6 literals are bracketed.
            port: TCP port to connect to.
            path: Request path, starting with "/".
            ssl: Use the https scheme when true.
            method: "POST" sends a POST; any other value sends a GET.
            data: POST payload; bytes are sent raw, anything else as JSON.
            headers: Extra headers merged over the default User-Agent.

        Returns:
            A ServiceProbe whose ``status`` is the HTTP status code as a
            string, or "error" with the failure reason in ``banner``. This
            method never raises for network or protocol failures.
        """
        start = time.perf_counter()
        scheme = "https" if ssl else "http"
        # A bare IPv6 literal would make the ":port" suffix ambiguous.
        host = f"[{ip}]" if ":" in ip and not ip.startswith("[") else ip
        url = f"{scheme}://{host}:{port}{path}"

        def elapsed_ms() -> float:
            return (time.perf_counter() - start) * 1000

        if self._session is None:
            # Explicit check instead of `assert`, which disappears under -O.
            return ServiceProbe(port=port, protocol=scheme, status="error",
                                banner="AsyncProber session is not open",
                                response_time_ms=elapsed_ms())

        req_headers = {"User-Agent": "Mozilla/5.0 (Langflow Scanner)"}
        if headers:
            req_headers.update(headers)

        # Headers are sent for every method, GET included.
        request_kwargs: dict[str, Any] = {
            "headers": req_headers,
            "ssl": False,
            "allow_redirects": True,
        }
        if method == "POST":
            http_method = "POST"
            payload_key = "data" if isinstance(data, bytes) else "json"
            request_kwargs[payload_key] = data
        else:
            http_method = "GET"

        try:
            async with self._session.request(http_method, url, **request_kwargs) as resp:
                # errors="replace" keeps a binary or mis-labelled body from
                # turning a successful response into a probe error.
                body = await resp.text(errors="replace")
                return ServiceProbe(
                    port=port,
                    protocol=scheme,
                    status=str(resp.status),
                    banner=f"{resp.status} {len(body)} bytes",
                    response_time_ms=elapsed_ms(),
                    # Record any version string while the body is available.
                    version=extract_version(body, dict(resp.headers)),
                )
        except Exception as exc:
            # Deliberately broad: any failure to talk to the target is a probe
            # outcome, not a scanner crash. Timeouts stringify to "", so fall
            # back to the exception class name.
            return ServiceProbe(port=port, protocol=scheme, status="error",
                                banner=str(exc) or type(exc).__name__,
                                response_time_ms=elapsed_ms())


# ---------------------------------------------------------------------------
# Vulnerability Assessment
# ---------------------------------------------------------------------------

# Version patterns, tried in order. The capture group accepts only dotted
# integers, so stray punctuation after the version is never captured.
_VERSION_PATTERNS = tuple(
    re.compile(pattern, re.IGNORECASE)
    for pattern in (
        r"Langflow[/\s](\d+(?:\.\d+)*)",
        r'"version":\s*"(\d+(?:\.\d+)*)"',
        r"langflow[_-]version[\"':\s]+(\d+(?:\.\d+)*)",
    )
)


def extract_version(body: str, headers: dict) -> str:
    """Return the Langflow version advertised by a response, or "".

    The body is searched first, then each header rendered as "Name: value",
    so a version exposed only in a header is also found.

    Args:
        body: Response body text; may be empty.
        headers: Response headers; may be empty or None.

    Returns:
        The first dotted-integer version matched (e.g. "1.8.4"), or an empty
        string when nothing matches.
    """
    candidates = [body or ""]
    candidates.extend(f"{name}: {value}" for name, value in (headers or {}).items())

    for text in candidates:
        for pattern in _VERSION_PATTERNS:
            match = pattern.search(text)
            if match:
                return match.group(1)
    return ""


def version_in_range(version: str, min_v: str, max_v: str) -> bool:
    """Return True when ``min_v <= version <= max_v`` numerically.

    Versions are dotted integers compared component by component; shorter
    versions are padded with zeros, so "1.5" equals "1.5.0". Surrounding
    whitespace and stray leading/trailing dots are ignored.

    Args:
        version: Version to test.
        min_v: Inclusive lower bound.
        max_v: Inclusive upper bound.

    Returns:
        False when any argument is not a parseable dotted-integer version.
    """
    def parse(text: str) -> list[int]:
        return [int(part) for part in text.strip().strip(".").split(".")]

    try:
        parsed = [parse(version), parse(min_v), parse(max_v)]
    except (ValueError, AttributeError, TypeError):
        # Only parsing problems are caught; KeyboardInterrupt and SystemExit
        # must still propagate.
        return False

    width = max(len(parts) for parts in parsed)
    v_parts, min_parts, max_parts = (
        parts + [0] * (width - len(parts)) for parts in parsed
    )
    return min_parts <= v_parts <= max_parts


# (CVE id, endpoint indicator) pairs that count as exposed attack surface.
_SURFACE_INDICATORS: tuple[tuple[str, str], ...] = (
    ("CVE-2026-7524", "archive"),
    ("CVE-2026-7524", "docling"),
    ("CVE-2026-7524", "nvidia"),
    ("CVE-2026-7524", "video"),
    ("CVE-2026-7524", "unstructured"),
    ("CVE-2026-7524", "read-file"),
    ("CVE-2026-7700", "lambda_filter"),
    ("CVE-2026-7687", "code_parser"),
)


def _endpoint_responded(status: str) -> bool:
    """Return True when a probe status suggests the route exists on the target."""
    if not status.isdigit():
        return False  # "error": the request never completed
    code = int(status)
    # 404/410 mean the route is absent; 5xx says nothing reliable about it.
    return code < 500 and code not in (404, 410)


def assess_langflow_vulns(result: HostResult) -> tuple[VulnStatus, int, dict[str, bool]]:
    """Score the evidence gathered for one host.

    Scoring, with each item counted at most once per host:
      * +10 when a responding service mentions Langflow,
      * +20 per CVE whose vulnerable range contains a detected version,
      * +15 per (CVE, endpoint indicator) whose endpoint actually responded.

    A CVE is flagged in the returned match map only when a detected version
    falls inside its vulnerable range; an exposed endpoint alone raises the
    score but does not flag the CVE.

    Side effect: the return shape has no slot for findings, so the
    human-readable findings are appended to ``result.findings`` (duplicates
    are skipped).

    Args:
        result: Host result whose ``services`` are evaluated.

    Returns:
        ``(status, confidence, cve_matches)``; confidence is capped at 100.
    """
    score = 0
    findings: list[str] = []
    cve_matches = {"CVE-2026-7524": False, "CVE-2026-7700": False, "CVE-2026-7687": False}

    identified = False
    version_scored: set[str] = set()
    surface_scored: set[tuple[str, str]] = set()

    for svc in result.services:
        evidence = f"{svc.vuln_indicators} {svc.banner}".lower()

        # Langflow identification. Failed probes are skipped so a hostname in
        # an error message cannot count as identification.
        if not identified and svc.status != "error" and "langflow" in evidence:
            identified = True
            score += 10
            findings.append(f"Langflow identified: {svc.banner[:100]}")

        # Version check: scored once per CVE, however many probes carry it.
        if svc.version:
            for cve_id, (min_v, max_v) in VULN_VERSIONS.items():
                if cve_id in version_scored:
                    continue
                if version_in_range(svc.version, min_v, max_v):
                    version_scored.add(cve_id)
                    cve_matches[cve_id] = True
                    score += 20
                    findings.append(
                        f"Version {svc.version} in vulnerable range for {cve_id} ({min_v}-{max_v})"
                    )

        # Endpoint-specific checks: an endpoint tag is evidence only when the
        # endpoint answered; otherwise every probed host would score the same.
        if not _endpoint_responded(svc.status):
            continue
        # Endpoint tags use "_" while some indicators use "-"; compare both
        # sides in one spelling.
        normalised = evidence.replace("-", "_")
        for cve_id, indicator in _SURFACE_INDICATORS:
            key = (cve_id, indicator)
            if key in surface_scored:
                continue
            if indicator.replace("-", "_") in normalised:
                surface_scored.add(key)
                score += 15
                findings.append(f"{cve_id} attack surface detected: {indicator}")

    # Surface the findings on the result; the list is mutable even though the
    # dataclass is frozen.
    for note in findings:
        if note not in result.findings:
            result.findings.append(note)

    confidence = min(score, 100)
    if score >= 70:
        return VulnStatus.VULNERABLE, confidence, cve_matches
    if score >= 45:
        return VulnStatus.LIKELY, confidence, cve_matches
    if score >= 25:
        return VulnStatus.POSSIBLE, confidence, cve_matches
    if score > 0:
        return VulnStatus.NOT_VULNERABLE, score, cve_matches
    return VulnStatus.UNKNOWN, score, cve_matches


# ---------------------------------------------------------------------------
# Host Analysis Pipeline
# ---------------------------------------------------------------------------

async def analyze_host(prober: AsyncProber, ip: str) -> HostResult:
    """Probe one target and return its assessed result.

    Steps:
      1. Probe the health endpoint. If the request fails outright, the
         remaining endpoints are skipped, so a dead host costs one probe.
      2. Probe the remaining endpoints concurrently.
      3. Resolve the reverse-DNS name in a worker thread.
      4. Score the evidence and attach findings and recommendations.

    ServiceProbe and HostResult are frozen dataclasses, so updated values are
    applied with ``dataclasses.replace`` rather than attribute assignment.

    Args:
        prober: An entered AsyncProber.
        ip: Target address or hostname.

    Returns:
        The HostResult for the target; network failures are recorded in the
        result rather than raised.
    """
    health = await prober.probe_http(ip, WEB_PORT, LANGFLOW_ENDPOINTS.get("health", "/health"))
    tagged: list[tuple[str, ServiceProbe]] = [("health", health)]

    if health.status != "error":
        remaining = {name: path for name, path in LANGFLOW_ENDPOINTS.items() if name != "health"}
        # gather() returns results in input order, so zip() pairs them with
        # their endpoint tags correctly.
        responses = await asyncio.gather(
            *(prober.probe_http(ip, WEB_PORT, path) for path in remaining.values())
        )
        tagged.extend(zip(remaining, responses))

    # Prefer a version recorded on a probe; otherwise fall back to parsing
    # the health banner.
    version = next((probe.version for _, probe in tagged if probe.version), "")
    if not version:
        version = extract_version(health.banner, {})

    services = [
        replace(
            probe,
            version=probe.version or version,
            vuln_indicators={**probe.vuln_indicators, "endpoint": name},
        )
        for name, probe in tagged
    ]

    # Hostname resolution is best-effort. gethostbyaddr blocks, so it runs in
    # a worker thread to keep the event loop free.
    hostname = ""
    try:
        hostname = (await asyncio.to_thread(socket.gethostbyaddr, ip))[0]
    except (OSError, UnicodeError):
        pass  # no PTR record or resolver failure

    preliminary = HostResult(target=ip, ip=ip, hostname=hostname, services=services, version=version)
    status, confidence, cve_matches = assess_langflow_vulns(preliminary)

    endpoint_findings = [
        f"{svc.protocol}/{svc.port}: {svc.vuln_indicators.get('endpoint', 'unknown')}"
        for svc in services
        if svc.status == "200"
    ]
    # Keep anything the assessor recorded on the result, then list the
    # endpoints that answered 200.
    findings = [*preliminary.findings, *endpoint_findings]

    # Recommendations
    recommendations: list[str] = []
    if status in (VulnStatus.VULNERABLE, VulnStatus.LIKELY):
        recommendations += [
            "IMMEDIATE: Update Langflow to latest version (>= 1.9.2)",
            "Restrict Langflow API to internal network only",
            "Disable file upload/archive processing if not required",
            "Implement WAF rules for archive upload endpoints",
        ]
        if cve_matches.get("CVE-2026-7524"):
            recommendations.append("CVE-2026-7524: Disable archive extraction components")
        if cve_matches.get("CVE-2026-7700"):
            recommendations.append("CVE-2026-7700: Disable LambdaFilterComponent")
        if cve_matches.get("CVE-2026-7687"):
            recommendations.append("CVE-2026-7687: Disable CodeParser component")

    return replace(
        preliminary,
        status=status,
        confidence=confidence,
        cve_matches=cve_matches,
        findings=findings,
        recommendations=recommendations,
    )


# ---------------------------------------------------------------------------
# CLI / Main
# ---------------------------------------------------------------------------

def print_banner() -> None:
    """Print the boxed tool banner in the failure colour, then reset styling."""
    box = (
        "╔══════════════════════════════════════════════════════════════════════════════╗",
        "║  Langflow Multi-CVE Scanner (CVE-2026-7524, CVE-2026-7700, CVE-2026-7687)    ║",
        "║  Path Traversal | Lambda eval() Code Injection | CodeParser Command Injection  ║",
        "║  Military-Grade Async Discovery & Exploitability Assessment                    ║",
        "╚══════════════════════════════════════════════════════════════════════════════╝",
    )
    # The leading empty entry yields the blank line before the colour code.
    print("\n".join(["", Color.FAIL, *box, Color.RESET]))


def _positive_int(raw: str) -> int:
    """argparse ``type=`` callable that accepts only integers >= 1."""
    try:
        value = int(raw)
    except ValueError:
        raise argparse.ArgumentTypeError(f"expected an integer, got {raw!r}") from None
    if value < 1:
        raise argparse.ArgumentTypeError(f"must be >= 1, got {value}")
    return value


def parse_args(argv: Sequence[str] | None = None) -> argparse.Namespace:
    """Parse command-line options.

    ``--timeout`` and ``--threads`` must be positive: a concurrency of zero
    would leave every scan task waiting forever, and a negative one is
    rejected by the semaphore.

    Args:
        argv: Argument list to parse; defaults to ``sys.argv[1:]``.

    Returns:
        Namespace with ``network``, ``target``, ``file``, ``output``,
        ``timeout`` and ``threads``.
    """
    p = argparse.ArgumentParser(
        description="Langflow Multi-CVE Scanner",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python scanner.py -n 10.0.0.0/24
  python scanner.py -t 10.0.0.50 -o results.json
  python scanner.py -f targets.txt --threads 200
        """,
    )
    p.add_argument("-n", "--network", help="CIDR network range (e.g. 10.0.0.0/24)")
    p.add_argument("-t", "--target", help="Single target IP")
    p.add_argument("-f", "--file", help="File with target IPs (one per line)")
    p.add_argument("-o", "--output", help="JSON output file")
    p.add_argument("--timeout", type=_positive_int, default=DEFAULT_TIMEOUT,
                   help=f"Timeout per probe (default {DEFAULT_TIMEOUT}s)")
    p.add_argument("--threads", type=_positive_int, default=CONCURRENCY,
                   help=f"Concurrent probes (default {CONCURRENCY})")
    return p.parse_args(argv)


def load_targets(args: argparse.Namespace) -> list[str]:
    """Collect scan targets from ``-t``, ``-f`` and ``-n``.

    Sources are merged in that order and de-duplicated with first-seen order
    kept, so a host named by several sources is scanned once.

    Args:
        args: Parsed options providing ``target``, ``file`` and ``network``.

    Returns:
        Unique targets in first-seen order; empty when no source was given.

    Raises:
        SystemExit: With code 1 when the target file cannot be read or the
            CIDR is invalid, after printing the reason.
    """
    targets: list[str] = []

    if args.target:
        targets.append(args.target)

    if args.file:
        try:
            with open(args.file, encoding="utf-8") as fh:
                for raw_line in fh:
                    entry = raw_line.strip()
                    # Blank lines and "#" comment lines are ignored.
                    if entry and not entry.startswith("#"):
                        targets.append(entry)
        except (OSError, UnicodeDecodeError) as exc:
            print(f"{Color.FAIL}[!] Cannot read target file: {exc}{Color.RESET}")
            sys.exit(1)

    if args.network:
        try:
            net = ipaddress.ip_network(args.network, strict=False)
        except ValueError as exc:
            print(f"{Color.FAIL}[!] Invalid CIDR: {exc}{Color.RESET}")
            sys.exit(1)
        targets.extend(str(host) for host in net.hosts())

    # dict.fromkeys drops duplicates while keeping insertion order.
    return list(dict.fromkeys(targets))


async def main_async() -> None:
    """Run the scan: parse options, probe every target, report and save.

    Hosts are analysed concurrently, bounded by ``--threads``, and each
    result is printed as soon as it completes. A failure while analysing one
    host is reported and skipped so the rest of the sweep still runs.

    Raises:
        SystemExit: With code 1 when no targets were supplied or the output
            file cannot be written.
    """
    args = parse_args()
    print_banner()

    targets = load_targets(args)
    if not targets:
        print(f"{Color.WARNING}[!] No targets specified. Use -n, -t, or -f.{Color.RESET}")
        sys.exit(1)

    # Semaphore(0) would block every task forever and a negative value raises.
    concurrency = max(1, args.threads)

    print(f"{Color.OKBLUE}[*] Loaded {len(targets)} target(s){Color.RESET}")
    print(f"{Color.OKBLUE}[*] Timeout: {args.timeout}s | Concurrency: {concurrency}{Color.RESET}")

    # Built once rather than on every loop iteration.
    status_colors = {
        VulnStatus.VULNERABLE: Color.FAIL,
        VulnStatus.LIKELY: Color.WARNING,
        VulnStatus.POSSIBLE: Color.OKCYAN,
        VulnStatus.NOT_VULNERABLE: Color.OKGREEN,
        VulnStatus.UNKNOWN: Color.OKMAGENTA,
    }

    results: list[HostResult] = []

    async with AsyncProber(timeout=args.timeout) as prober:
        semaphore = asyncio.Semaphore(concurrency)

        async def bounded_analyze(ip: str) -> HostResult | None:
            async with semaphore:
                try:
                    return await analyze_host(prober, ip)
                except Exception as exc:
                    # Top-level boundary: report the host, keep the sweep going.
                    print(f"{Color.FAIL}[!] {ip}: analysis failed: {exc!r}{Color.RESET}")
                    return None

        tasks = [bounded_analyze(t) for t in targets]
        for coro in asyncio.as_completed(tasks):
            result = await coro
            if result is None:
                continue
            results.append(result)

            status_color = status_colors.get(result.status, Color.RESET)

            print(f"\n{Color.BOLD}Target: {result.ip}{Color.RESET}  Hostname: {result.hostname or 'N/A'}  Version: {result.version or 'N/A'}")
            print(f"Status: {status_color}{result.status.name}{Color.RESET} (confidence: {result.confidence}%)")
            for cve, match in result.cve_matches.items():
                if match:
                    print(f"  {Color.FAIL}>>> {cve} CONFIRMED{Color.RESET}")
            for finding in result.findings[:5]:
                print(f"  → {finding}")

    if args.output:
        report = json.dumps([r.to_dict() for r in results], indent=2)
        try:
            Path(args.output).write_text(report, encoding="utf-8")
        except OSError as exc:
            print(f"{Color.FAIL}[!] Could not write {args.output}: {exc}{Color.RESET}")
            sys.exit(1)
        print(f"\n{Color.OKGREEN}[*] Results written to {args.output}{Color.RESET}")


def main() -> None:
    """Synchronous entry point: run the async scan and handle Ctrl-C.

    On keyboard interrupt a notice is printed and the process exits with
    status 130.
    """
    exit_code = 0
    try:
        asyncio.run(main_async())
    except KeyboardInterrupt:
        print(f"\n{Color.WARNING}[!] Interrupted by user{Color.RESET}")
        exit_code = 130

    if exit_code:
        sys.exit(exit_code)


# Run the scanner only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
