Files
RoleForge/app/deps/auth.py
Sergey Antropoff b2d3b6b803 Профиль и аккаунт
- API и страницы профиля (редактирование, смена пароля, аватар), публичные карточки.
- Сайдбар: блок пользователя, пункт Users для admin/root, исправлен порядок
  инициализации (показ admin-only после initAuthSession, currentUser).
- GET /auth/me: ответ через ProfileMeResponse, исправлена валидация (is_founder bool).

Команды и роли
- Маршруты и UI команд; при редактировании роли: видимость Team, выбор команды
  в модалке, только команды с активным членством; API team_id в details/ update.
- GET /api/v1/teams?membership=active для списка «своих» команд.
- Форма роли: сегмент Team, панель выбора команды только при Team и не при
  с
2026-05-05 08:15:21 +03:00

64 lines
2.7 KiB
Python

from datetime import datetime, timezone
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from app.core.config import get_settings
from app.db.pool import get_pool
from app.services.account_status import clear_expired_temporary_ban
bearer = HTTPBearer(auto_error=True)
async def get_current_user_id(
credentials: HTTPAuthorizationCredentials = Depends(bearer),
) -> str:
settings = get_settings()
token = credentials.credentials
try:
payload = jwt.decode(token, settings.app_secret_key, algorithms=["HS256"])
except JWTError as exc:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token") from exc
if payload.get("type") != "access" or not payload.get("sub"):
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid access token")
user_id = str(payload["sub"])
pool = get_pool()
async with pool.acquire() as conn:
await clear_expired_temporary_ban(conn, user_id)
row = await conn.fetchrow(
"""
select coalesce(is_active, true) as is_active, deleted_at, ban_until
from users where id = $1::uuid
""",
user_id,
)
if not row:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token")
if row["deleted_at"] is not None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Account removed from directory")
if row["ban_until"] is not None and row["ban_until"] > datetime.now(tz=timezone.utc):
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Account temporarily suspended")
if not row["is_active"]:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Account disabled")
return user_id
async def get_current_admin_user_id(user_id: str = Depends(get_current_user_id)) -> str:
pool = get_pool()
async with pool.acquire() as conn:
role = await conn.fetchval("select role from users where id=$1::uuid", user_id)
if role not in ("admin", "root"):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required")
return user_id
async def get_current_root_user_id(user_id: str = Depends(get_current_user_id)) -> str:
"""Chief administrator (group root): temporary bans, role assignment, etc."""
pool = get_pool()
async with pool.acquire() as conn:
role = await conn.fetchval("select role from users where id=$1::uuid", user_id)
if role != "root":
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Chief administrator (root) access required")
return user_id