Files
MessageGateway/app/core/auth.py
Sergey Antropov b90def35ed Initial commit: Message Gateway project
- FastAPI приложение для отправки мониторинговых алертов в мессенджеры
- Поддержка Telegram и MAX/VK
- Интеграция с Grafana, Zabbix, AlertManager
- Автоматическое создание тикетов в Jira
- Управление группами мессенджеров через API
- Декораторы для авторизации и скрытия эндпоинтов
- Подробная документация в папке docs/

Автор: Сергей Антропов
Сайт: https://devops.org.ru
2025-11-12 20:25:11 +03:00

184 lines
7.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Утилиты для аутентификации и авторизации.
Автор: Сергей Антропов
Сайт: https://devops.org.ru
"""
import logging
from typing import Optional, Callable, Any
from functools import wraps
from fastapi import HTTPException, Security, Depends, Request
from fastapi.security import APIKeyHeader
from starlette.requests import Request as StarletteRequest
from app.core.config import get_settings
logger = logging.getLogger(__name__)
# Request может быть как из FastAPI, так и из Starlette
# Оба типа совместимы, поэтому используем StarletteRequest как базовый тип
RequestType = StarletteRequest
# Схема безопасности для API ключа
api_key_header = APIKeyHeader(
name="X-API-Key",
auto_error=False,
description="API ключ для авторизации"
)
def verify_api_key(api_key: Optional[str] = Security(api_key_header)) -> bool:
"""
Проверить API ключ для авторизации (обязательная авторизация).
Используется как зависимость FastAPI (Depends) для отображения в Swagger UI.
Также помечает контекст запроса, что API ключ был проверен.
Args:
api_key: API ключ из заголовка X-API-Key.
Returns:
True если API ключ верный.
Raises:
HTTPException: Если API ключ неверный или не указан.
"""
settings = get_settings()
# Если API ключ не установлен в настройках, доступ запрещен
if not settings.api_key:
logger.warning("API_KEY не установлен - доступ запрещен")
raise HTTPException(
status_code=401,
detail="API ключ не настроен на сервере"
)
# Если API ключ не передан, доступ запрещен
if not api_key:
raise HTTPException(
status_code=401,
detail="Неверный или отсутствующий API ключ",
headers={"WWW-Authenticate": "ApiKey"}
)
# Проверяем API ключ
if api_key != settings.api_key:
logger.warning(f"Неверный API ключ: {api_key[:10]}...")
raise HTTPException(
status_code=401,
detail="Неверный или отсутствующий API ключ",
headers={"WWW-Authenticate": "ApiKey"}
)
# Помечаем, что API ключ был проверен через dependency
# Это позволяет декоратору @require_api_key не выполнять повторную проверку
try:
from starlette.context import contextvars
# Используем contextvars для хранения информации о проверке
# Но это может не работать во всех случаях
pass
except ImportError:
pass
return True
def verify_api_key_optional(api_key: Optional[str] = Security(api_key_header)) -> Optional[bool]:
"""
Проверить API ключ для авторизации (опциональная авторизация).
Используется как зависимость FastAPI (Depends).
Args:
api_key: API ключ из заголовка X-API-Key.
Returns:
True если API ключ верный, None если не передан, выбрасывает исключение если неверный.
Raises:
HTTPException: Если API ключ неверный.
"""
settings = get_settings()
# Если API ключ не установлен в настройках, возвращаем None (нет авторизации)
if not settings.api_key:
return None
# Если API ключ не передан, возвращаем None (нет авторизации)
if not api_key:
return None
# Проверяем API ключ
if api_key != settings.api_key:
logger.warning(f"Неверный API ключ: {api_key[:10]}...")
raise HTTPException(
status_code=401,
detail="Неверный или отсутствующий API ключ",
headers={"WWW-Authenticate": "ApiKey"}
)
return True
# Удобные константы для использования в endpoints (через dependencies)
require_api_key_dependency = Depends(verify_api_key)
require_api_key_optional = Depends(verify_api_key_optional)
def require_api_key(func: Callable) -> Callable:
"""
Декоратор для пометки функции как требующей API ключ.
Использование:
@require_api_key
@router.post("/endpoint", dependencies=[require_api_key_dependency])
async def my_endpoint(request: Request, ...):
...
Примечание: Декоратор используется только для пометки функции.
Фактическая проверка API ключа выполняется через `dependencies=[require_api_key_dependency]`,
который также обеспечивает отображение замочка в Swagger UI.
Декоратор не выполняет проверку API ключа - это делает dependency.
Декоратор оставлен для удобства и возможного расширения в будущем.
Args:
func: Функция для декорирования.
Returns:
Декорированная функция с пометкой о необходимости API ключа.
"""
# Помечаем функцию, что она требует API ключ
func.__requires_api_key__ = True
# Просто возвращаем функцию без изменений
# Проверка API ключа выполняется через dependency
return func
def hide_from_api(func: Callable) -> Callable:
"""
Декоратор для скрытия эндпоинта из API документации (Swagger UI).
Использование:
@hide_from_api
@router.post("/debug/dump")
async def debug_endpoint(...):
...
Примечание: Декоратор помечает функцию как скрытую от API.
Эндпоинт все еще будет работать, но не будет отображаться в Swagger UI.
Декоратор должен быть применен ПЕРЕД декоратором route (снизу вверх).
Args:
func: Функция для декорирования.
Returns:
Декорированная функция с пометкой о скрытии от API.
"""
# Помечаем функцию, что она должна быть скрыта от API
func.__hide_from_api__ = True
# Просто возвращаем функцию без изменений
# Скрытие из API выполняется в custom_openapi
return func