infracloud/mcp/server.py
2026-03-09 09:36:36 -03:00

394 lines
13 KiB
Python

from __future__ import annotations
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from mcp.server.fastmcp import FastMCP
REPO_ROOT = Path(__file__).resolve().parents[1]
SCRIPTS_ROOT = REPO_ROOT / "scripts" / "auto-organized"
VPS_ROOT = REPO_ROOT / "vps"
CONTAINERS_ROOT = REPO_ROOT / "containers"
DATABASES_ROOT = REPO_ROOT / "databases"
K3S_ROOT = REPO_ROOT / "k3s"
DOC_ALLOWLIST = (
REPO_ROOT / "README.md",
VPS_ROOT,
CONTAINERS_ROOT,
DATABASES_ROOT,
K3S_ROOT,
)
READ_ONLY_SCRIPT_PREFIXES = (
"check_",
"fetch_",
"get_",
"inspect_",
"verify_",
"final_status",
"watch_",
)
MUTATING_SCRIPT_PREFIXES = (
"approve_",
"complete_",
"fix_",
"merge_",
"retrigger_",
"revert_",
)
mcp = FastMCP(
"infracloud-sustentacao",
instructions=(
"Use the real infracloud repository as the source of truth. "
"Prefer inventory markdown, container unit files, and existing scripts. "
"Do not assume paths like dev-scripts or docs/openproject if they do not exist."
),
)
@dataclass(frozen=True)
class ScriptInfo:
path: Path
relative_path: str
is_read_only: bool
kind: str
def _ensure_in_repo(path: Path) -> Path:
resolved = path.resolve()
if REPO_ROOT not in resolved.parents and resolved != REPO_ROOT:
raise ValueError(f"path escapes repository root: {path}")
return resolved
def _script_kind(name: str) -> str:
lower = name.lower()
if lower.endswith(".ps1"):
return "powershell"
if lower.endswith(".sh"):
return "shell"
return "other"
def _is_read_only_script(name: str) -> bool:
lower = name.lower()
if lower.endswith((".json", ".yaml", ".yml", ".txt", ".pem")):
return False
if lower.startswith(MUTATING_SCRIPT_PREFIXES):
return False
return lower.startswith(READ_ONLY_SCRIPT_PREFIXES)
def _list_scripts() -> list[ScriptInfo]:
if not SCRIPTS_ROOT.exists():
return []
results: list[ScriptInfo] = []
for path in sorted(SCRIPTS_ROOT.rglob("*")):
if not path.is_file():
continue
relative = path.relative_to(REPO_ROOT).as_posix()
results.append(
ScriptInfo(
path=path,
relative_path=relative,
is_read_only=_is_read_only_script(path.name),
kind=_script_kind(path.name),
)
)
return results
def _resolve_script(script_name: str) -> ScriptInfo:
script_name = script_name.replace("\\", "/").strip()
candidates = _list_scripts()
exact = [item for item in candidates if item.relative_path == script_name or item.path.name == script_name]
if len(exact) == 1:
return exact[0]
if len(exact) > 1:
raise ValueError(f"multiple scripts matched '{script_name}', use a repo-relative path")
fuzzy = [item for item in candidates if script_name.lower() in item.relative_path.lower()]
if len(fuzzy) == 1:
return fuzzy[0]
if len(fuzzy) > 1:
names = ", ".join(item.relative_path for item in fuzzy[:10])
raise ValueError(f"multiple scripts matched '{script_name}': {names}")
raise ValueError(f"script not found: {script_name}")
def _read_text(path: Path, max_chars: int = 20000) -> str:
resolved = _ensure_in_repo(path)
text = resolved.read_text(encoding="utf-8", errors="replace")
if len(text) > max_chars:
return text[:max_chars] + "\n... [truncated]"
return text
def _parse_markdown_table(lines: list[str], start_index: int) -> tuple[list[dict[str, str]], int]:
header_line = lines[start_index].strip()
separator_index = start_index + 1
if separator_index >= len(lines):
return [], start_index + 1
separator_line = lines[separator_index].strip()
if "|" not in header_line or "|" not in separator_line:
return [], start_index + 1
headers = [part.strip(" `") for part in header_line.strip("|").split("|")]
rows: list[dict[str, str]] = []
index = start_index + 2
while index < len(lines):
line = lines[index].rstrip()
if "|" not in line or not line.strip().startswith("|"):
break
values = [part.strip() for part in line.strip().strip("|").split("|")]
if len(values) == len(headers):
rows.append(dict(zip(headers, values)))
index += 1
return rows, index
def _parse_inventory_file(path: Path) -> dict[str, Any]:
lines = _read_text(path, max_chars=120000).splitlines()
parsed: dict[str, Any] = {"file": path.relative_to(REPO_ROOT).as_posix(), "sections": {}}
current_section = "root"
parsed["sections"][current_section] = {"tables": [], "paragraphs": []}
index = 0
while index < len(lines):
line = lines[index].rstrip()
if line.startswith("#"):
current_section = line.lstrip("#").strip()
parsed["sections"].setdefault(current_section, {"tables": [], "paragraphs": []})
index += 1
continue
if line.strip().startswith("|"):
rows, next_index = _parse_markdown_table(lines, index)
if rows:
parsed["sections"][current_section]["tables"].append(rows)
index = next_index
continue
if line.strip():
parsed["sections"][current_section]["paragraphs"].append(line.strip())
index += 1
return parsed
def _iter_inventory_files() -> list[Path]:
return sorted(VPS_ROOT.rglob("services_inventory.md")) + sorted(K3S_ROOT.rglob("services_inventory.md"))
def _match_service(query: str, row: dict[str, str]) -> bool:
haystack = " ".join(str(value) for value in row.values()).lower()
return query.lower() in haystack
def _safe_doc_path(relative_path: str) -> Path:
relative = Path(relative_path)
candidate = _ensure_in_repo(REPO_ROOT / relative)
for allowed in DOC_ALLOWLIST:
allowed_resolved = allowed.resolve()
if candidate == allowed_resolved or allowed_resolved in candidate.parents:
return candidate
raise ValueError(f"path not allowed: {relative_path}")
@mcp.tool(
description="List scripts available in scripts/auto-organized, including whether each one is safe for read-only execution.",
)
def list_repo_scripts(name_filter: str | None = None) -> list[dict[str, Any]]:
scripts = _list_scripts()
if name_filter:
scripts = [item for item in scripts if name_filter.lower() in item.relative_path.lower()]
return [
{
"name": item.path.name,
"relative_path": item.relative_path,
"kind": item.kind,
"read_only": item.is_read_only,
}
for item in scripts
]
@mcp.tool(
description="Run an existing repo script from scripts/auto-organized. Only read-only diagnostic scripts are executable.",
)
def run_repo_script(script_name: str, args: list[str] | None = None, timeout_seconds: int = 60) -> dict[str, Any]:
script = _resolve_script(script_name)
if not script.is_read_only:
raise ValueError(
f"script '{script.relative_path}' is not classified as read-only and cannot be executed by this tool"
)
args = args or []
if script.kind == "powershell":
command = [
"powershell",
"-NoProfile",
"-ExecutionPolicy",
"Bypass",
"-File",
str(script.path),
*args,
]
elif script.kind == "shell":
command = ["bash", str(script.path), *args]
else:
raise ValueError(f"unsupported script type: {script.kind}")
completed = subprocess.run(
command,
cwd=REPO_ROOT,
capture_output=True,
text=True,
timeout=timeout_seconds,
check=False,
)
return {
"script": script.relative_path,
"exit_code": completed.returncode,
"stdout": completed.stdout[-12000:],
"stderr": completed.stderr[-12000:],
}
@mcp.tool(
description="Parse one services_inventory.md file into structured JSON. Server examples: redbull, vim, nc1, k3s.",
)
def read_services_inventory(server: str) -> dict[str, Any]:
inventory_files = {path.parent.name.lower(): path for path in _iter_inventory_files()}
server_key = server.lower().strip()
if server_key not in inventory_files:
raise ValueError(f"inventory not found for '{server}'. Available: {', '.join(sorted(inventory_files))}")
return _parse_inventory_file(inventory_files[server_key])
@mcp.tool(
description="Search all inventory files for an app, UUID, domain, server, or other service text.",
)
def find_service(query: str) -> list[dict[str, Any]]:
matches: list[dict[str, Any]] = []
for inventory_path in _iter_inventory_files():
parsed = _parse_inventory_file(inventory_path)
for section_name, section in parsed["sections"].items():
for table in section["tables"]:
for row in table:
if _match_service(query, row):
matches.append(
{
"inventory": parsed["file"],
"section": section_name,
"row": row,
}
)
return matches
@mcp.tool(
description="List Podman/Systemd unit files under containers/ and optionally filter by app name.",
)
def list_container_units(name_filter: str | None = None) -> list[dict[str, str]]:
results: list[dict[str, str]] = []
for path in sorted(CONTAINERS_ROOT.iterdir()):
if not path.is_file():
continue
if path.suffix not in {".container", ".service"}:
continue
relative = path.relative_to(REPO_ROOT).as_posix()
if name_filter and name_filter.lower() not in relative.lower():
continue
results.append({"name": path.name, "relative_path": relative, "kind": path.suffix.lstrip(".")})
return results
@mcp.tool(
description="Read a container unit file from containers/ for Podman/Systemd runtime analysis.",
)
def read_container_unit(name: str) -> dict[str, str]:
candidates = [
path
for path in CONTAINERS_ROOT.iterdir()
if path.is_file() and path.suffix in {".container", ".service"} and (path.name == name or name.lower() in path.name.lower())
]
if not candidates:
raise ValueError(f"container unit not found: {name}")
if len(candidates) > 1:
names = ", ".join(path.name for path in candidates)
raise ValueError(f"multiple container units matched '{name}': {names}")
path = candidates[0]
return {
"relative_path": path.relative_to(REPO_ROOT).as_posix(),
"content": _read_text(path, max_chars=16000),
}
@mcp.tool(
description="Read a repo document from README, vps, databases, k3s, or containers paths.",
)
def read_repo_document(relative_path: str, max_chars: int = 12000) -> dict[str, str]:
path = _safe_doc_path(relative_path)
return {
"relative_path": path.relative_to(REPO_ROOT).as_posix(),
"content": _read_text(path, max_chars=max_chars),
}
@mcp.tool(
description="Search the repo for infrastructure terms such as app names, domains, env keys, or container names.",
)
def grep_repo(query: str, glob: str | None = None) -> dict[str, Any]:
command = ["rg", "-n", query, str(REPO_ROOT)]
if glob:
command.extend(["-g", glob])
completed = subprocess.run(
command,
cwd=REPO_ROOT,
capture_output=True,
text=True,
timeout=30,
check=False,
)
results = completed.stdout.splitlines()
return {
"exit_code": completed.returncode,
"matches": results[:200],
"truncated": len(results) > 200,
"stderr": completed.stderr[-4000:],
}
@mcp.tool(
description="Return a compact summary of the actual infracloud repo layout so agents do not assume missing folders like dev-scripts or docs/openproject.",
)
def repo_layout_summary() -> dict[str, Any]:
return {
"repo_root": str(REPO_ROOT),
"present_top_level_dirs": sorted(path.name for path in REPO_ROOT.iterdir() if path.is_dir()),
"scripts_root": SCRIPTS_ROOT.relative_to(REPO_ROOT).as_posix() if SCRIPTS_ROOT.exists() else None,
"inventory_files": [path.relative_to(REPO_ROOT).as_posix() for path in _iter_inventory_files()],
"container_units": [path.name for path in CONTAINERS_ROOT.iterdir() if path.is_file() and path.suffix in {".container", ".service"}],
"notes": [
"The repo uses scripts/auto-organized instead of dev-scripts.",
"The repo does not currently include docs/openproject.",
"AGENT.md contains secrets and should not be used as a runtime configuration source.",
],
}
if __name__ == "__main__":
mcp.run()