diff --git a/mcp/config.json b/mcp/config.json new file mode 100644 index 0000000..052b26a --- /dev/null +++ b/mcp/config.json @@ -0,0 +1,14 @@ +{ + "mcpServers": { + "infracloud-multi-project": { + "command": "python3", + "args": [ + "/root/infracloud/mcp/server.py" + ], + "env": { + "INFRA_MCP_POSTGRES_DSN": "postgresql://user:password@localhost:5432/infracloud_mcp", + "INFRA_MCP_TRANSPORT": "stdio" + } + } + } +} diff --git a/mcp/server.py b/mcp/server.py index be44381..843745e 100644 --- a/mcp/server.py +++ b/mcp/server.py @@ -10,56 +10,39 @@ from mcp.server.fastmcp import FastMCP import psycopg from psycopg.rows import dict_row +# Mapeamento de Projetos +PROJECTS = { + "infracloud": Path("/root/infracloud"), + "gohorsejobs": Path("/root/gohorsejobs"), + "core": Path("/root/core"), + "saveinmed": Path("/root/saveinmed"), +} + +DEFAULT_PROJECT = "infracloud" -REPO_ROOT = Path(__file__).resolve().parents[1] -SCRIPTS_ROOT = REPO_ROOT / "scripts" / "auto-organized" -VPS_ROOT = REPO_ROOT / "vps" -CONTAINERS_ROOT = REPO_ROOT / "containers" -DATABASES_ROOT = REPO_ROOT / "databases" -K3S_ROOT = REPO_ROOT / "k3s" POSTGRES_DSN_ENV = "INFRA_MCP_POSTGRES_DSN" TRANSPORT_ENV = "INFRA_MCP_TRANSPORT" HOST_ENV = "INFRA_MCP_HOST" PORT_ENV = "INFRA_MCP_PORT" -DOC_ALLOWLIST = ( - REPO_ROOT / "README.md", - VPS_ROOT, - CONTAINERS_ROOT, - DATABASES_ROOT, - K3S_ROOT, -) READ_ONLY_SCRIPT_PREFIXES = ( - "check_", - "fetch_", - "get_", - "inspect_", - "verify_", - "final_status", - "watch_", + "check_", "fetch_", "get_", "inspect_", "verify_", "final_status", "watch_", ) MUTATING_SCRIPT_PREFIXES = ( - "approve_", - "complete_", - "fix_", - "merge_", - "retrigger_", - "revert_", + "approve_", "complete_", "fix_", "merge_", "retrigger_", "revert_", ) mcp = FastMCP( - "infracloud-sustentacao", + "infracloud-multi-project", instructions=( - "Use the real infracloud repository as the source of truth. " - "Prefer inventory markdown, container unit files, and existing scripts. " - "Do not assume paths like dev-scripts or docs/openproject if they do not exist. " - "If Postgres is configured, prefer the MCP Postgres helpers for server-side persistence and diagnostics." + "Source of truth for infracloud, gohorsejobs, and core repositories. " + "Always specify the project parameter when working outside infracloud. " + "Use real repository files and inventory markdown as reference." ), host=os.getenv(HOST_ENV, "127.0.0.1"), port=int(os.getenv(PORT_ENV, "8000")), ) - @dataclass(frozen=True) class ScriptInfo: path: Path @@ -67,15 +50,21 @@ class ScriptInfo: is_read_only: bool kind: str +def _get_project_root(project: str | None = None) -> Path: + name = project or DEFAULT_PROJECT + if name not in PROJECTS: + raise ValueError(f"Project '{name}' not found. Available: {', '.join(PROJECTS.keys())}") + return PROJECTS[name] + +def _ensure_in_project(path: Path, project: str | None = None) -> Path: + root = _get_project_root(project) + resolved = path.resolve() + if root not in resolved.parents and resolved != root: + raise ValueError(f"Path escapes project root: {path}") + return resolved def _postgres_dsn() -> str | None: - value = os.getenv(POSTGRES_DSN_ENV, "").strip() - return value or None - - -def _postgres_enabled() -> bool: - return _postgres_dsn() is not None - + return os.getenv(POSTGRES_DSN_ENV, "").strip() or None def _get_pg_connection(): dsn = _postgres_dsn() @@ -83,459 +72,95 @@ def _get_pg_connection(): raise ValueError(f"{POSTGRES_DSN_ENV} is not configured") return psycopg.connect(dsn, row_factory=dict_row) - def _ensure_mcp_tables() -> None: - if not _postgres_enabled(): - return - + if not _postgres_dsn(): return with _get_pg_connection() as conn: with conn.cursor() as cur: - cur.execute( - """ + cur.execute(""" CREATE TABLE IF NOT EXISTS infra_mcp_notes ( id BIGSERIAL PRIMARY KEY, scope TEXT NOT NULL, + project TEXT NOT NULL DEFAULT 'infracloud', title TEXT NOT NULL, body TEXT NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ) - """ - ) + """) conn.commit() +@mcp.tool() +def list_repo_scripts(project: str | None = None, name_filter: str | None = None) -> list[dict[str, Any]]: + """List scripts in scripts/auto-organized for the specified project.""" + root = _get_project_root(project) + scripts_dir = root / "scripts" / "auto-organized" + if not scripts_dir.exists(): return [] -def _ensure_in_repo(path: Path) -> Path: - resolved = path.resolve() - if REPO_ROOT not in resolved.parents and resolved != REPO_ROOT: - raise ValueError(f"path escapes repository root: {path}") - return resolved - - -def _script_kind(name: str) -> str: - lower = name.lower() - if lower.endswith(".ps1"): - return "powershell" - if lower.endswith(".sh"): - return "shell" - return "other" - - -def _is_read_only_script(name: str) -> bool: - lower = name.lower() - if lower.endswith((".json", ".yaml", ".yml", ".txt", ".pem")): - return False - if lower.startswith(MUTATING_SCRIPT_PREFIXES): - return False - return lower.startswith(READ_ONLY_SCRIPT_PREFIXES) - - -def _list_scripts() -> list[ScriptInfo]: - if not SCRIPTS_ROOT.exists(): - return [] - - results: list[ScriptInfo] = [] - for path in sorted(SCRIPTS_ROOT.rglob("*")): - if not path.is_file(): - continue - relative = path.relative_to(REPO_ROOT).as_posix() - results.append( - ScriptInfo( - path=path, - relative_path=relative, - is_read_only=_is_read_only_script(path.name), - kind=_script_kind(path.name), - ) - ) - return results - - -def _resolve_script(script_name: str) -> ScriptInfo: - script_name = script_name.replace("\\", "/").strip() - candidates = _list_scripts() - - exact = [item for item in candidates if item.relative_path == script_name or item.path.name == script_name] - if len(exact) == 1: - return exact[0] - if len(exact) > 1: - raise ValueError(f"multiple scripts matched '{script_name}', use a repo-relative path") - - fuzzy = [item for item in candidates if script_name.lower() in item.relative_path.lower()] - if len(fuzzy) == 1: - return fuzzy[0] - if len(fuzzy) > 1: - names = ", ".join(item.relative_path for item in fuzzy[:10]) - raise ValueError(f"multiple scripts matched '{script_name}': {names}") - - raise ValueError(f"script not found: {script_name}") - - -def _read_text(path: Path, max_chars: int = 20000) -> str: - resolved = _ensure_in_repo(path) - text = resolved.read_text(encoding="utf-8", errors="replace") - if len(text) > max_chars: - return text[:max_chars] + "\n... [truncated]" - return text - - -def _parse_markdown_table(lines: list[str], start_index: int) -> tuple[list[dict[str, str]], int]: - header_line = lines[start_index].strip() - separator_index = start_index + 1 - if separator_index >= len(lines): - return [], start_index + 1 - - separator_line = lines[separator_index].strip() - if "|" not in header_line or "|" not in separator_line: - return [], start_index + 1 - - headers = [part.strip(" `") for part in header_line.strip("|").split("|")] - rows: list[dict[str, str]] = [] - index = start_index + 2 - while index < len(lines): - line = lines[index].rstrip() - if "|" not in line or not line.strip().startswith("|"): - break - values = [part.strip() for part in line.strip().strip("|").split("|")] - if len(values) == len(headers): - rows.append(dict(zip(headers, values))) - index += 1 - - return rows, index - - -def _parse_inventory_file(path: Path) -> dict[str, Any]: - lines = _read_text(path, max_chars=120000).splitlines() - parsed: dict[str, Any] = {"file": path.relative_to(REPO_ROOT).as_posix(), "sections": {}} - current_section = "root" - parsed["sections"][current_section] = {"tables": [], "paragraphs": []} - - index = 0 - while index < len(lines): - line = lines[index].rstrip() - if line.startswith("#"): - current_section = line.lstrip("#").strip() - parsed["sections"].setdefault(current_section, {"tables": [], "paragraphs": []}) - index += 1 - continue - - if line.strip().startswith("|"): - rows, next_index = _parse_markdown_table(lines, index) - if rows: - parsed["sections"][current_section]["tables"].append(rows) - index = next_index - continue - - if line.strip(): - parsed["sections"][current_section]["paragraphs"].append(line.strip()) - index += 1 - - return parsed - - -def _iter_inventory_files() -> list[Path]: - return sorted(VPS_ROOT.rglob("services_inventory.md")) + sorted(K3S_ROOT.rglob("services_inventory.md")) - - -def _match_service(query: str, row: dict[str, str]) -> bool: - haystack = " ".join(str(value) for value in row.values()).lower() - return query.lower() in haystack - - -def _safe_doc_path(relative_path: str) -> Path: - relative = Path(relative_path) - candidate = _ensure_in_repo(REPO_ROOT / relative) - for allowed in DOC_ALLOWLIST: - allowed_resolved = allowed.resolve() - if candidate == allowed_resolved or allowed_resolved in candidate.parents: - return candidate - raise ValueError(f"path not allowed: {relative_path}") - - -def _ensure_read_only_sql(sql: str) -> str: - normalized = sql.strip().lstrip("(").strip().lower() - if not normalized.startswith("select"): - raise ValueError("only SELECT queries are allowed") - forbidden = ("insert ", "update ", "delete ", "drop ", "alter ", "truncate ", "create ", "grant ", "revoke ") - if any(token in normalized for token in forbidden): - raise ValueError("query contains non-read-only statements") - return sql - - -@mcp.tool( - description="List scripts available in scripts/auto-organized, including whether each one is safe for read-only execution.", -) -def list_repo_scripts(name_filter: str | None = None) -> list[dict[str, Any]]: - scripts = _list_scripts() + results = [] + for path in sorted(scripts_dir.rglob("*")): + if not path.is_file(): continue + results.append({ + "name": path.name, + "relative_path": path.relative_to(root).as_posix(), + "read_only": path.name.lower().startswith(READ_ONLY_SCRIPT_PREFIXES), + "kind": "shell" if path.suffix == ".sh" else "powershell" if path.suffix == ".ps1" else "other" + }) if name_filter: - scripts = [item for item in scripts if name_filter.lower() in item.relative_path.lower()] - - return [ - { - "name": item.path.name, - "relative_path": item.relative_path, - "kind": item.kind, - "read_only": item.is_read_only, - } - for item in scripts - ] - - -@mcp.tool( - description="Run an existing repo script from scripts/auto-organized. Only read-only diagnostic scripts are executable.", -) -def run_repo_script(script_name: str, args: list[str] | None = None, timeout_seconds: int = 60) -> dict[str, Any]: - script = _resolve_script(script_name) - if not script.is_read_only: - raise ValueError( - f"script '{script.relative_path}' is not classified as read-only and cannot be executed by this tool" - ) - - args = args or [] - if script.kind == "powershell": - command = [ - "powershell", - "-NoProfile", - "-ExecutionPolicy", - "Bypass", - "-File", - str(script.path), - *args, - ] - elif script.kind == "shell": - command = ["bash", str(script.path), *args] - else: - raise ValueError(f"unsupported script type: {script.kind}") - - completed = subprocess.run( - command, - cwd=REPO_ROOT, - capture_output=True, - text=True, - timeout=timeout_seconds, - check=False, - ) - return { - "script": script.relative_path, - "exit_code": completed.returncode, - "stdout": completed.stdout[-12000:], - "stderr": completed.stderr[-12000:], - } - - -@mcp.tool( - description="Parse one services_inventory.md file into structured JSON. Server examples: redbull, vim, nc1, k3s.", -) -def read_services_inventory(server: str) -> dict[str, Any]: - inventory_files = {path.parent.name.lower(): path for path in _iter_inventory_files()} - server_key = server.lower().strip() - if server_key not in inventory_files: - raise ValueError(f"inventory not found for '{server}'. Available: {', '.join(sorted(inventory_files))}") - return _parse_inventory_file(inventory_files[server_key]) - - -@mcp.tool( - description="Search all inventory files for an app, UUID, domain, server, or other service text.", -) -def find_service(query: str) -> list[dict[str, Any]]: - matches: list[dict[str, Any]] = [] - for inventory_path in _iter_inventory_files(): - parsed = _parse_inventory_file(inventory_path) - for section_name, section in parsed["sections"].items(): - for table in section["tables"]: - for row in table: - if _match_service(query, row): - matches.append( - { - "inventory": parsed["file"], - "section": section_name, - "row": row, - } - ) - return matches - - -@mcp.tool( - description="List Podman/Systemd unit files under containers/ and optionally filter by app name.", -) -def list_container_units(name_filter: str | None = None) -> list[dict[str, str]]: - results: list[dict[str, str]] = [] - for path in sorted(CONTAINERS_ROOT.iterdir()): - if not path.is_file(): - continue - if path.suffix not in {".container", ".service"}: - continue - relative = path.relative_to(REPO_ROOT).as_posix() - if name_filter and name_filter.lower() not in relative.lower(): - continue - results.append({"name": path.name, "relative_path": relative, "kind": path.suffix.lstrip(".")}) + results = [s for s in results if name_filter.lower() in s["relative_path"].lower()] return results +@mcp.tool() +def grep_project(query: str, project: str | None = None, glob: str | None = None) -> dict[str, Any]: + """Search the specified project for infrastructure or code terms.""" + root = _get_project_root(project) + command = ["rg", "-n", query, str(root)] + if glob: command.extend(["-g", glob]) -@mcp.tool( - description="Read a container unit file from containers/ for Podman/Systemd runtime analysis.", -) -def read_container_unit(name: str) -> dict[str, str]: - candidates = [ - path - for path in CONTAINERS_ROOT.iterdir() - if path.is_file() and path.suffix in {".container", ".service"} and (path.name == name or name.lower() in path.name.lower()) - ] - if not candidates: - raise ValueError(f"container unit not found: {name}") - if len(candidates) > 1: - names = ", ".join(path.name for path in candidates) - raise ValueError(f"multiple container units matched '{name}': {names}") - - path = candidates[0] - return { - "relative_path": path.relative_to(REPO_ROOT).as_posix(), - "content": _read_text(path, max_chars=16000), - } - - -@mcp.tool( - description="Read a repo document from README, vps, databases, k3s, or containers paths.", -) -def read_repo_document(relative_path: str, max_chars: int = 12000) -> dict[str, str]: - path = _safe_doc_path(relative_path) - return { - "relative_path": path.relative_to(REPO_ROOT).as_posix(), - "content": _read_text(path, max_chars=max_chars), - } - - -@mcp.tool( - description="Search the repo for infrastructure terms such as app names, domains, env keys, or container names.", -) -def grep_repo(query: str, glob: str | None = None) -> dict[str, Any]: - command = ["rg", "-n", query, str(REPO_ROOT)] - if glob: - command.extend(["-g", glob]) - - completed = subprocess.run( - command, - cwd=REPO_ROOT, - capture_output=True, - text=True, - timeout=30, - check=False, - ) + completed = subprocess.run(command, capture_output=True, text=True, timeout=30) results = completed.stdout.splitlines() return { - "exit_code": completed.returncode, + "project": project or DEFAULT_PROJECT, "matches": results[:200], "truncated": len(results) > 200, "stderr": completed.stderr[-4000:], } - -@mcp.tool( - description="Return a compact summary of the actual infracloud repo layout so agents do not assume missing folders like dev-scripts or docs/openproject.", -) -def repo_layout_summary() -> dict[str, Any]: +@mcp.tool() +def repo_layout_summary(project: str | None = None) -> dict[str, Any]: + """Return a compact summary of the specified project layout.""" + root = _get_project_root(project) return { - "repo_root": str(REPO_ROOT), - "present_top_level_dirs": sorted(path.name for path in REPO_ROOT.iterdir() if path.is_dir()), - "scripts_root": SCRIPTS_ROOT.relative_to(REPO_ROOT).as_posix() if SCRIPTS_ROOT.exists() else None, - "inventory_files": [path.relative_to(REPO_ROOT).as_posix() for path in _iter_inventory_files()], - "container_units": [path.name for path in CONTAINERS_ROOT.iterdir() if path.is_file() and path.suffix in {".container", ".service"}], - "notes": [ - "The repo uses scripts/auto-organized instead of dev-scripts.", - "The repo does not currently include docs/openproject.", - "AGENT.md contains secrets and should not be used as a runtime configuration source.", - f"Optional Postgres integration is enabled through {POSTGRES_DSN_ENV}.", - ], + "project": project or DEFAULT_PROJECT, + "root": str(root), + "dirs": sorted(path.name for path in root.iterdir() if path.is_dir() and not path.name.startswith(".")), + "important_files": [f.name for f in root.glob("*.md")] } - -@mcp.tool( - description="Return the configured Postgres status for the MCP server and basic connectivity details.", -) -def postgres_healthcheck() -> dict[str, Any]: - dsn = _postgres_dsn() - if not dsn: - return {"configured": False, "env_var": POSTGRES_DSN_ENV} - - with _get_pg_connection() as conn: - with conn.cursor() as cur: - cur.execute("SELECT current_database() AS database, current_user AS user, version() AS version") - row = cur.fetchone() +@mcp.tool() +def read_project_file(relative_path: str, project: str | None = None) -> dict[str, str]: + """Read a specific file from the selected project.""" + root = _get_project_root(project) + path = _ensure_in_project(root / relative_path, project) return { - "configured": True, - "env_var": POSTGRES_DSN_ENV, - "database": row["database"], - "user": row["user"], - "version": row["version"], + "project": project or DEFAULT_PROJECT, + "content": path.read_text(encoding="utf-8", errors="replace")[:20000] } - -@mcp.tool( - description="Execute a read-only SELECT query against the MCP Postgres database.", -) -def postgres_query(sql: str, limit: int = 100) -> dict[str, Any]: - if limit < 1 or limit > 500: - raise ValueError("limit must be between 1 and 500") - - safe_sql = _ensure_read_only_sql(sql) - wrapped = f"SELECT * FROM ({safe_sql.rstrip().rstrip(';')}) AS q LIMIT {limit}" - - with _get_pg_connection() as conn: - with conn.cursor() as cur: - cur.execute(wrapped) - rows = cur.fetchall() - return { - "row_count": len(rows), - "rows": rows, - } - - -@mcp.tool( - description="Store a short operational note in the MCP Postgres database for future support sessions.", -) -def add_operational_note(scope: str, title: str, body: str) -> dict[str, Any]: - if not scope.strip() or not title.strip() or not body.strip(): - raise ValueError("scope, title, and body are required") - +@mcp.tool() +def add_project_note(title: str, body: str, project: str | None = None, scope: str = "general") -> dict[str, Any]: + """Store an operational note for a specific project in the database.""" _ensure_mcp_tables() + p_name = project or DEFAULT_PROJECT with _get_pg_connection() as conn: with conn.cursor() as cur: - cur.execute( - """ - INSERT INTO infra_mcp_notes (scope, title, body) - VALUES (%s, %s, %s) - RETURNING id, scope, title, body, created_at - """, - (scope.strip(), title.strip(), body.strip()), - ) + cur.execute(""" + INSERT INTO infra_mcp_notes (scope, project, title, body) + VALUES (%s, %s, %s, %s) + RETURNING id, project, title, body, created_at + """, (scope, p_name, title, body)) row = cur.fetchone() conn.commit() return row - -@mcp.tool( - description="List recent operational notes stored in the MCP Postgres database.", -) -def list_operational_notes(scope: str | None = None, limit: int = 20) -> list[dict[str, Any]]: - if limit < 1 or limit > 200: - raise ValueError("limit must be between 1 and 200") - - _ensure_mcp_tables() - query = """ - SELECT id, scope, title, body, created_at - FROM infra_mcp_notes - """ - params: list[Any] = [] - if scope and scope.strip(): - query += " WHERE scope = %s" - params.append(scope.strip()) - query += " ORDER BY created_at DESC LIMIT %s" - params.append(limit) - - with _get_pg_connection() as conn: - with conn.cursor() as cur: - cur.execute(query, params) - return cur.fetchall() - - if __name__ == "__main__": _ensure_mcp_tables() mcp.run(transport=os.getenv(TRANSPORT_ENV, "stdio"))