saveinmed/saveinmed-bff/src/modules/usuario_empresa_perfis/service.py
Tiago Yamamoto b39caf0fd0 first commit
2025-12-17 13:58:26 -03:00

280 lines
11 KiB
Python

from __future__ import annotations
import asyncio
from typing import Any, Dict, List, Optional
from appwrite.exception import AppwriteException
from appwrite.query import Query
from appwrite.services.databases import Databases
from fastapi import HTTPException
from .schemas import UsuarioEmpresaPerfilCreate, UsuarioEmpresaPerfilOut, UsuarioEmpresaPerfilUpdate
def _extract_rel_id(value: Any) -> Optional[str]:
if value is None:
return None
if isinstance(value, str):
return value
if isinstance(value, dict):
return value.get("$id") or value.get("id") or value.get("_id")
if isinstance(value, list) and value:
first = value[0]
if isinstance(first, dict):
return first.get("$id") or first.get("id") or first.get("_id")
if isinstance(first, str):
return first
return None
_ALLOWED_SORT_FIELDS = {"usuarios", "empresas-dados", "usuario-perfis", "$createdAt"}
_ALLOWED_ORDER_VALUES = {"asc", "desc"}
_DEFAULT_LIMIT = 20
class UsuarioEmpresaPerfisService:
def __init__(
self,
*,
databases: Databases,
database_id: str,
collection_id: str,
usuarios_collection_id: str,
empresas_collection_id: str,
perfis_collection_id: str,
) -> None:
self._db = databases
self._db_id = database_id
self._col = collection_id
self._usuarios_col = usuarios_collection_id
self._empresas_col = empresas_collection_id
self._perfis_col = perfis_collection_id
async def _io(self, fn: Any, *args: Any, **kwargs: Any) -> Any:
return await asyncio.to_thread(fn, *args, **kwargs)
async def list(
self,
*,
page: int,
limit: int | None,
q: str | None,
sort: str | None,
order: str | None,
empresa_id: str | None,
) -> Dict[str, Any]:
limit_v = limit or _DEFAULT_LIMIT
off = (page - 1) * limit_v
queries: List[Any] = [Query.limit(limit_v), Query.offset(off)]
if empresa_id:
queries.append(Query.equal("empresas-dados", empresa_id))
if sort:
if sort not in _ALLOWED_SORT_FIELDS:
raise HTTPException(status_code=400, detail="Campo de ordenação inválido.")
if order and order not in _ALLOWED_ORDER_VALUES:
raise HTTPException(status_code=400, detail="Ordem inválida.")
queries.append(Query.order_desc(sort) if order == "desc" else Query.order_asc(sort))
if q and q.strip():
term = q.strip()
queries.append(
Query.or_(
[
Query.search("usuarios", term),
Query.search("empresas-dados", term),
Query.search("usuario-perfis", term),
]
)
)
try:
res = await self._io(self._db.list_documents, self._db_id, self._col, queries)
except AppwriteException as e:
raise HTTPException(status_code=e.code or 500, detail=e.message) from e
docs = res.get("documents", [])
cache: Dict[str, Dict[str, Dict[str, Any]]] = {
"usuarios": {},
"empresas-dados": {},
"usuario-perfis": {},
}
items: List[Dict[str, Any]] = []
for doc in docs:
mapped = await self._map_with_relations(doc, cache)
items.append(UsuarioEmpresaPerfilOut.model_validate(mapped).model_dump())
return {"items": items, "total": res.get("total", len(items))}
async def get(self, vinculo_id: str, *, empresa_id: str | None = None) -> UsuarioEmpresaPerfilOut:
try:
doc = await self._io(self._db.get_document, self._db_id, self._col, vinculo_id)
except AppwriteException as e:
if e.code == 404:
raise HTTPException(status_code=404, detail="Vínculo não encontrado.") from e
raise HTTPException(status_code=e.code or 500, detail=e.message) from e
vinculo_empresa_id = _extract_rel_id(doc.get("empresas-dados"))
if empresa_id and vinculo_empresa_id != empresa_id:
raise HTTPException(status_code=403, detail="Vínculo fora do escopo da empresa.")
cache = {"usuarios": {}, "empresas-dados": {}, "usuario-perfis": {}}
mapped = await self._map_with_relations(doc, cache)
return UsuarioEmpresaPerfilOut.model_validate(mapped)
async def create(
self, payload: UsuarioEmpresaPerfilCreate, *, empresa_id: str | None = None
) -> UsuarioEmpresaPerfilOut:
data = payload.model_dump(by_alias=True, exclude_unset=True)
if empresa_id and data.get("empresas-dados") != empresa_id:
raise HTTPException(status_code=403, detail="Empresa do vínculo difere do escopo informado.")
await self._ensure_references_exist(data)
try:
doc = await self._io(self._db.create_document, self._db_id, self._col, "unique()", data)
except AppwriteException as e:
raise HTTPException(status_code=e.code or 500, detail=e.message) from e
cache = {"usuarios": {}, "empresas-dados": {}, "usuario-perfis": {}}
mapped = await self._map_with_relations(doc, cache)
return UsuarioEmpresaPerfilOut.model_validate(mapped)
async def update(
self,
vinculo_id: str,
payload: UsuarioEmpresaPerfilUpdate,
*,
empresa_id: str | None = None,
) -> UsuarioEmpresaPerfilOut:
updates = payload.model_dump(by_alias=True, exclude_unset=True)
if updates:
await self._ensure_references_exist(updates)
try:
existing = await self._io(self._db.get_document, self._db_id, self._col, vinculo_id)
except AppwriteException as e:
if e.code == 404:
raise HTTPException(status_code=404, detail="Vínculo não encontrado.") from e
raise HTTPException(status_code=e.code or 500, detail=e.message) from e
existing_empresa_id = _extract_rel_id(existing.get("empresas-dados"))
if empresa_id and existing_empresa_id != empresa_id:
raise HTTPException(status_code=403, detail="Vínculo fora do escopo da empresa.")
if empresa_id and updates.get("empresas-dados") not in (None, empresa_id):
raise HTTPException(
status_code=403,
detail="Não é permitido alterar a empresa do vínculo para outro escopo.",
)
try:
doc = await self._io(
self._db.update_document,
self._db_id,
self._col,
vinculo_id,
updates,
)
except AppwriteException as e:
if e.code == 404:
raise HTTPException(status_code=404, detail="Vínculo não encontrado.") from e
raise HTTPException(status_code=e.code or 500, detail=e.message) from e
cache = {"usuarios": {}, "empresas-dados": {}, "usuario-perfis": {}}
mapped = await self._map_with_relations(doc, cache)
return UsuarioEmpresaPerfilOut.model_validate(mapped)
async def delete(self, vinculo_id: str, *, empresa_id: str | None = None) -> None:
try:
doc = await self._io(self._db.get_document, self._db_id, self._col, vinculo_id)
except AppwriteException as e:
if e.code == 404:
raise HTTPException(status_code=404, detail="Vínculo não encontrado.") from e
raise HTTPException(status_code=e.code or 500, detail=e.message) from e
if empresa_id and _extract_rel_id(doc.get("empresas-dados")) != empresa_id:
raise HTTPException(status_code=403, detail="Vínculo fora do escopo da empresa.")
try:
await self._io(self._db.delete_document, self._db_id, self._col, vinculo_id)
except AppwriteException as e:
if e.code == 404:
raise HTTPException(status_code=404, detail="Vínculo não encontrado.") from e
raise HTTPException(status_code=e.code or 500, detail=e.message) from e
async def _ensure_references_exist(self, data: Dict[str, Any]) -> None:
checks = [
("usuarios", self._usuarios_col, "Usuário não encontrado."),
("empresas-dados", self._empresas_col, "Empresa não encontrada."),
("usuario-perfis", self._perfis_col, "Perfil de usuário não encontrado."),
]
for field, collection, message in checks:
value = data.get(field)
if value is None:
continue
await self._ensure_document_exists(collection, value, message)
async def _ensure_document_exists(self, collection_id: str, document_id: str, message: str) -> None:
try:
await self._io(self._db.get_document, self._db_id, collection_id, document_id)
except AppwriteException as e:
if e.code == 404:
raise HTTPException(status_code=404, detail=message) from e
raise HTTPException(status_code=e.code or 500, detail=e.message) from e
async def _map_with_relations(
self,
doc: Dict[str, Any],
cache: Dict[str, Dict[str, Dict[str, Any]]],
) -> Dict[str, Any]:
usuario_rel = await self._fetch_related(
cache["usuarios"],
self._usuarios_col,
doc.get("usuarios"),
("identificador", "nome", "nome-completo"),
)
empresa_rel = await self._fetch_related(
cache["empresas-dados"],
self._empresas_col,
doc.get("empresas-dados"),
("nome-fantasia", "razao-social", "cnpj"),
)
perfil_rel = await self._fetch_related(
cache["usuario-perfis"],
self._perfis_col,
doc.get("usuario-perfis"),
("nome", "codigo"),
)
return {
"id": doc.get("$id"),
"usuarioId": _extract_rel_id(doc.get("usuarios")) or "",
"empresaId": _extract_rel_id(doc.get("empresas-dados")) or "",
"perfilId": _extract_rel_id(doc.get("usuario-perfis")) or "",
"usuario": usuario_rel,
"empresa": empresa_rel,
"perfil": perfil_rel,
"createdAt": doc.get("$createdAt", ""),
"updatedAt": doc.get("$updatedAt", ""),
}
async def _fetch_related(
self,
cache: Dict[str, Dict[str, Any]],
collection_id: str,
document_id: str | None,
name_fields: tuple[str, ...],
) -> Dict[str, Any]:
if not document_id:
return {"id": "", "nome": None}
if document_id in cache:
return cache[document_id]
try:
doc = await self._io(self._db.get_document, self._db_id, collection_id, document_id)
except AppwriteException as e:
if e.code == 404:
entity = {"id": document_id, "nome": None}
else:
raise HTTPException(status_code=e.code or 500, detail=e.message) from e
else:
name_value = None
for field in name_fields:
value = doc.get(field)
if isinstance(value, str) and value.strip():
name_value = value
break
entity = {"id": doc.get("$id", document_id), "nome": name_value}
cache[document_id] = entity
return entity