Files
DevOpsLab/app/auth/security.py
Сергей Антропов 1fbf9185a2 feat: добавлена пометка типа операции (Build/Push) в истории сборок Dockerfile
- Добавлена колонка 'Тип' во все таблицы истории сборок
- Для push операций отображается registry вместо платформ
- Сохранение пользователя при создании push лога
- Исправлена ошибка с logger в push_docker_image endpoint
- Улучшено отображение истории сборок с визуальными индикаторами
2026-02-15 22:59:02 +03:00

181 lines
7.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Безопасность и аутентификация
Автор: Сергей Антропов
Сайт: https://devops.org.ru
"""
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from app.core.config import settings
# Ленивая инициализация CryptContext для избежания ошибок при импорте
_pwd_context = None
# Глобальный патч для bcrypt.hashpw, чтобы ограничить длину пароля
_bcrypt_patched = False
def _patch_bcrypt_hashpw():
"""Патч для bcrypt.hashpw, чтобы ограничить длину пароля до 72 байт"""
global _bcrypt_patched
if _bcrypt_patched:
return
try:
# Патчим bcrypt модуль напрямую
import bcrypt
original_hashpw = bcrypt.hashpw
def safe_hashpw(secret, salt):
"""Обертка для hashpw с ограничением длины до 72 байт"""
if isinstance(secret, str):
secret_bytes = secret.encode('utf-8')
else:
secret_bytes = secret
if len(secret_bytes) > 72:
secret_bytes = secret_bytes[:72]
return original_hashpw(secret_bytes, salt)
# Заменяем hashpw глобально в модуле bcrypt
bcrypt.hashpw = safe_hashpw
# Также патчим _bcrypt, если он доступен
try:
import bcrypt._bcrypt as _bcrypt
_bcrypt.hashpw = safe_hashpw
except (ImportError, AttributeError):
pass
_bcrypt_patched = True
except Exception:
# Если не удалось патчить, продолжаем без патча
pass
# Применяем патч при импорте модуля
_patch_bcrypt_hashpw()
def _get_pwd_context():
"""Получение CryptContext с ленивой инициализацией"""
global _pwd_context
if _pwd_context is None:
try:
import os
import warnings
# Отключаем предупреждения
os.environ.setdefault('PASSLIB_SUPPRESS_USER_WARNING', '1')
warnings.filterwarnings('ignore', category=UserWarning)
# Патчим passlib.handlers.bcrypt._bcrypt перед инициализацией
# Это необходимо, чтобы избежать ошибки при автоматической проверке
try:
# Импортируем модуль passlib.handlers.bcrypt
import passlib.handlers.bcrypt as bcrypt_handler
# Сохраняем оригинальную функцию hashpw
if hasattr(bcrypt_handler, '_bcrypt') and hasattr(bcrypt_handler._bcrypt, 'hashpw'):
original_hashpw = bcrypt_handler._bcrypt.hashpw
def safe_hashpw(secret, salt):
"""Обертка для hashpw с ограничением длины до 72 байт"""
if isinstance(secret, str):
secret_bytes = secret.encode('utf-8')
else:
secret_bytes = secret
if len(secret_bytes) > 72:
secret_bytes = secret_bytes[:72]
return original_hashpw(secret_bytes, salt)
# Заменяем hashpw НАВСЕГДА (не восстанавливаем)
bcrypt_handler._bcrypt.hashpw = safe_hashpw
# Импортируем и инициализируем CryptContext
from passlib.context import CryptContext
_pwd_context = CryptContext(
schemes=["bcrypt"],
deprecated="auto",
bcrypt__rounds=12
)
else:
# Если не удалось найти _bcrypt, инициализируем без патча
from passlib.context import CryptContext
_pwd_context = CryptContext(
schemes=["bcrypt"],
deprecated="auto",
bcrypt__rounds=12
)
except ImportError:
# Если bcrypt не установлен, используем простой хеш
raise ImportError("bcrypt не установлен")
except Exception as e:
# Если не удалось инициализировать bcrypt, используем простой хеш
import hashlib
import logging
logger = logging.getLogger(__name__)
logger.warning(f"Не удалось инициализировать bcrypt: {e}. Используется простой хеш.")
class SimpleHash:
def hash(self, password: str) -> str:
# Ограничиваем длину для безопасности
if len(password.encode('utf-8')) > 72:
password = password[:72]
return hashlib.sha256(password.encode()).hexdigest()
def verify(self, plain: str, hashed: str) -> bool:
if len(plain.encode('utf-8')) > 72:
plain = plain[:72]
return self.hash(plain) == hashed
_pwd_context = SimpleHash()
return _pwd_context
def _truncate_password_bytes(password: str, max_bytes: int = 72) -> str:
"""Обрезает пароль до максимального количества байт"""
password_bytes = password.encode('utf-8')
if len(password_bytes) <= max_bytes:
return password
# Обрезаем до max_bytes байт
truncated_bytes = password_bytes[:max_bytes]
# Декодируем обратно в строку, игнорируя ошибки если обрезали посередине символа
return truncated_bytes.decode('utf-8', errors='ignore')
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Проверка пароля"""
pwd_context = _get_pwd_context()
# Ограничиваем длину пароля до 72 байт для bcrypt
plain_password = _truncate_password_bytes(plain_password, 72)
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""Хеширование пароля"""
pwd_context = _get_pwd_context()
# Ограничиваем длину пароля до 72 байт для bcrypt
password = _truncate_password_bytes(password, 72)
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
"""Создание JWT токена"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
return encoded_jwt
def decode_access_token(token: str) -> Optional[dict]:
"""Декодирование JWT токена"""
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
return payload
except JWTError:
return None