diff --git a/mcp/README.md b/mcp/README.md new file mode 100644 index 0000000..493addb --- /dev/null +++ b/mcp/README.md @@ -0,0 +1,33 @@ +# Infracloud MCP + +MCP server for the actual `infracloud` repository layout. + +## Scope + +- Reads `vps/*/services_inventory.md` and `k3s/services_inventory.md` +- Lists and reads `containers/*.container` and `containers/*.service` +- Lists scripts in `scripts/auto-organized` +- Executes only scripts classified as read-only diagnostics +- Provides repo grep and safe document reads + +## Install + +```powershell +pip install -r C:\dev\infracloud\mcp\requirements.txt +``` + +## Run + +```powershell +python C:\dev\infracloud\mcp\server.py +``` + +## Claude Desktop + +Use `claude_desktop_config.infracloud.json` as a base and merge it into your Claude Desktop MCP config. + +## Notes + +- This repo currently uses `scripts/auto-organized`, not `dev-scripts`. +- This repo currently does not have `docs/openproject`. +- `AGENT.md` includes secrets in plaintext. The MCP server does not expose that file. diff --git a/mcp/__pycache__/server.cpython-312.pyc b/mcp/__pycache__/server.cpython-312.pyc new file mode 100644 index 0000000..21446a2 Binary files /dev/null and b/mcp/__pycache__/server.cpython-312.pyc differ diff --git a/mcp/claude_desktop_config.infracloud.json b/mcp/claude_desktop_config.infracloud.json new file mode 100644 index 0000000..fd32528 --- /dev/null +++ b/mcp/claude_desktop_config.infracloud.json @@ -0,0 +1,11 @@ +{ + "mcpServers": { + "infracloud-sustentacao": { + "command": "python", + "args": [ + "C:\\dev\\infracloud\\mcp\\server.py" + ], + "cwd": "C:\\dev\\infracloud" + } + } +} diff --git a/mcp/requirements.txt b/mcp/requirements.txt new file mode 100644 index 0000000..a0b21a7 --- /dev/null +++ b/mcp/requirements.txt @@ -0,0 +1 @@ +mcp>=1.6.0 diff --git a/mcp/server.py b/mcp/server.py new file mode 100644 index 0000000..34da03a --- /dev/null +++ b/mcp/server.py @@ -0,0 +1,394 @@ +from __future__ import annotations + +import subprocess +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +from mcp.server.fastmcp import FastMCP + + +REPO_ROOT = Path(__file__).resolve().parents[1] +SCRIPTS_ROOT = REPO_ROOT / "scripts" / "auto-organized" +VPS_ROOT = REPO_ROOT / "vps" +CONTAINERS_ROOT = REPO_ROOT / "containers" +DATABASES_ROOT = REPO_ROOT / "databases" +K3S_ROOT = REPO_ROOT / "k3s" +DOC_ALLOWLIST = ( + REPO_ROOT / "README.md", + VPS_ROOT, + CONTAINERS_ROOT, + DATABASES_ROOT, + K3S_ROOT, +) + +READ_ONLY_SCRIPT_PREFIXES = ( + "check_", + "fetch_", + "get_", + "inspect_", + "verify_", + "final_status", + "watch_", +) +MUTATING_SCRIPT_PREFIXES = ( + "approve_", + "complete_", + "fix_", + "merge_", + "retrigger_", + "revert_", +) + +mcp = FastMCP( + "infracloud-sustentacao", + instructions=( + "Use the real infracloud repository as the source of truth. " + "Prefer inventory markdown, container unit files, and existing scripts. " + "Do not assume paths like dev-scripts or docs/openproject if they do not exist." + ), +) + + +@dataclass(frozen=True) +class ScriptInfo: + path: Path + relative_path: str + is_read_only: bool + kind: str + + +def _ensure_in_repo(path: Path) -> Path: + resolved = path.resolve() + if REPO_ROOT not in resolved.parents and resolved != REPO_ROOT: + raise ValueError(f"path escapes repository root: {path}") + return resolved + + +def _script_kind(name: str) -> str: + lower = name.lower() + if lower.endswith(".ps1"): + return "powershell" + if lower.endswith(".sh"): + return "shell" + return "other" + + +def _is_read_only_script(name: str) -> bool: + lower = name.lower() + if lower.endswith((".json", ".yaml", ".yml", ".txt", ".pem")): + return False + if lower.startswith(MUTATING_SCRIPT_PREFIXES): + return False + return lower.startswith(READ_ONLY_SCRIPT_PREFIXES) + + +def _list_scripts() -> list[ScriptInfo]: + if not SCRIPTS_ROOT.exists(): + return [] + + results: list[ScriptInfo] = [] + for path in sorted(SCRIPTS_ROOT.rglob("*")): + if not path.is_file(): + continue + relative = path.relative_to(REPO_ROOT).as_posix() + results.append( + ScriptInfo( + path=path, + relative_path=relative, + is_read_only=_is_read_only_script(path.name), + kind=_script_kind(path.name), + ) + ) + return results + + +def _resolve_script(script_name: str) -> ScriptInfo: + script_name = script_name.replace("\\", "/").strip() + candidates = _list_scripts() + + exact = [item for item in candidates if item.relative_path == script_name or item.path.name == script_name] + if len(exact) == 1: + return exact[0] + if len(exact) > 1: + raise ValueError(f"multiple scripts matched '{script_name}', use a repo-relative path") + + fuzzy = [item for item in candidates if script_name.lower() in item.relative_path.lower()] + if len(fuzzy) == 1: + return fuzzy[0] + if len(fuzzy) > 1: + names = ", ".join(item.relative_path for item in fuzzy[:10]) + raise ValueError(f"multiple scripts matched '{script_name}': {names}") + + raise ValueError(f"script not found: {script_name}") + + +def _read_text(path: Path, max_chars: int = 20000) -> str: + resolved = _ensure_in_repo(path) + text = resolved.read_text(encoding="utf-8", errors="replace") + if len(text) > max_chars: + return text[:max_chars] + "\n... [truncated]" + return text + + +def _parse_markdown_table(lines: list[str], start_index: int) -> tuple[list[dict[str, str]], int]: + header_line = lines[start_index].strip() + separator_index = start_index + 1 + if separator_index >= len(lines): + return [], start_index + 1 + + separator_line = lines[separator_index].strip() + if "|" not in header_line or "|" not in separator_line: + return [], start_index + 1 + + headers = [part.strip(" `") for part in header_line.strip("|").split("|")] + rows: list[dict[str, str]] = [] + index = start_index + 2 + while index < len(lines): + line = lines[index].rstrip() + if "|" not in line or not line.strip().startswith("|"): + break + values = [part.strip() for part in line.strip().strip("|").split("|")] + if len(values) == len(headers): + rows.append(dict(zip(headers, values))) + index += 1 + + return rows, index + + +def _parse_inventory_file(path: Path) -> dict[str, Any]: + lines = _read_text(path, max_chars=120000).splitlines() + parsed: dict[str, Any] = {"file": path.relative_to(REPO_ROOT).as_posix(), "sections": {}} + current_section = "root" + parsed["sections"][current_section] = {"tables": [], "paragraphs": []} + + index = 0 + while index < len(lines): + line = lines[index].rstrip() + if line.startswith("#"): + current_section = line.lstrip("#").strip() + parsed["sections"].setdefault(current_section, {"tables": [], "paragraphs": []}) + index += 1 + continue + + if line.strip().startswith("|"): + rows, next_index = _parse_markdown_table(lines, index) + if rows: + parsed["sections"][current_section]["tables"].append(rows) + index = next_index + continue + + if line.strip(): + parsed["sections"][current_section]["paragraphs"].append(line.strip()) + index += 1 + + return parsed + + +def _iter_inventory_files() -> list[Path]: + return sorted(VPS_ROOT.rglob("services_inventory.md")) + sorted(K3S_ROOT.rglob("services_inventory.md")) + + +def _match_service(query: str, row: dict[str, str]) -> bool: + haystack = " ".join(str(value) for value in row.values()).lower() + return query.lower() in haystack + + +def _safe_doc_path(relative_path: str) -> Path: + relative = Path(relative_path) + candidate = _ensure_in_repo(REPO_ROOT / relative) + for allowed in DOC_ALLOWLIST: + allowed_resolved = allowed.resolve() + if candidate == allowed_resolved or allowed_resolved in candidate.parents: + return candidate + raise ValueError(f"path not allowed: {relative_path}") + + +@mcp.tool( + description="List scripts available in scripts/auto-organized, including whether each one is safe for read-only execution.", +) +def list_repo_scripts(name_filter: str | None = None) -> list[dict[str, Any]]: + scripts = _list_scripts() + if name_filter: + scripts = [item for item in scripts if name_filter.lower() in item.relative_path.lower()] + + return [ + { + "name": item.path.name, + "relative_path": item.relative_path, + "kind": item.kind, + "read_only": item.is_read_only, + } + for item in scripts + ] + + +@mcp.tool( + description="Run an existing repo script from scripts/auto-organized. Only read-only diagnostic scripts are executable.", +) +def run_repo_script(script_name: str, args: list[str] | None = None, timeout_seconds: int = 60) -> dict[str, Any]: + script = _resolve_script(script_name) + if not script.is_read_only: + raise ValueError( + f"script '{script.relative_path}' is not classified as read-only and cannot be executed by this tool" + ) + + args = args or [] + if script.kind == "powershell": + command = [ + "powershell", + "-NoProfile", + "-ExecutionPolicy", + "Bypass", + "-File", + str(script.path), + *args, + ] + elif script.kind == "shell": + command = ["bash", str(script.path), *args] + else: + raise ValueError(f"unsupported script type: {script.kind}") + + completed = subprocess.run( + command, + cwd=REPO_ROOT, + capture_output=True, + text=True, + timeout=timeout_seconds, + check=False, + ) + return { + "script": script.relative_path, + "exit_code": completed.returncode, + "stdout": completed.stdout[-12000:], + "stderr": completed.stderr[-12000:], + } + + +@mcp.tool( + description="Parse one services_inventory.md file into structured JSON. Server examples: redbull, vim, nc1, k3s.", +) +def read_services_inventory(server: str) -> dict[str, Any]: + inventory_files = {path.parent.name.lower(): path for path in _iter_inventory_files()} + server_key = server.lower().strip() + if server_key not in inventory_files: + raise ValueError(f"inventory not found for '{server}'. Available: {', '.join(sorted(inventory_files))}") + return _parse_inventory_file(inventory_files[server_key]) + + +@mcp.tool( + description="Search all inventory files for an app, UUID, domain, server, or other service text.", +) +def find_service(query: str) -> list[dict[str, Any]]: + matches: list[dict[str, Any]] = [] + for inventory_path in _iter_inventory_files(): + parsed = _parse_inventory_file(inventory_path) + for section_name, section in parsed["sections"].items(): + for table in section["tables"]: + for row in table: + if _match_service(query, row): + matches.append( + { + "inventory": parsed["file"], + "section": section_name, + "row": row, + } + ) + return matches + + +@mcp.tool( + description="List Podman/Systemd unit files under containers/ and optionally filter by app name.", +) +def list_container_units(name_filter: str | None = None) -> list[dict[str, str]]: + results: list[dict[str, str]] = [] + for path in sorted(CONTAINERS_ROOT.iterdir()): + if not path.is_file(): + continue + if path.suffix not in {".container", ".service"}: + continue + relative = path.relative_to(REPO_ROOT).as_posix() + if name_filter and name_filter.lower() not in relative.lower(): + continue + results.append({"name": path.name, "relative_path": relative, "kind": path.suffix.lstrip(".")}) + return results + + +@mcp.tool( + description="Read a container unit file from containers/ for Podman/Systemd runtime analysis.", +) +def read_container_unit(name: str) -> dict[str, str]: + candidates = [ + path + for path in CONTAINERS_ROOT.iterdir() + if path.is_file() and path.suffix in {".container", ".service"} and (path.name == name or name.lower() in path.name.lower()) + ] + if not candidates: + raise ValueError(f"container unit not found: {name}") + if len(candidates) > 1: + names = ", ".join(path.name for path in candidates) + raise ValueError(f"multiple container units matched '{name}': {names}") + + path = candidates[0] + return { + "relative_path": path.relative_to(REPO_ROOT).as_posix(), + "content": _read_text(path, max_chars=16000), + } + + +@mcp.tool( + description="Read a repo document from README, vps, databases, k3s, or containers paths.", +) +def read_repo_document(relative_path: str, max_chars: int = 12000) -> dict[str, str]: + path = _safe_doc_path(relative_path) + return { + "relative_path": path.relative_to(REPO_ROOT).as_posix(), + "content": _read_text(path, max_chars=max_chars), + } + + +@mcp.tool( + description="Search the repo for infrastructure terms such as app names, domains, env keys, or container names.", +) +def grep_repo(query: str, glob: str | None = None) -> dict[str, Any]: + command = ["rg", "-n", query, str(REPO_ROOT)] + if glob: + command.extend(["-g", glob]) + + completed = subprocess.run( + command, + cwd=REPO_ROOT, + capture_output=True, + text=True, + timeout=30, + check=False, + ) + results = completed.stdout.splitlines() + return { + "exit_code": completed.returncode, + "matches": results[:200], + "truncated": len(results) > 200, + "stderr": completed.stderr[-4000:], + } + + +@mcp.tool( + description="Return a compact summary of the actual infracloud repo layout so agents do not assume missing folders like dev-scripts or docs/openproject.", +) +def repo_layout_summary() -> dict[str, Any]: + return { + "repo_root": str(REPO_ROOT), + "present_top_level_dirs": sorted(path.name for path in REPO_ROOT.iterdir() if path.is_dir()), + "scripts_root": SCRIPTS_ROOT.relative_to(REPO_ROOT).as_posix() if SCRIPTS_ROOT.exists() else None, + "inventory_files": [path.relative_to(REPO_ROOT).as_posix() for path in _iter_inventory_files()], + "container_units": [path.name for path in CONTAINERS_ROOT.iterdir() if path.is_file() and path.suffix in {".container", ".service"}], + "notes": [ + "The repo uses scripts/auto-organized instead of dev-scripts.", + "The repo does not currently include docs/openproject.", + "AGENT.md contains secrets and should not be used as a runtime configuration source.", + ], + } + + +if __name__ == "__main__": + mcp.run()