#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ LogBoard+ - Аутентификация Автор: Сергей Антропов Сайт: https://devops.org.ru """ import os from datetime import datetime, timedelta from typing import Optional from fastapi import Depends, HTTPException, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from passlib.context import CryptContext import jwt from core.config import ( SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES, ADMIN_USERNAME, ADMIN_PASSWORD ) # Инициализация безопасности security = HTTPBearer() pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") # Функции для работы с паролями def verify_password(plain_password: str, hashed_password: str) -> bool: """Проверяет пароль""" return pwd_context.verify(plain_password, hashed_password) def get_password_hash(password: str) -> str: """Хеширует пароль""" return pwd_context.hash(password) # Функции для работы с JWT токенами def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): """Создает JWT токен""" to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt def verify_token(token: str) -> Optional[str]: """Проверяет JWT токен и возвращает имя пользователя""" try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: return None return username except jwt.PyJWTError: return None # Функция для проверки аутентификации async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> str: """Получает текущего пользователя из токена""" token = credentials.credentials username = verify_token(token) if username is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Недействительный токен. Требуется авторизация.", headers={"WWW-Authenticate": "Bearer"}, ) return username # Функция для проверки пользователя def authenticate_user(username: str, password: str) -> bool: """Аутентифицирует пользователя""" if username == ADMIN_USERNAME: # Для простоты используем прямое сравнение паролей # В продакшене рекомендуется использовать хешированные пароли return password == ADMIN_PASSWORD return False # Функция для проверки API ключей def verify_api_key(api_key: str) -> bool: """ Проверяет API ключ для удаленных клиентов Args: api_key: API ключ для проверки Returns: bool: True если ключ валидный, False в противном случае """ # Получаем список разрешенных API ключей из переменной окружения allowed_keys = os.getenv('LOGBOARD_API_KEYS', '').split(',') allowed_keys = [key.strip() for key in allowed_keys if key.strip()] # Если ключи не настроены, разрешаем только для разработки if not allowed_keys: # В продакшене это должно быть отключено return api_key == os.getenv('LOGBOARD_DEFAULT_API_KEY', 'dev-key-123') return api_key in allowed_keys