- Добавлена колонка 'Тип' во все таблицы истории сборок - Для push операций отображается registry вместо платформ - Сохранение пользователя при создании push лога - Исправлена ошибка с logger в push_docker_image endpoint - Улучшено отображение истории сборок с визуальными индикаторами
181 lines
7.8 KiB
Python
181 lines
7.8 KiB
Python
"""
|
||
Безопасность и аутентификация
|
||
Автор: Сергей Антропов
|
||
Сайт: https://devops.org.ru
|
||
"""
|
||
|
||
from datetime import datetime, timedelta
|
||
from typing import Optional
|
||
from jose import JWTError, jwt
|
||
from app.core.config import settings
|
||
|
||
# Ленивая инициализация CryptContext для избежания ошибок при импорте
|
||
_pwd_context = None
|
||
|
||
# Глобальный патч для bcrypt.hashpw, чтобы ограничить длину пароля
|
||
_bcrypt_patched = False
|
||
|
||
def _patch_bcrypt_hashpw():
|
||
"""Патч для bcrypt.hashpw, чтобы ограничить длину пароля до 72 байт"""
|
||
global _bcrypt_patched
|
||
if _bcrypt_patched:
|
||
return
|
||
|
||
try:
|
||
# Патчим bcrypt модуль напрямую
|
||
import bcrypt
|
||
original_hashpw = bcrypt.hashpw
|
||
|
||
def safe_hashpw(secret, salt):
|
||
"""Обертка для hashpw с ограничением длины до 72 байт"""
|
||
if isinstance(secret, str):
|
||
secret_bytes = secret.encode('utf-8')
|
||
else:
|
||
secret_bytes = secret
|
||
if len(secret_bytes) > 72:
|
||
secret_bytes = secret_bytes[:72]
|
||
return original_hashpw(secret_bytes, salt)
|
||
|
||
# Заменяем hashpw глобально в модуле bcrypt
|
||
bcrypt.hashpw = safe_hashpw
|
||
|
||
# Также патчим _bcrypt, если он доступен
|
||
try:
|
||
import bcrypt._bcrypt as _bcrypt
|
||
_bcrypt.hashpw = safe_hashpw
|
||
except (ImportError, AttributeError):
|
||
pass
|
||
|
||
_bcrypt_patched = True
|
||
except Exception:
|
||
# Если не удалось патчить, продолжаем без патча
|
||
pass
|
||
|
||
# Применяем патч при импорте модуля
|
||
_patch_bcrypt_hashpw()
|
||
|
||
|
||
def _get_pwd_context():
|
||
"""Получение CryptContext с ленивой инициализацией"""
|
||
global _pwd_context
|
||
if _pwd_context is None:
|
||
try:
|
||
import os
|
||
import warnings
|
||
|
||
# Отключаем предупреждения
|
||
os.environ.setdefault('PASSLIB_SUPPRESS_USER_WARNING', '1')
|
||
warnings.filterwarnings('ignore', category=UserWarning)
|
||
|
||
# Патчим passlib.handlers.bcrypt._bcrypt перед инициализацией
|
||
# Это необходимо, чтобы избежать ошибки при автоматической проверке
|
||
try:
|
||
# Импортируем модуль passlib.handlers.bcrypt
|
||
import passlib.handlers.bcrypt as bcrypt_handler
|
||
|
||
# Сохраняем оригинальную функцию hashpw
|
||
if hasattr(bcrypt_handler, '_bcrypt') and hasattr(bcrypt_handler._bcrypt, 'hashpw'):
|
||
original_hashpw = bcrypt_handler._bcrypt.hashpw
|
||
|
||
def safe_hashpw(secret, salt):
|
||
"""Обертка для hashpw с ограничением длины до 72 байт"""
|
||
if isinstance(secret, str):
|
||
secret_bytes = secret.encode('utf-8')
|
||
else:
|
||
secret_bytes = secret
|
||
if len(secret_bytes) > 72:
|
||
secret_bytes = secret_bytes[:72]
|
||
return original_hashpw(secret_bytes, salt)
|
||
|
||
# Заменяем hashpw НАВСЕГДА (не восстанавливаем)
|
||
bcrypt_handler._bcrypt.hashpw = safe_hashpw
|
||
|
||
# Импортируем и инициализируем CryptContext
|
||
from passlib.context import CryptContext
|
||
|
||
_pwd_context = CryptContext(
|
||
schemes=["bcrypt"],
|
||
deprecated="auto",
|
||
bcrypt__rounds=12
|
||
)
|
||
else:
|
||
# Если не удалось найти _bcrypt, инициализируем без патча
|
||
from passlib.context import CryptContext
|
||
_pwd_context = CryptContext(
|
||
schemes=["bcrypt"],
|
||
deprecated="auto",
|
||
bcrypt__rounds=12
|
||
)
|
||
|
||
except ImportError:
|
||
# Если bcrypt не установлен, используем простой хеш
|
||
raise ImportError("bcrypt не установлен")
|
||
|
||
except Exception as e:
|
||
# Если не удалось инициализировать bcrypt, используем простой хеш
|
||
import hashlib
|
||
import logging
|
||
logger = logging.getLogger(__name__)
|
||
logger.warning(f"Не удалось инициализировать bcrypt: {e}. Используется простой хеш.")
|
||
|
||
class SimpleHash:
|
||
def hash(self, password: str) -> str:
|
||
# Ограничиваем длину для безопасности
|
||
if len(password.encode('utf-8')) > 72:
|
||
password = password[:72]
|
||
return hashlib.sha256(password.encode()).hexdigest()
|
||
def verify(self, plain: str, hashed: str) -> bool:
|
||
if len(plain.encode('utf-8')) > 72:
|
||
plain = plain[:72]
|
||
return self.hash(plain) == hashed
|
||
_pwd_context = SimpleHash()
|
||
return _pwd_context
|
||
|
||
|
||
def _truncate_password_bytes(password: str, max_bytes: int = 72) -> str:
|
||
"""Обрезает пароль до максимального количества байт"""
|
||
password_bytes = password.encode('utf-8')
|
||
if len(password_bytes) <= max_bytes:
|
||
return password
|
||
# Обрезаем до max_bytes байт
|
||
truncated_bytes = password_bytes[:max_bytes]
|
||
# Декодируем обратно в строку, игнорируя ошибки если обрезали посередине символа
|
||
return truncated_bytes.decode('utf-8', errors='ignore')
|
||
|
||
|
||
def verify_password(plain_password: str, hashed_password: str) -> bool:
|
||
"""Проверка пароля"""
|
||
pwd_context = _get_pwd_context()
|
||
# Ограничиваем длину пароля до 72 байт для bcrypt
|
||
plain_password = _truncate_password_bytes(plain_password, 72)
|
||
return pwd_context.verify(plain_password, hashed_password)
|
||
|
||
|
||
def get_password_hash(password: str) -> str:
|
||
"""Хеширование пароля"""
|
||
pwd_context = _get_pwd_context()
|
||
# Ограничиваем длину пароля до 72 байт для bcrypt
|
||
password = _truncate_password_bytes(password, 72)
|
||
return pwd_context.hash(password)
|
||
|
||
|
||
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
|
||
"""Создание JWT токена"""
|
||
to_encode = data.copy()
|
||
if expires_delta:
|
||
expire = datetime.utcnow() + expires_delta
|
||
else:
|
||
expire = datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
|
||
to_encode.update({"exp": expire})
|
||
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
|
||
return encoded_jwt
|
||
|
||
|
||
def decode_access_token(token: str) -> Optional[dict]:
|
||
"""Декодирование JWT токена"""
|
||
try:
|
||
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
|
||
return payload
|
||
except JWTError:
|
||
return None
|