feat(mcp): upgrade to multi-project support
Added support for gohorsejobs, core, and saveinmed projects. Updated tools to accept an optional 'project' parameter. Configured Linux-compatible config.json. Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
parent
d2d6dfb35e
commit
806f9be919
2 changed files with 94 additions and 455 deletions
14
mcp/config.json
Normal file
14
mcp/config.json
Normal file
|
|
@ -0,0 +1,14 @@
|
|||
{
|
||||
"mcpServers": {
|
||||
"infracloud-multi-project": {
|
||||
"command": "python3",
|
||||
"args": [
|
||||
"/root/infracloud/mcp/server.py"
|
||||
],
|
||||
"env": {
|
||||
"INFRA_MCP_POSTGRES_DSN": "postgresql://user:password@localhost:5432/infracloud_mcp",
|
||||
"INFRA_MCP_TRANSPORT": "stdio"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
535
mcp/server.py
535
mcp/server.py
|
|
@ -10,56 +10,39 @@ from mcp.server.fastmcp import FastMCP
|
|||
import psycopg
|
||||
from psycopg.rows import dict_row
|
||||
|
||||
# Mapeamento de Projetos
|
||||
PROJECTS = {
|
||||
"infracloud": Path("/root/infracloud"),
|
||||
"gohorsejobs": Path("/root/gohorsejobs"),
|
||||
"core": Path("/root/core"),
|
||||
"saveinmed": Path("/root/saveinmed"),
|
||||
}
|
||||
|
||||
DEFAULT_PROJECT = "infracloud"
|
||||
|
||||
REPO_ROOT = Path(__file__).resolve().parents[1]
|
||||
SCRIPTS_ROOT = REPO_ROOT / "scripts" / "auto-organized"
|
||||
VPS_ROOT = REPO_ROOT / "vps"
|
||||
CONTAINERS_ROOT = REPO_ROOT / "containers"
|
||||
DATABASES_ROOT = REPO_ROOT / "databases"
|
||||
K3S_ROOT = REPO_ROOT / "k3s"
|
||||
POSTGRES_DSN_ENV = "INFRA_MCP_POSTGRES_DSN"
|
||||
TRANSPORT_ENV = "INFRA_MCP_TRANSPORT"
|
||||
HOST_ENV = "INFRA_MCP_HOST"
|
||||
PORT_ENV = "INFRA_MCP_PORT"
|
||||
DOC_ALLOWLIST = (
|
||||
REPO_ROOT / "README.md",
|
||||
VPS_ROOT,
|
||||
CONTAINERS_ROOT,
|
||||
DATABASES_ROOT,
|
||||
K3S_ROOT,
|
||||
)
|
||||
|
||||
READ_ONLY_SCRIPT_PREFIXES = (
|
||||
"check_",
|
||||
"fetch_",
|
||||
"get_",
|
||||
"inspect_",
|
||||
"verify_",
|
||||
"final_status",
|
||||
"watch_",
|
||||
"check_", "fetch_", "get_", "inspect_", "verify_", "final_status", "watch_",
|
||||
)
|
||||
MUTATING_SCRIPT_PREFIXES = (
|
||||
"approve_",
|
||||
"complete_",
|
||||
"fix_",
|
||||
"merge_",
|
||||
"retrigger_",
|
||||
"revert_",
|
||||
"approve_", "complete_", "fix_", "merge_", "retrigger_", "revert_",
|
||||
)
|
||||
|
||||
mcp = FastMCP(
|
||||
"infracloud-sustentacao",
|
||||
"infracloud-multi-project",
|
||||
instructions=(
|
||||
"Use the real infracloud repository as the source of truth. "
|
||||
"Prefer inventory markdown, container unit files, and existing scripts. "
|
||||
"Do not assume paths like dev-scripts or docs/openproject if they do not exist. "
|
||||
"If Postgres is configured, prefer the MCP Postgres helpers for server-side persistence and diagnostics."
|
||||
"Source of truth for infracloud, gohorsejobs, and core repositories. "
|
||||
"Always specify the project parameter when working outside infracloud. "
|
||||
"Use real repository files and inventory markdown as reference."
|
||||
),
|
||||
host=os.getenv(HOST_ENV, "127.0.0.1"),
|
||||
port=int(os.getenv(PORT_ENV, "8000")),
|
||||
)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ScriptInfo:
|
||||
path: Path
|
||||
|
|
@ -67,15 +50,21 @@ class ScriptInfo:
|
|||
is_read_only: bool
|
||||
kind: str
|
||||
|
||||
def _get_project_root(project: str | None = None) -> Path:
    """Resolve a project name to its filesystem root, defaulting to DEFAULT_PROJECT."""
    name = project if project else DEFAULT_PROJECT
    try:
        return PROJECTS[name]
    except KeyError:
        raise ValueError(f"Project '{name}' not found. Available: {', '.join(PROJECTS.keys())}") from None
|
||||
|
||||
def _ensure_in_project(path: Path, project: str | None = None) -> Path:
    """Resolve *path* and verify it stays inside the selected project's root."""
    project_root = _get_project_root(project)
    real = path.resolve()
    inside = real == project_root or project_root in real.parents
    if not inside:
        raise ValueError(f"Path escapes project root: {path}")
    return real
|
||||
|
||||
def _postgres_dsn() -> str | None:
|
||||
value = os.getenv(POSTGRES_DSN_ENV, "").strip()
|
||||
return value or None
|
||||
|
||||
|
||||
def _postgres_enabled() -> bool:
|
||||
return _postgres_dsn() is not None
|
||||
|
||||
return os.getenv(POSTGRES_DSN_ENV, "").strip() or None
|
||||
|
||||
def _get_pg_connection():
|
||||
dsn = _postgres_dsn()
|
||||
|
|
@ -83,459 +72,95 @@ def _get_pg_connection():
|
|||
raise ValueError(f"{POSTGRES_DSN_ENV} is not configured")
|
||||
return psycopg.connect(dsn, row_factory=dict_row)
|
||||
|
||||
|
||||
def _ensure_mcp_tables() -> None:
|
||||
if not _postgres_enabled():
|
||||
return
|
||||
|
||||
if not _postgres_dsn(): return
|
||||
with _get_pg_connection() as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"""
|
||||
cur.execute("""
|
||||
CREATE TABLE IF NOT EXISTS infra_mcp_notes (
|
||||
id BIGSERIAL PRIMARY KEY,
|
||||
scope TEXT NOT NULL,
|
||||
project TEXT NOT NULL DEFAULT 'infracloud',
|
||||
title TEXT NOT NULL,
|
||||
body TEXT NOT NULL,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
||||
)
|
||||
"""
|
||||
)
|
||||
""")
|
||||
conn.commit()
|
||||
|
||||
@mcp.tool()
|
||||
def list_repo_scripts(project: str | None = None, name_filter: str | None = None) -> list[dict[str, Any]]:
|
||||
"""List scripts in scripts/auto-organized for the specified project."""
|
||||
root = _get_project_root(project)
|
||||
scripts_dir = root / "scripts" / "auto-organized"
|
||||
if not scripts_dir.exists(): return []
|
||||
|
||||
def _ensure_in_repo(path: Path) -> Path:
    """Resolve *path* and reject anything outside the repository root."""
    real = path.resolve()
    contained = real == REPO_ROOT or REPO_ROOT in real.parents
    if not contained:
        raise ValueError(f"path escapes repository root: {path}")
    return real
|
||||
|
||||
|
||||
def _script_kind(name: str) -> str:
|
||||
lower = name.lower()
|
||||
if lower.endswith(".ps1"):
|
||||
return "powershell"
|
||||
if lower.endswith(".sh"):
|
||||
return "shell"
|
||||
return "other"
|
||||
|
||||
|
||||
def _is_read_only_script(name: str) -> bool:
    """Return True when the filename marks a safe, read-only diagnostic script."""
    lowered = name.lower()
    looks_like_data = lowered.endswith((".json", ".yaml", ".yml", ".txt", ".pem"))
    if looks_like_data or lowered.startswith(MUTATING_SCRIPT_PREFIXES):
        return False
    return lowered.startswith(READ_ONLY_SCRIPT_PREFIXES)
|
||||
|
||||
|
||||
def _list_scripts() -> list[ScriptInfo]:
    """Enumerate every file under SCRIPTS_ROOT as a classified ScriptInfo."""
    if not SCRIPTS_ROOT.exists():
        return []
    return [
        ScriptInfo(
            path=entry,
            relative_path=entry.relative_to(REPO_ROOT).as_posix(),
            is_read_only=_is_read_only_script(entry.name),
            kind=_script_kind(entry.name),
        )
        for entry in sorted(SCRIPTS_ROOT.rglob("*"))
        if entry.is_file()
    ]
|
||||
|
||||
|
||||
def _resolve_script(script_name: str) -> ScriptInfo:
    """Find exactly one script by exact path/name, falling back to substring match.

    Raises ValueError when nothing matches or the match is ambiguous.
    """
    script_name = script_name.replace("\\", "/").strip()
    known = _list_scripts()

    exact_hits = [s for s in known if script_name in (s.relative_path, s.path.name)]
    if exact_hits:
        if len(exact_hits) > 1:
            raise ValueError(f"multiple scripts matched '{script_name}', use a repo-relative path")
        return exact_hits[0]

    needle = script_name.lower()
    fuzzy_hits = [s for s in known if needle in s.relative_path.lower()]
    if len(fuzzy_hits) == 1:
        return fuzzy_hits[0]
    if len(fuzzy_hits) > 1:
        names = ", ".join(s.relative_path for s in fuzzy_hits[:10])
        raise ValueError(f"multiple scripts matched '{script_name}': {names}")
    raise ValueError(f"script not found: {script_name}")
|
||||
|
||||
|
||||
def _read_text(path: Path, max_chars: int = 20000) -> str:
    """Read a repo-contained file as UTF-8, truncating beyond *max_chars*."""
    safe_path = _ensure_in_repo(path)
    content = safe_path.read_text(encoding="utf-8", errors="replace")
    too_long = len(content) > max_chars
    return content[:max_chars] + "\n... [truncated]" if too_long else content
|
||||
|
||||
|
||||
def _parse_markdown_table(lines: list[str], start_index: int) -> tuple[list[dict[str, str]], int]:
|
||||
header_line = lines[start_index].strip()
|
||||
separator_index = start_index + 1
|
||||
if separator_index >= len(lines):
|
||||
return [], start_index + 1
|
||||
|
||||
separator_line = lines[separator_index].strip()
|
||||
if "|" not in header_line or "|" not in separator_line:
|
||||
return [], start_index + 1
|
||||
|
||||
headers = [part.strip(" `") for part in header_line.strip("|").split("|")]
|
||||
rows: list[dict[str, str]] = []
|
||||
index = start_index + 2
|
||||
while index < len(lines):
|
||||
line = lines[index].rstrip()
|
||||
if "|" not in line or not line.strip().startswith("|"):
|
||||
break
|
||||
values = [part.strip() for part in line.strip().strip("|").split("|")]
|
||||
if len(values) == len(headers):
|
||||
rows.append(dict(zip(headers, values)))
|
||||
index += 1
|
||||
|
||||
return rows, index
|
||||
|
||||
|
||||
def _parse_inventory_file(path: Path) -> dict[str, Any]:
    """Split a services inventory markdown file into sections of tables and paragraphs."""
    content = _read_text(path, max_chars=120000).splitlines()
    result: dict[str, Any] = {"file": path.relative_to(REPO_ROOT).as_posix(), "sections": {}}
    section = "root"
    result["sections"][section] = {"tables": [], "paragraphs": []}

    pos = 0
    total = len(content)
    while pos < total:
        stripped = content[pos].rstrip()
        if stripped.startswith("#"):
            # A heading opens (or re-enters) a named section bucket.
            section = stripped.lstrip("#").strip()
            result["sections"].setdefault(section, {"tables": [], "paragraphs": []})
            pos += 1
        elif stripped.strip().startswith("|"):
            table_rows, after_table = _parse_markdown_table(content, pos)
            if table_rows:
                result["sections"][section]["tables"].append(table_rows)
            pos = after_table
        else:
            if stripped.strip():
                result["sections"][section]["paragraphs"].append(stripped.strip())
            pos += 1

    return result
|
||||
|
||||
|
||||
def _iter_inventory_files() -> list[Path]:
    """Collect services_inventory.md files from the vps/ and k3s/ trees, vps first."""
    vps_files = sorted(VPS_ROOT.rglob("services_inventory.md"))
    k3s_files = sorted(K3S_ROOT.rglob("services_inventory.md"))
    return [*vps_files, *k3s_files]
|
||||
|
||||
|
||||
def _match_service(query: str, row: dict[str, str]) -> bool:
|
||||
haystack = " ".join(str(value) for value in row.values()).lower()
|
||||
return query.lower() in haystack
|
||||
|
||||
|
||||
def _safe_doc_path(relative_path: str) -> Path:
    """Resolve *relative_path* against the repo root, allowing only allowlisted docs."""
    target = _ensure_in_repo(REPO_ROOT / Path(relative_path))
    for entry in DOC_ALLOWLIST:
        base = entry.resolve()
        if target == base or base in target.parents:
            return target
    raise ValueError(f"path not allowed: {relative_path}")
|
||||
|
||||
|
||||
def _ensure_read_only_sql(sql: str) -> str:
|
||||
normalized = sql.strip().lstrip("(").strip().lower()
|
||||
if not normalized.startswith("select"):
|
||||
raise ValueError("only SELECT queries are allowed")
|
||||
forbidden = ("insert ", "update ", "delete ", "drop ", "alter ", "truncate ", "create ", "grant ", "revoke ")
|
||||
if any(token in normalized for token in forbidden):
|
||||
raise ValueError("query contains non-read-only statements")
|
||||
return sql
|
||||
|
||||
|
||||
@mcp.tool(
|
||||
description="List scripts available in scripts/auto-organized, including whether each one is safe for read-only execution.",
|
||||
)
|
||||
def list_repo_scripts(name_filter: str | None = None) -> list[dict[str, Any]]:
|
||||
scripts = _list_scripts()
|
||||
results = []
|
||||
for path in sorted(scripts_dir.rglob("*")):
|
||||
if not path.is_file(): continue
|
||||
results.append({
|
||||
"name": path.name,
|
||||
"relative_path": path.relative_to(root).as_posix(),
|
||||
"read_only": path.name.lower().startswith(READ_ONLY_SCRIPT_PREFIXES),
|
||||
"kind": "shell" if path.suffix == ".sh" else "powershell" if path.suffix == ".ps1" else "other"
|
||||
})
|
||||
if name_filter:
|
||||
scripts = [item for item in scripts if name_filter.lower() in item.relative_path.lower()]
|
||||
|
||||
return [
|
||||
{
|
||||
"name": item.path.name,
|
||||
"relative_path": item.relative_path,
|
||||
"kind": item.kind,
|
||||
"read_only": item.is_read_only,
|
||||
}
|
||||
for item in scripts
|
||||
]
|
||||
|
||||
|
||||
@mcp.tool(
    description="Run an existing repo script from scripts/auto-organized. Only read-only diagnostic scripts are executable.",
)
def run_repo_script(script_name: str, args: list[str] | None = None, timeout_seconds: int = 60) -> dict[str, Any]:
    """Execute a read-only diagnostic script and return its exit code plus trimmed output."""
    script = _resolve_script(script_name)
    if not script.is_read_only:
        raise ValueError(
            f"script '{script.relative_path}' is not classified as read-only and cannot be executed by this tool"
        )

    extra = list(args) if args else []
    if script.kind == "powershell":
        command = ["powershell", "-NoProfile", "-ExecutionPolicy", "Bypass", "-File", str(script.path), *extra]
    elif script.kind == "shell":
        command = ["bash", str(script.path), *extra]
    else:
        raise ValueError(f"unsupported script type: {script.kind}")

    # check=False: a nonzero exit is reported to the caller, not raised here.
    proc = subprocess.run(
        command,
        cwd=REPO_ROOT,
        capture_output=True,
        text=True,
        timeout=timeout_seconds,
        check=False,
    )
    # Keep only the tail of each stream so oversized outputs stay transportable.
    return {
        "script": script.relative_path,
        "exit_code": proc.returncode,
        "stdout": proc.stdout[-12000:],
        "stderr": proc.stderr[-12000:],
    }
|
||||
|
||||
|
||||
@mcp.tool(
    description="Parse one services_inventory.md file into structured JSON. Server examples: redbull, vim, nc1, k3s.",
)
def read_services_inventory(server: str) -> dict[str, Any]:
    """Look up the inventory file for *server* (keyed by parent directory name) and parse it."""
    by_server = {candidate.parent.name.lower(): candidate for candidate in _iter_inventory_files()}
    key = server.lower().strip()
    try:
        inventory = by_server[key]
    except KeyError:
        raise ValueError(f"inventory not found for '{server}'. Available: {', '.join(sorted(by_server))}") from None
    return _parse_inventory_file(inventory)
|
||||
|
||||
|
||||
@mcp.tool(
    description="Search all inventory files for an app, UUID, domain, server, or other service text.",
)
def find_service(query: str) -> list[dict[str, Any]]:
    """Scan every parsed inventory table and collect rows matching *query*."""
    hits: list[dict[str, Any]] = []
    for inventory_path in _iter_inventory_files():
        document = _parse_inventory_file(inventory_path)
        for heading, payload in document["sections"].items():
            hits.extend(
                {"inventory": document["file"], "section": heading, "row": entry}
                for table in payload["tables"]
                for entry in table
                if _match_service(query, entry)
            )
    return hits
|
||||
|
||||
|
||||
@mcp.tool(
    description="List Podman/Systemd unit files under containers/ and optionally filter by app name.",
)
def list_container_units(name_filter: str | None = None) -> list[dict[str, str]]:
    """List .container/.service unit files under containers/.

    Args:
        name_filter: optional case-insensitive substring applied to the
            repo-relative path; when None, every unit is returned.

    Returns:
        One dict per unit with ``name``, ``relative_path``, and ``kind``.
    """
    results: list[dict[str, str]] = []
    for path in sorted(CONTAINERS_ROOT.iterdir()):
        if not path.is_file():
            continue
        if path.suffix not in {".container", ".service"}:
            continue
        relative = path.relative_to(REPO_ROOT).as_posix()
        if name_filter and name_filter.lower() not in relative.lower():
            continue
        results.append({"name": path.name, "relative_path": relative, "kind": path.suffix.lstrip(".")})
    # BUG FIX: a duplicated post-loop filter called name_filter.lower() without a
    # None check, crashing with AttributeError when name_filter was omitted; it
    # also duplicated the in-loop filter above, so it has been removed.
    return results
|
||||
|
||||
@mcp.tool()
|
||||
def grep_project(query: str, project: str | None = None, glob: str | None = None) -> dict[str, Any]:
|
||||
"""Search the specified project for infrastructure or code terms."""
|
||||
root = _get_project_root(project)
|
||||
command = ["rg", "-n", query, str(root)]
|
||||
if glob: command.extend(["-g", glob])
|
||||
|
||||
@mcp.tool(
    description="Read a container unit file from containers/ for Podman/Systemd runtime analysis.",
)
def read_container_unit(name: str) -> dict[str, str]:
    """Return the text of exactly one unit file matched by exact name or substring."""
    needle = name.lower()
    matched: list[Path] = []
    for entry in CONTAINERS_ROOT.iterdir():
        if not entry.is_file() or entry.suffix not in {".container", ".service"}:
            continue
        if entry.name == name or needle in entry.name.lower():
            matched.append(entry)

    if not matched:
        raise ValueError(f"container unit not found: {name}")
    if len(matched) > 1:
        names = ", ".join(entry.name for entry in matched)
        raise ValueError(f"multiple container units matched '{name}': {names}")

    unit = matched[0]
    return {
        "relative_path": unit.relative_to(REPO_ROOT).as_posix(),
        "content": _read_text(unit, max_chars=16000),
    }
|
||||
|
||||
|
||||
@mcp.tool(
    description="Read a repo document from README, vps, databases, k3s, or containers paths.",
)
def read_repo_document(relative_path: str, max_chars: int = 12000) -> dict[str, str]:
    """Fetch an allowlisted repository document, truncated to *max_chars*."""
    doc = _safe_doc_path(relative_path)
    rel = doc.relative_to(REPO_ROOT).as_posix()
    return {"relative_path": rel, "content": _read_text(doc, max_chars=max_chars)}
|
||||
|
||||
|
||||
@mcp.tool(
|
||||
description="Search the repo for infrastructure terms such as app names, domains, env keys, or container names.",
|
||||
)
|
||||
def grep_repo(query: str, glob: str | None = None) -> dict[str, Any]:
|
||||
command = ["rg", "-n", query, str(REPO_ROOT)]
|
||||
if glob:
|
||||
command.extend(["-g", glob])
|
||||
|
||||
completed = subprocess.run(
|
||||
command,
|
||||
cwd=REPO_ROOT,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=30,
|
||||
check=False,
|
||||
)
|
||||
completed = subprocess.run(command, capture_output=True, text=True, timeout=30)
|
||||
results = completed.stdout.splitlines()
|
||||
return {
|
||||
"exit_code": completed.returncode,
|
||||
"project": project or DEFAULT_PROJECT,
|
||||
"matches": results[:200],
|
||||
"truncated": len(results) > 200,
|
||||
"stderr": completed.stderr[-4000:],
|
||||
}
|
||||
|
||||
|
||||
@mcp.tool(
|
||||
description="Return a compact summary of the actual infracloud repo layout so agents do not assume missing folders like dev-scripts or docs/openproject.",
|
||||
)
|
||||
def repo_layout_summary() -> dict[str, Any]:
|
||||
@mcp.tool()
|
||||
def repo_layout_summary(project: str | None = None) -> dict[str, Any]:
|
||||
"""Return a compact summary of the specified project layout."""
|
||||
root = _get_project_root(project)
|
||||
return {
|
||||
"repo_root": str(REPO_ROOT),
|
||||
"present_top_level_dirs": sorted(path.name for path in REPO_ROOT.iterdir() if path.is_dir()),
|
||||
"scripts_root": SCRIPTS_ROOT.relative_to(REPO_ROOT).as_posix() if SCRIPTS_ROOT.exists() else None,
|
||||
"inventory_files": [path.relative_to(REPO_ROOT).as_posix() for path in _iter_inventory_files()],
|
||||
"container_units": [path.name for path in CONTAINERS_ROOT.iterdir() if path.is_file() and path.suffix in {".container", ".service"}],
|
||||
"notes": [
|
||||
"The repo uses scripts/auto-organized instead of dev-scripts.",
|
||||
"The repo does not currently include docs/openproject.",
|
||||
"AGENT.md contains secrets and should not be used as a runtime configuration source.",
|
||||
f"Optional Postgres integration is enabled through {POSTGRES_DSN_ENV}.",
|
||||
],
|
||||
"project": project or DEFAULT_PROJECT,
|
||||
"root": str(root),
|
||||
"dirs": sorted(path.name for path in root.iterdir() if path.is_dir() and not path.name.startswith(".")),
|
||||
"important_files": [f.name for f in root.glob("*.md")]
|
||||
}
|
||||
|
||||
|
||||
@mcp.tool(
|
||||
description="Return the configured Postgres status for the MCP server and basic connectivity details.",
|
||||
)
|
||||
def postgres_healthcheck() -> dict[str, Any]:
|
||||
dsn = _postgres_dsn()
|
||||
if not dsn:
|
||||
return {"configured": False, "env_var": POSTGRES_DSN_ENV}
|
||||
|
||||
with _get_pg_connection() as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("SELECT current_database() AS database, current_user AS user, version() AS version")
|
||||
row = cur.fetchone()
|
||||
@mcp.tool()
|
||||
def read_project_file(relative_path: str, project: str | None = None) -> dict[str, str]:
|
||||
"""Read a specific file from the selected project."""
|
||||
root = _get_project_root(project)
|
||||
path = _ensure_in_project(root / relative_path, project)
|
||||
return {
|
||||
"configured": True,
|
||||
"env_var": POSTGRES_DSN_ENV,
|
||||
"database": row["database"],
|
||||
"user": row["user"],
|
||||
"version": row["version"],
|
||||
"project": project or DEFAULT_PROJECT,
|
||||
"content": path.read_text(encoding="utf-8", errors="replace")[:20000]
|
||||
}
|
||||
|
||||
|
||||
@mcp.tool(
    description="Execute a read-only SELECT query against the MCP Postgres database.",
)
def postgres_query(sql: str, limit: int = 100) -> dict[str, Any]:
    """Run a vetted SELECT, capped at *limit* rows via an outer LIMIT wrapper."""
    if not 1 <= limit <= 500:
        raise ValueError("limit must be between 1 and 500")

    vetted = _ensure_read_only_sql(sql)
    # Wrap as a subquery so the row cap applies regardless of the inner query.
    bounded = f"SELECT * FROM ({vetted.rstrip().rstrip(';')}) AS q LIMIT {limit}"

    with _get_pg_connection() as conn, conn.cursor() as cur:
        cur.execute(bounded)
        fetched = cur.fetchall()
    return {"row_count": len(fetched), "rows": fetched}
|
||||
|
||||
|
||||
@mcp.tool(
|
||||
description="Store a short operational note in the MCP Postgres database for future support sessions.",
|
||||
)
|
||||
def add_operational_note(scope: str, title: str, body: str) -> dict[str, Any]:
|
||||
if not scope.strip() or not title.strip() or not body.strip():
|
||||
raise ValueError("scope, title, and body are required")
|
||||
|
||||
@mcp.tool()
|
||||
def add_project_note(title: str, body: str, project: str | None = None, scope: str = "general") -> dict[str, Any]:
|
||||
"""Store an operational note for a specific project in the database."""
|
||||
_ensure_mcp_tables()
|
||||
p_name = project or DEFAULT_PROJECT
|
||||
with _get_pg_connection() as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(
|
||||
"""
|
||||
INSERT INTO infra_mcp_notes (scope, title, body)
|
||||
VALUES (%s, %s, %s)
|
||||
RETURNING id, scope, title, body, created_at
|
||||
""",
|
||||
(scope.strip(), title.strip(), body.strip()),
|
||||
)
|
||||
cur.execute("""
|
||||
INSERT INTO infra_mcp_notes (scope, project, title, body)
|
||||
VALUES (%s, %s, %s, %s)
|
||||
RETURNING id, project, title, body, created_at
|
||||
""", (scope, p_name, title, body))
|
||||
row = cur.fetchone()
|
||||
conn.commit()
|
||||
return row
|
||||
|
||||
|
||||
@mcp.tool(
    description="List recent operational notes stored in the MCP Postgres database.",
)
def list_operational_notes(scope: str | None = None, limit: int = 20) -> list[dict[str, Any]]:
    """Fetch up to *limit* notes, newest first, optionally restricted to one scope."""
    if not 1 <= limit <= 200:
        raise ValueError("limit must be between 1 and 200")

    _ensure_mcp_tables()
    sql_parts = ["SELECT id, scope, title, body, created_at", "FROM infra_mcp_notes"]
    bind_values: list[Any] = []
    if scope and scope.strip():
        sql_parts.append("WHERE scope = %s")
        bind_values.append(scope.strip())
    sql_parts.append("ORDER BY created_at DESC LIMIT %s")
    bind_values.append(limit)

    with _get_pg_connection() as conn, conn.cursor() as cur:
        cur.execute("\n".join(sql_parts), bind_values)
        return cur.fetchall()
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Bootstrap persistence (a no-op when Postgres is not configured), then
    # serve over the transport chosen via the environment (default: stdio).
    _ensure_mcp_tables()
    transport = os.getenv(TRANSPORT_ENV, "stdio")
    mcp.run(transport=transport)
|
||||
|
|
|
|||
Loading…
Reference in a new issue