- API и страницы профиля (редактирование, смена пароля, аватар), публичные карточки. - Сайдбар: блок пользователя, пункт Users для admin/root, исправлен порядок инициализации (показ admin-only после initAuthSession, currentUser). - GET /auth/me: ответ через ProfileMeResponse, исправлена валидация (is_founder bool). Команды и роли - Маршруты и UI команд; при редактировании роли: видимость Team, выбор команды в модалке, только команды с активным членством; API team_id в details/ update. - GET /api/v1/teams?membership=active для списка «своих» команд. - Форма роли: сегмент Team, панель выбора команды только при Team и не при с
64 lines
2.7 KiB
Python
64 lines
2.7 KiB
Python
from datetime import datetime, timezone
|
|
|
|
from fastapi import Depends, HTTPException, status
|
|
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
|
from jose import JWTError, jwt
|
|
|
|
from app.core.config import get_settings
|
|
from app.db.pool import get_pool
|
|
from app.services.account_status import clear_expired_temporary_ban
|
|
|
|
bearer = HTTPBearer(auto_error=True)
|
|
|
|
|
|
async def get_current_user_id(
|
|
credentials: HTTPAuthorizationCredentials = Depends(bearer),
|
|
) -> str:
|
|
settings = get_settings()
|
|
token = credentials.credentials
|
|
try:
|
|
payload = jwt.decode(token, settings.app_secret_key, algorithms=["HS256"])
|
|
except JWTError as exc:
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token") from exc
|
|
if payload.get("type") != "access" or not payload.get("sub"):
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid access token")
|
|
user_id = str(payload["sub"])
|
|
pool = get_pool()
|
|
async with pool.acquire() as conn:
|
|
await clear_expired_temporary_ban(conn, user_id)
|
|
row = await conn.fetchrow(
|
|
"""
|
|
select coalesce(is_active, true) as is_active, deleted_at, ban_until
|
|
from users where id = $1::uuid
|
|
""",
|
|
user_id,
|
|
)
|
|
if not row:
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token")
|
|
if row["deleted_at"] is not None:
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Account removed from directory")
|
|
if row["ban_until"] is not None and row["ban_until"] > datetime.now(tz=timezone.utc):
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Account temporarily suspended")
|
|
if not row["is_active"]:
|
|
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Account disabled")
|
|
return user_id
|
|
|
|
|
|
async def get_current_admin_user_id(user_id: str = Depends(get_current_user_id)) -> str:
|
|
pool = get_pool()
|
|
async with pool.acquire() as conn:
|
|
role = await conn.fetchval("select role from users where id=$1::uuid", user_id)
|
|
if role not in ("admin", "root"):
|
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required")
|
|
return user_id
|
|
|
|
|
|
async def get_current_root_user_id(user_id: str = Depends(get_current_user_id)) -> str:
|
|
"""Chief administrator (group root): temporary bans, role assignment, etc."""
|
|
pool = get_pool()
|
|
async with pool.acquire() as conn:
|
|
role = await conn.fetchval("select role from users where id=$1::uuid", user_id)
|
|
if role != "root":
|
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Chief administrator (root) access required")
|
|
return user_id
|