from __future__ import annotations import os import subprocess from dataclasses import dataclass from pathlib import Path from typing import Any from mcp.server.fastmcp import FastMCP import psycopg from psycopg.rows import dict_row REPO_ROOT = Path(__file__).resolve().parents[1] SCRIPTS_ROOT = REPO_ROOT / "scripts" / "auto-organized" VPS_ROOT = REPO_ROOT / "vps" CONTAINERS_ROOT = REPO_ROOT / "containers" DATABASES_ROOT = REPO_ROOT / "databases" K3S_ROOT = REPO_ROOT / "k3s" POSTGRES_DSN_ENV = "INFRA_MCP_POSTGRES_DSN" DOC_ALLOWLIST = ( REPO_ROOT / "README.md", VPS_ROOT, CONTAINERS_ROOT, DATABASES_ROOT, K3S_ROOT, ) READ_ONLY_SCRIPT_PREFIXES = ( "check_", "fetch_", "get_", "inspect_", "verify_", "final_status", "watch_", ) MUTATING_SCRIPT_PREFIXES = ( "approve_", "complete_", "fix_", "merge_", "retrigger_", "revert_", ) mcp = FastMCP( "infracloud-sustentacao", instructions=( "Use the real infracloud repository as the source of truth. " "Prefer inventory markdown, container unit files, and existing scripts. " "Do not assume paths like dev-scripts or docs/openproject if they do not exist. " "If Postgres is configured, prefer the MCP Postgres helpers for server-side persistence and diagnostics." ), ) @dataclass(frozen=True) class ScriptInfo: path: Path relative_path: str is_read_only: bool kind: str def _postgres_dsn() -> str | None: value = os.getenv(POSTGRES_DSN_ENV, "").strip() return value or None def _postgres_enabled() -> bool: return _postgres_dsn() is not None def _get_pg_connection(): dsn = _postgres_dsn() if not dsn: raise ValueError(f"{POSTGRES_DSN_ENV} is not configured") return psycopg.connect(dsn, row_factory=dict_row) def _ensure_mcp_tables() -> None: if not _postgres_enabled(): return with _get_pg_connection() as conn: with conn.cursor() as cur: cur.execute( """ CREATE TABLE IF NOT EXISTS infra_mcp_notes ( id BIGSERIAL PRIMARY KEY, scope TEXT NOT NULL, title TEXT NOT NULL, body TEXT NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ) """ ) conn.commit() def _ensure_in_repo(path: Path) -> Path: resolved = path.resolve() if REPO_ROOT not in resolved.parents and resolved != REPO_ROOT: raise ValueError(f"path escapes repository root: {path}") return resolved def _script_kind(name: str) -> str: lower = name.lower() if lower.endswith(".ps1"): return "powershell" if lower.endswith(".sh"): return "shell" return "other" def _is_read_only_script(name: str) -> bool: lower = name.lower() if lower.endswith((".json", ".yaml", ".yml", ".txt", ".pem")): return False if lower.startswith(MUTATING_SCRIPT_PREFIXES): return False return lower.startswith(READ_ONLY_SCRIPT_PREFIXES) def _list_scripts() -> list[ScriptInfo]: if not SCRIPTS_ROOT.exists(): return [] results: list[ScriptInfo] = [] for path in sorted(SCRIPTS_ROOT.rglob("*")): if not path.is_file(): continue relative = path.relative_to(REPO_ROOT).as_posix() results.append( ScriptInfo( path=path, relative_path=relative, is_read_only=_is_read_only_script(path.name), kind=_script_kind(path.name), ) ) return results def _resolve_script(script_name: str) -> ScriptInfo: script_name = script_name.replace("\\", "/").strip() candidates = _list_scripts() exact = [item for item in candidates if item.relative_path == script_name or item.path.name == script_name] if len(exact) == 1: return exact[0] if len(exact) > 1: raise ValueError(f"multiple scripts matched '{script_name}', use a repo-relative path") fuzzy = [item for item in candidates if script_name.lower() in item.relative_path.lower()] if len(fuzzy) == 1: return fuzzy[0] if len(fuzzy) > 1: names = ", ".join(item.relative_path for item in fuzzy[:10]) raise ValueError(f"multiple scripts matched '{script_name}': {names}") raise ValueError(f"script not found: {script_name}") def _read_text(path: Path, max_chars: int = 20000) -> str: resolved = _ensure_in_repo(path) text = resolved.read_text(encoding="utf-8", errors="replace") if len(text) > max_chars: return text[:max_chars] + "\n... [truncated]" return text def _parse_markdown_table(lines: list[str], start_index: int) -> tuple[list[dict[str, str]], int]: header_line = lines[start_index].strip() separator_index = start_index + 1 if separator_index >= len(lines): return [], start_index + 1 separator_line = lines[separator_index].strip() if "|" not in header_line or "|" not in separator_line: return [], start_index + 1 headers = [part.strip(" `") for part in header_line.strip("|").split("|")] rows: list[dict[str, str]] = [] index = start_index + 2 while index < len(lines): line = lines[index].rstrip() if "|" not in line or not line.strip().startswith("|"): break values = [part.strip() for part in line.strip().strip("|").split("|")] if len(values) == len(headers): rows.append(dict(zip(headers, values))) index += 1 return rows, index def _parse_inventory_file(path: Path) -> dict[str, Any]: lines = _read_text(path, max_chars=120000).splitlines() parsed: dict[str, Any] = {"file": path.relative_to(REPO_ROOT).as_posix(), "sections": {}} current_section = "root" parsed["sections"][current_section] = {"tables": [], "paragraphs": []} index = 0 while index < len(lines): line = lines[index].rstrip() if line.startswith("#"): current_section = line.lstrip("#").strip() parsed["sections"].setdefault(current_section, {"tables": [], "paragraphs": []}) index += 1 continue if line.strip().startswith("|"): rows, next_index = _parse_markdown_table(lines, index) if rows: parsed["sections"][current_section]["tables"].append(rows) index = next_index continue if line.strip(): parsed["sections"][current_section]["paragraphs"].append(line.strip()) index += 1 return parsed def _iter_inventory_files() -> list[Path]: return sorted(VPS_ROOT.rglob("services_inventory.md")) + sorted(K3S_ROOT.rglob("services_inventory.md")) def _match_service(query: str, row: dict[str, str]) -> bool: haystack = " ".join(str(value) for value in row.values()).lower() return query.lower() in haystack def _safe_doc_path(relative_path: str) -> Path: relative = Path(relative_path) candidate = _ensure_in_repo(REPO_ROOT / relative) for allowed in DOC_ALLOWLIST: allowed_resolved = allowed.resolve() if candidate == allowed_resolved or allowed_resolved in candidate.parents: return candidate raise ValueError(f"path not allowed: {relative_path}") def _ensure_read_only_sql(sql: str) -> str: normalized = sql.strip().lstrip("(").strip().lower() if not normalized.startswith("select"): raise ValueError("only SELECT queries are allowed") forbidden = ("insert ", "update ", "delete ", "drop ", "alter ", "truncate ", "create ", "grant ", "revoke ") if any(token in normalized for token in forbidden): raise ValueError("query contains non-read-only statements") return sql @mcp.tool( description="List scripts available in scripts/auto-organized, including whether each one is safe for read-only execution.", ) def list_repo_scripts(name_filter: str | None = None) -> list[dict[str, Any]]: scripts = _list_scripts() if name_filter: scripts = [item for item in scripts if name_filter.lower() in item.relative_path.lower()] return [ { "name": item.path.name, "relative_path": item.relative_path, "kind": item.kind, "read_only": item.is_read_only, } for item in scripts ] @mcp.tool( description="Run an existing repo script from scripts/auto-organized. Only read-only diagnostic scripts are executable.", ) def run_repo_script(script_name: str, args: list[str] | None = None, timeout_seconds: int = 60) -> dict[str, Any]: script = _resolve_script(script_name) if not script.is_read_only: raise ValueError( f"script '{script.relative_path}' is not classified as read-only and cannot be executed by this tool" ) args = args or [] if script.kind == "powershell": command = [ "powershell", "-NoProfile", "-ExecutionPolicy", "Bypass", "-File", str(script.path), *args, ] elif script.kind == "shell": command = ["bash", str(script.path), *args] else: raise ValueError(f"unsupported script type: {script.kind}") completed = subprocess.run( command, cwd=REPO_ROOT, capture_output=True, text=True, timeout=timeout_seconds, check=False, ) return { "script": script.relative_path, "exit_code": completed.returncode, "stdout": completed.stdout[-12000:], "stderr": completed.stderr[-12000:], } @mcp.tool( description="Parse one services_inventory.md file into structured JSON. Server examples: redbull, vim, nc1, k3s.", ) def read_services_inventory(server: str) -> dict[str, Any]: inventory_files = {path.parent.name.lower(): path for path in _iter_inventory_files()} server_key = server.lower().strip() if server_key not in inventory_files: raise ValueError(f"inventory not found for '{server}'. Available: {', '.join(sorted(inventory_files))}") return _parse_inventory_file(inventory_files[server_key]) @mcp.tool( description="Search all inventory files for an app, UUID, domain, server, or other service text.", ) def find_service(query: str) -> list[dict[str, Any]]: matches: list[dict[str, Any]] = [] for inventory_path in _iter_inventory_files(): parsed = _parse_inventory_file(inventory_path) for section_name, section in parsed["sections"].items(): for table in section["tables"]: for row in table: if _match_service(query, row): matches.append( { "inventory": parsed["file"], "section": section_name, "row": row, } ) return matches @mcp.tool( description="List Podman/Systemd unit files under containers/ and optionally filter by app name.", ) def list_container_units(name_filter: str | None = None) -> list[dict[str, str]]: results: list[dict[str, str]] = [] for path in sorted(CONTAINERS_ROOT.iterdir()): if not path.is_file(): continue if path.suffix not in {".container", ".service"}: continue relative = path.relative_to(REPO_ROOT).as_posix() if name_filter and name_filter.lower() not in relative.lower(): continue results.append({"name": path.name, "relative_path": relative, "kind": path.suffix.lstrip(".")}) return results @mcp.tool( description="Read a container unit file from containers/ for Podman/Systemd runtime analysis.", ) def read_container_unit(name: str) -> dict[str, str]: candidates = [ path for path in CONTAINERS_ROOT.iterdir() if path.is_file() and path.suffix in {".container", ".service"} and (path.name == name or name.lower() in path.name.lower()) ] if not candidates: raise ValueError(f"container unit not found: {name}") if len(candidates) > 1: names = ", ".join(path.name for path in candidates) raise ValueError(f"multiple container units matched '{name}': {names}") path = candidates[0] return { "relative_path": path.relative_to(REPO_ROOT).as_posix(), "content": _read_text(path, max_chars=16000), } @mcp.tool( description="Read a repo document from README, vps, databases, k3s, or containers paths.", ) def read_repo_document(relative_path: str, max_chars: int = 12000) -> dict[str, str]: path = _safe_doc_path(relative_path) return { "relative_path": path.relative_to(REPO_ROOT).as_posix(), "content": _read_text(path, max_chars=max_chars), } @mcp.tool( description="Search the repo for infrastructure terms such as app names, domains, env keys, or container names.", ) def grep_repo(query: str, glob: str | None = None) -> dict[str, Any]: command = ["rg", "-n", query, str(REPO_ROOT)] if glob: command.extend(["-g", glob]) completed = subprocess.run( command, cwd=REPO_ROOT, capture_output=True, text=True, timeout=30, check=False, ) results = completed.stdout.splitlines() return { "exit_code": completed.returncode, "matches": results[:200], "truncated": len(results) > 200, "stderr": completed.stderr[-4000:], } @mcp.tool( description="Return a compact summary of the actual infracloud repo layout so agents do not assume missing folders like dev-scripts or docs/openproject.", ) def repo_layout_summary() -> dict[str, Any]: return { "repo_root": str(REPO_ROOT), "present_top_level_dirs": sorted(path.name for path in REPO_ROOT.iterdir() if path.is_dir()), "scripts_root": SCRIPTS_ROOT.relative_to(REPO_ROOT).as_posix() if SCRIPTS_ROOT.exists() else None, "inventory_files": [path.relative_to(REPO_ROOT).as_posix() for path in _iter_inventory_files()], "container_units": [path.name for path in CONTAINERS_ROOT.iterdir() if path.is_file() and path.suffix in {".container", ".service"}], "notes": [ "The repo uses scripts/auto-organized instead of dev-scripts.", "The repo does not currently include docs/openproject.", "AGENT.md contains secrets and should not be used as a runtime configuration source.", f"Optional Postgres integration is enabled through {POSTGRES_DSN_ENV}.", ], } @mcp.tool( description="Return the configured Postgres status for the MCP server and basic connectivity details.", ) def postgres_healthcheck() -> dict[str, Any]: dsn = _postgres_dsn() if not dsn: return {"configured": False, "env_var": POSTGRES_DSN_ENV} with _get_pg_connection() as conn: with conn.cursor() as cur: cur.execute("SELECT current_database() AS database, current_user AS user, version() AS version") row = cur.fetchone() return { "configured": True, "env_var": POSTGRES_DSN_ENV, "database": row["database"], "user": row["user"], "version": row["version"], } @mcp.tool( description="Execute a read-only SELECT query against the MCP Postgres database.", ) def postgres_query(sql: str, limit: int = 100) -> dict[str, Any]: if limit < 1 or limit > 500: raise ValueError("limit must be between 1 and 500") safe_sql = _ensure_read_only_sql(sql) wrapped = f"SELECT * FROM ({safe_sql.rstrip().rstrip(';')}) AS q LIMIT {limit}" with _get_pg_connection() as conn: with conn.cursor() as cur: cur.execute(wrapped) rows = cur.fetchall() return { "row_count": len(rows), "rows": rows, } @mcp.tool( description="Store a short operational note in the MCP Postgres database for future support sessions.", ) def add_operational_note(scope: str, title: str, body: str) -> dict[str, Any]: if not scope.strip() or not title.strip() or not body.strip(): raise ValueError("scope, title, and body are required") _ensure_mcp_tables() with _get_pg_connection() as conn: with conn.cursor() as cur: cur.execute( """ INSERT INTO infra_mcp_notes (scope, title, body) VALUES (%s, %s, %s) RETURNING id, scope, title, body, created_at """, (scope.strip(), title.strip(), body.strip()), ) row = cur.fetchone() conn.commit() return row @mcp.tool( description="List recent operational notes stored in the MCP Postgres database.", ) def list_operational_notes(scope: str | None = None, limit: int = 20) -> list[dict[str, Any]]: if limit < 1 or limit > 200: raise ValueError("limit must be between 1 and 200") _ensure_mcp_tables() query = """ SELECT id, scope, title, body, created_at FROM infra_mcp_notes """ params: list[Any] = [] if scope and scope.strip(): query += " WHERE scope = %s" params.append(scope.strip()) query += " ORDER BY created_at DESC LIMIT %s" params.append(limit) with _get_pg_connection() as conn: with conn.cursor() as cur: cur.execute(query, params) return cur.fetchall() if __name__ == "__main__": _ensure_mcp_tables() mcp.run()