193 lines
8.5 KiB
Python
193 lines
8.5 KiB
Python
#!/usr/bin/env python3
|
|
"""Verifica todas as conexoes da infraestrutura Rede5/Inventcloud"""
|
|
|
|
import subprocess
|
|
import json
|
|
import os
|
|
import sys
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
# Directory that holds the SSH private keys and the API token files
# read by the checks in this script.
SSH_DIR = Path.home() / ".ssh"

# Report accumulated over one run; main() fills "connections" and
# serialises the whole mapping to JSON.
RESULTS = {
    "date": datetime.now().isoformat(),
    "connections": {},
    "errors": [],
}
|
|
|
|
def run_cmd(cmd, timeout=15):
    """Run *cmd* through the shell and summarise the outcome.

    Args:
        cmd: Command line, interpreted by the shell (redirections and
            pipes are allowed).
        timeout: Seconds to wait before giving up.

    Returns:
        A ``(succeeded, stdout, stderr)`` tuple. ``succeeded`` is True only
        for exit status 0; both streams are stripped of surrounding
        whitespace. A timeout yields ``(False, "", "Timeout")`` and any other
        exception yields ``(False, "", str(exception))``.
    """
    try:
        proc = subprocess.run(
            cmd,
            shell=True,
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return False, "", "Timeout"
    except Exception as exc:
        # Best-effort probe: report the failure instead of propagating it.
        return False, "", str(exc)

    succeeded = proc.returncode == 0
    return succeeded, proc.stdout.strip(), proc.stderr.strip()
|
|
|
|
def check_ssh_host(host, ip, user="root", key="civo"):
    """Check key-based SSH access to a server.

    Args:
        host: Label of the server; accepted for the caller's convenience and
            not used when building the command.
        ip: Address passed to ssh.
        user: Remote login name.
        key: File name of the private key inside ``SSH_DIR``.

    Returns:
        ``{"status": "ERROR", "error": ...}`` when the key file is missing,
        otherwise ``{"status": "OK" | "ERROR", "output": ...}`` where the
        output is the command's stdout, or its stderr when stdout is empty.
    """
    identity = SSH_DIR / key
    if not identity.exists():
        return {"status": "ERROR", "error": f"Key {key} not found"}

    command = f'ssh -i "{identity}" -o ConnectTimeout=5 -o BatchMode=yes {user}@{ip} "echo OK" 2>&1'
    succeeded, stdout, stderr = run_cmd(command)

    # The remote side must both exit cleanly and echo the marker back.
    if succeeded and "OK" in stdout:
        status = "OK"
    else:
        status = "ERROR"
    return {"status": status, "output": stdout or stderr}
|
|
|
|
def check_ssh_password(host):
    """Check that a password-only SSH host is reachable (connectivity only).

    No login is attempted: authentication is disabled on the client side, so
    a refusal message in the output is taken as proof that the SSH server
    answered.

    Args:
        host: Host name or ``~/.ssh/config`` alias handed to ssh.

    Returns:
        ``{"status": "OK", "output": "Host acessivel (requer senha)"}`` when
        the output contains "denied" or "permission"; otherwise
        ``{"status": "ERROR", "output": ...}`` carrying the actual command
        output (or the error text when there was no output), so a failed
        probe no longer claims that the host is accessible.
    """
    ok, out, err = run_cmd(f'echo "test" | ssh -o ConnectTimeout=5 -o BatchMode=yes -o PreferredAuthentications=none {host} 2>&1')

    # A "Permission denied" reply means the host is reachable.
    lowered = out.lower()
    if "denied" in lowered or "permission" in lowered:
        return {"status": "OK", "output": "Host acessivel (requer senha)"}

    # Surface what really happened (timeout, DNS failure, refused connection).
    return {"status": "ERROR", "output": out or err}
|
|
|
|
def check_ssh_simple(host):
    """Check SSH access to *host* using whatever the local ssh config provides.

    Args:
        host: Host name or alias handed to ssh.

    Returns:
        ``{"status": "OK" | "ERROR", "output": ...}``; the status is OK only
        when the command exits with 0 and its stdout contains "OK". The
        output is stdout, or stderr when stdout is empty.
    """
    succeeded, stdout, stderr = run_cmd(
        f'ssh -o ConnectTimeout=5 -o BatchMode=yes {host} "echo OK" 2>&1'
    )
    if succeeded and "OK" in stdout:
        status = "OK"
    else:
        status = "ERROR"
    return {"status": status, "output": stdout or stderr}
|
|
|
|
def check_api(url, headers=None, token_file=None):
    """Request *url* with curl and classify the HTTP status code.

    Args:
        url: Endpoint to request.
        headers: Optional list of raw header lines ("Name: value").
        token_file: Optional file name inside ``SSH_DIR`` holding an API
            token. When the file exists, an ``Authorization`` header is
            added: scheme ``Token`` if the token contains ":", otherwise
            ``Bearer``. When the file does not exist the request is sent
            without that header.

    Returns:
        ``{"status": "OK" | "ERROR", "http_code": code}``. The status is OK
        for 200 and 302; ``code`` is "000" when curl printed no numeric code.
    """
    # Copy so the caller's list is never mutated, and so explicit headers are
    # kept alongside the Authorization header instead of being discarded.
    request_headers = list(headers or [])

    if token_file:
        token_path = SSH_DIR / token_file
        if token_path.exists():
            token = token_path.read_text().strip()
            scheme = "Token" if ":" in token else "Bearer"
            request_headers.append(f"Authorization: {scheme} {token}")

    hdr = " ".join(f'-H "{h}"' for h in request_headers)
    ok, out, err = run_cmd(f'curl -s -o /dev/null -w "%{{http_code}}" {hdr} "{url}"')

    code = out if out.isdigit() else "000"
    return {"status": "OK" if code in ("200", "302") else "ERROR", "http_code": code}
|
|
|
|
def check_ssh_git(host):
    """Check SSH authentication against a Git hosting service.

    Runs ``ssh -T git@<host>`` and inspects the reply text; the exit status
    is deliberately ignored.

    Args:
        host: Host name of the Git provider.

    Returns:
        ``{"status": "OK" | "ERROR", "output": ...}``; the status is OK when
        the reply contains "authenticated" or "success" (case-insensitive).
        The output is the first 200 characters of the reply.
    """
    _, reply, _ = run_cmd(f'ssh -T git@{host} 2>&1', timeout=10)

    lowered = reply.lower()
    if any(marker in lowered for marker in ("authenticated", "success")):
        status = "OK"
    else:
        status = "ERROR"
    return {"status": status, "output": reply[:200]}
|
|
|
|
def _read_secret(name):
    """Return the stripped contents of ``SSH_DIR / name``, or "" if the file is absent."""
    try:
        return (SSH_DIR / name).read_text().strip()
    except FileNotFoundError:
        return ""


def _check_cloudflare(email, api_key):
    """Call the Cloudflare zones endpoint and report how many zones were returned."""
    ok, out, _ = run_cmd(
        f'curl -s -H "X-Auth-Email: {email}" -H "X-Auth-Key: {api_key}" '
        '"https://api.cloudflare.com/client/v4/zones"'
    )
    if not ok:
        return {"status": "ERROR"}
    try:
        zones = len(json.loads(out).get("result", []))
    except (ValueError, AttributeError, TypeError):
        # Body is not JSON, is not a JSON object, or "result" has no length.
        return {"status": "ERROR", "error": "Parse error"}
    return {"status": "OK", "zones": zones}


def _check_bucket(boto3, config, endpoint_url, bucket, env_prefix):
    """List one object of *bucket* using credentials taken from the environment."""
    access_key = os.environ.get(f"{env_prefix}_ACCESS_KEY")
    secret_key = os.environ.get(f"{env_prefix}_SECRET_KEY")
    if not (access_key and secret_key):
        return {
            "status": "SKIP",
            "error": f"{env_prefix}_ACCESS_KEY / {env_prefix}_SECRET_KEY not set",
        }
    try:
        client = boto3.client(
            "s3",
            endpoint_url=endpoint_url,
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            config=config,
        )
        client.list_objects_v2(Bucket=bucket, MaxKeys=1)
    except Exception as exc:
        # Boundary of a best-effort probe: record the failure and carry on.
        return {"status": "ERROR", "error": str(exc)}
    return {"status": "OK"}


def _iter_results(connections):
    """Yield each individual check result (a dict with a "status" key)."""
    for entry in connections.values():
        if not isinstance(entry, dict):
            continue
        if "status" in entry:
            # Flat result such as "oci" or "kubernetes".
            yield entry
        else:
            # Category such as "vps" or "api": one result per member.
            yield from (item for item in entry.values() if isinstance(item, dict))


def main():
    """Run every connectivity check, write the JSON report and print a summary.

    Results are stored in ``RESULTS["connections"]`` and the whole ``RESULTS``
    mapping is written to ``connection-status.json`` next to this script.

    Object-storage credentials are read from the environment variables
    ``CIVO_S3_ACCESS_KEY`` / ``CIVO_S3_SECRET_KEY`` and
    ``EURONODES_S3_ACCESS_KEY`` / ``EURONODES_S3_SECRET_KEY``; a check whose
    variables are unset is reported as SKIP.

    Returns:
        0 when every individual check has status "OK", otherwise 1 (results
        with status "SKIP" count as not OK).
    """
    connections = RESULTS["connections"]
    print("=== Verificando conexoes ===\n")

    # VPS
    print("VPS...")
    connections["vps"] = {
        "redbull": check_ssh_host("redbull", "185.194.141.70"),
        "echo": check_ssh_host("echo", "152.53.120.181"),
        "nc2": check_ssh_host("nc2", "212.56.41.211"),
        "absam-io": check_ssh_password("absam-io"),
    }

    # Git providers
    print("Git Providers...")
    connections["git"] = {
        "github": check_ssh_git("github.com"),
        # Recorded as-is; no probe is executed for this entry.
        "bitbucket": {"status": "OK", "output": "Configured in ~/.ssh/config"},
    }

    # APIs
    print("APIs...")
    connections["api"] = {
        "coolify": check_api("https://redbull.rede5.com.br/api/v1/applications", token_file="coolify-redbull-token"),
        "forgejo": check_api("https://pipe.gohorsejobs.com/api/v1/user", token_file="forgejo-token"),
        "github": check_api("https://api.github.com/user", token_file="github-token"),
        "bookstack": check_api("https://docs.rede5.com.br/api/books", token_file="bookstack-token"),
    }

    # Cloudflare: an account is checked only when its token file exists.
    print("Cloudflare...")
    connections["cloudflare"] = {}

    cf_token = _read_secret("cloudflare-token")
    if cf_token:
        connections["cloudflare"]["rede5"] = _check_cloudflare("yamamoto@rede5.com.br", cf_token)

    cf_inv = _read_secret("cloudflare-token-inventcloud")
    if cf_inv:
        # First line is used as the API key, second line (if any) as the
        # e-mail. splitlines() + strip() keeps a CRLF file from leaking "\r"
        # into the header values.
        lines = [line.strip() for line in cf_inv.splitlines()]
        token = lines[0]
        email = lines[1] if len(lines) > 1 else ""
        connections["cloudflare"]["inventcloud"] = _check_cloudflare(email, token)

    # MXRoute: checked only when the key file exists.
    print("MXRoute...")
    mx_key = _read_secret("mxroute-api-key")
    if mx_key:
        # The file may hold either the bare key or a "label: key" pair.
        api_key = mx_key.split(": ")[1] if ": " in mx_key else mx_key
        ok, out, err = run_cmd(f'curl -s -o /dev/null -w "%{{http_code}}" -H "X-Server: everest.mxrouting.net" -H "X-Username: net5cloud" -H "X-API-Key: {api_key}" "https://api.mxroute.com/domains"')
        connections["mxroute"] = {"status": "OK" if out == "200" else "ERROR", "http_code": out}

    # OCI
    print("OCI...")
    ok, out, err = run_cmd("oci os ns get 2>&1")
    connections["oci"] = {"status": "OK" if ok else "ERROR", "namespace": out}

    # Kubernetes
    print("Kubernetes...")
    # No "| head -1" here: a pipeline reports the exit status of its last
    # command, which would hide a kubectl failure. The first line is taken
    # in Python instead.
    ok, out, err = run_cmd("kubectl cluster-info 2>&1")
    first_line = out.splitlines()[0] if out else ""
    connections["kubernetes"] = {"status": "OK" if ok else "ERROR", "cluster": first_line[:100]}

    # Object storage, checked through boto3 when it is installed.
    print("Object Storage...")
    connections["object_storage"] = {}
    try:
        import boto3
        from botocore.config import Config
    except ImportError:
        skipped = {"status": "SKIP", "error": "boto3 not installed"}
        connections["object_storage"]["civo"] = dict(skipped)
        connections["object_storage"]["euronodes"] = dict(skipped)
    else:
        # NOTE(review): the access keys that used to be embedded in this file
        # were stored in plain text and should be rotated.
        config = Config(s3={'addressing_style': 'path'})
        connections["object_storage"]["civo"] = _check_bucket(
            boto3, config, 'https://objectstore.nyc1.civo.com', 'rede5', "CIVO_S3"
        )
        connections["object_storage"]["euronodes"] = _check_bucket(
            boto3, config, 'https://eu-west-1.euronodes.com', 'vault', "EURONODES_S3"
        )

    # Summary: count individual results, not the keys of each result dict.
    statuses = [result.get("status") for result in _iter_results(connections)]
    total = len(statuses)
    ok_count = statuses.count("OK")
    RESULTS["summary"] = {"total": total, "ok": ok_count, "errors": total - ok_count}

    # Output
    output_file = Path(__file__).parent / "connection-status.json"
    output_file.write_text(json.dumps(RESULTS, indent=2))

    print("\n=== RESUMO ===")
    print(f"Total: {total} | OK: {ok_count} | Erros: {total - ok_count}")
    print(f"Salvo em: {output_file}")

    return 0 if ok_count == total else 1
|
|
|
|
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())
|