logboard/app/core/auth.py
Сергей Антропов 9d4add2a7d fix: resolve static files and import issues
- Fix static files not loading due to volume mount conflict
- Remove problematic volume mount from docker-compose.yml
- Add __init__.py files to make Python packages
- Fix all import statements to use relative imports
- Update start.sh to use correct module name
- Update config.py with correct default paths and values
- Ensure all environment variables are properly loaded from .env file
2025-08-20 18:14:35 +03:00

83 lines
3.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
LogBoard+ - Аутентификация
Автор: Сергей Антропов
Сайт: https://devops.org.ru
"""
import os
from datetime import datetime, timedelta
from typing import Optional
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from passlib.context import CryptContext
import jwt
from core.config import (
SECRET_KEY,
ALGORITHM,
ACCESS_TOKEN_EXPIRE_MINUTES,
ADMIN_USERNAME,
ADMIN_PASSWORD
)
# Инициализация безопасности
security = HTTPBearer()
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Функции для работы с паролями
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Проверяет пароль"""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""Хеширует пароль"""
return pwd_context.hash(password)
# Функции для работы с JWT токенами
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
"""Создает JWT токен"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def verify_token(token: str) -> Optional[str]:
"""Проверяет JWT токен и возвращает имя пользователя"""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
return None
return username
except jwt.PyJWTError:
return None
# Функция для проверки аутентификации
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> str:
"""Получает текущего пользователя из токена"""
token = credentials.credentials
username = verify_token(token)
if username is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Недействительный токен. Требуется авторизация.",
headers={"WWW-Authenticate": "Bearer"},
)
return username
# Функция для проверки пользователя
def authenticate_user(username: str, password: str) -> bool:
"""Аутентифицирует пользователя"""
if username == ADMIN_USERNAME:
# Для простоты используем прямое сравнение паролей
# В продакшене рекомендуется использовать хешированные пароли
return password == ADMIN_PASSWORD
return False