""" Безопасность и аутентификация Автор: Сергей Антропов Сайт: https://devops.org.ru """ from datetime import datetime, timedelta from typing import Optional from jose import JWTError, jwt from app.core.config import settings # Ленивая инициализация CryptContext для избежания ошибок при импорте _pwd_context = None # Глобальный патч для bcrypt.hashpw, чтобы ограничить длину пароля _bcrypt_patched = False def _patch_bcrypt_hashpw(): """Патч для bcrypt.hashpw, чтобы ограничить длину пароля до 72 байт""" global _bcrypt_patched if _bcrypt_patched: return try: # Патчим bcrypt модуль напрямую import bcrypt original_hashpw = bcrypt.hashpw def safe_hashpw(secret, salt): """Обертка для hashpw с ограничением длины до 72 байт""" if isinstance(secret, str): secret_bytes = secret.encode('utf-8') else: secret_bytes = secret if len(secret_bytes) > 72: secret_bytes = secret_bytes[:72] return original_hashpw(secret_bytes, salt) # Заменяем hashpw глобально в модуле bcrypt bcrypt.hashpw = safe_hashpw # Также патчим _bcrypt, если он доступен try: import bcrypt._bcrypt as _bcrypt _bcrypt.hashpw = safe_hashpw except (ImportError, AttributeError): pass _bcrypt_patched = True except Exception: # Если не удалось патчить, продолжаем без патча pass # Применяем патч при импорте модуля _patch_bcrypt_hashpw() def _get_pwd_context(): """Получение CryptContext с ленивой инициализацией""" global _pwd_context if _pwd_context is None: try: import os import warnings # Отключаем предупреждения os.environ.setdefault('PASSLIB_SUPPRESS_USER_WARNING', '1') warnings.filterwarnings('ignore', category=UserWarning) # Патчим passlib.handlers.bcrypt._bcrypt перед инициализацией # Это необходимо, чтобы избежать ошибки при автоматической проверке try: # Импортируем модуль passlib.handlers.bcrypt import passlib.handlers.bcrypt as bcrypt_handler # Сохраняем оригинальную функцию hashpw if hasattr(bcrypt_handler, '_bcrypt') and hasattr(bcrypt_handler._bcrypt, 'hashpw'): original_hashpw = bcrypt_handler._bcrypt.hashpw def safe_hashpw(secret, salt): """Обертка для hashpw с ограничением длины до 72 байт""" if isinstance(secret, str): secret_bytes = secret.encode('utf-8') else: secret_bytes = secret if len(secret_bytes) > 72: secret_bytes = secret_bytes[:72] return original_hashpw(secret_bytes, salt) # Заменяем hashpw НАВСЕГДА (не восстанавливаем) bcrypt_handler._bcrypt.hashpw = safe_hashpw # Импортируем и инициализируем CryptContext from passlib.context import CryptContext _pwd_context = CryptContext( schemes=["bcrypt"], deprecated="auto", bcrypt__rounds=12 ) else: # Если не удалось найти _bcrypt, инициализируем без патча from passlib.context import CryptContext _pwd_context = CryptContext( schemes=["bcrypt"], deprecated="auto", bcrypt__rounds=12 ) except ImportError: # Если bcrypt не установлен, используем простой хеш raise ImportError("bcrypt не установлен") except Exception as e: # Если не удалось инициализировать bcrypt, используем простой хеш import hashlib import logging logger = logging.getLogger(__name__) logger.warning(f"Не удалось инициализировать bcrypt: {e}. Используется простой хеш.") class SimpleHash: def hash(self, password: str) -> str: # Ограничиваем длину для безопасности if len(password.encode('utf-8')) > 72: password = password[:72] return hashlib.sha256(password.encode()).hexdigest() def verify(self, plain: str, hashed: str) -> bool: if len(plain.encode('utf-8')) > 72: plain = plain[:72] return self.hash(plain) == hashed _pwd_context = SimpleHash() return _pwd_context def _truncate_password_bytes(password: str, max_bytes: int = 72) -> str: """Обрезает пароль до максимального количества байт""" password_bytes = password.encode('utf-8') if len(password_bytes) <= max_bytes: return password # Обрезаем до max_bytes байт truncated_bytes = password_bytes[:max_bytes] # Декодируем обратно в строку, игнорируя ошибки если обрезали посередине символа return truncated_bytes.decode('utf-8', errors='ignore') def verify_password(plain_password: str, hashed_password: str) -> bool: """Проверка пароля""" pwd_context = _get_pwd_context() # Ограничиваем длину пароля до 72 байт для bcrypt plain_password = _truncate_password_bytes(plain_password, 72) return pwd_context.verify(plain_password, hashed_password) def get_password_hash(password: str) -> str: """Хеширование пароля""" pwd_context = _get_pwd_context() # Ограничиваем длину пароля до 72 байт для bcrypt password = _truncate_password_bytes(password, 72) return pwd_context.hash(password) def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str: """Создание JWT токена""" to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM) return encoded_jwt def decode_access_token(token: str) -> Optional[dict]: """Декодирование JWT токена""" try: payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]) return payload except JWTError: return None