- FastAPI приложение для отправки мониторинговых алертов в мессенджеры - Поддержка Telegram и MAX/VK - Интеграция с Grafana, Zabbix, AlertManager - Автоматическое создание тикетов в Jira - Управление группами мессенджеров через API - Декораторы для авторизации и скрытия эндпоинтов - Подробная документация в папке docs/ Автор: Сергей Антропов Сайт: https://devops.org.ru
213 lines
8.3 KiB
Python
213 lines
8.3 KiB
Python
"""
|
||
Управление конфигурацией маппинга алертов в Jira тикеты.
|
||
|
||
Автор: Сергей Антропов
|
||
Сайт: https://devops.org.ru
|
||
"""
|
||
import json
|
||
import logging
|
||
from typing import Dict, Any, Optional, List
|
||
import aiofiles
|
||
from datetime import datetime, timedelta
|
||
|
||
from app.models.jira import JiraMappingConfig, JiraSourceMapping, JiraMapping, JiraMappingCondition
|
||
from app.core.config import get_settings
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class JiraMappingManager:
|
||
"""Менеджер конфигурации маппинга алертов в Jira тикеты с кэшированием."""
|
||
|
||
def __init__(self, config_path: Optional[str] = None):
|
||
"""
|
||
Инициализация менеджера конфигурации.
|
||
|
||
Args:
|
||
config_path: Путь к файлу конфигурации маппинга.
|
||
"""
|
||
settings = get_settings()
|
||
self.config_path = config_path or settings.jira_mapping_config_path
|
||
self._cache: Optional[JiraMappingConfig] = None
|
||
self._cache_time: Optional[datetime] = None
|
||
self._cache_ttl = timedelta(minutes=10) # Кэш на 10 минут
|
||
|
||
async def _load_config(self) -> JiraMappingConfig:
|
||
"""
|
||
Загрузить конфигурацию маппинга из файла.
|
||
|
||
Returns:
|
||
Конфигурация маппинга алертов в Jira тикеты.
|
||
|
||
Raises:
|
||
FileNotFoundError: Если файл конфигурации не найден.
|
||
json.JSONDecodeError: Если файл содержит некорректный JSON.
|
||
"""
|
||
try:
|
||
async with aiofiles.open(self.config_path, 'r', encoding='utf-8') as f:
|
||
content = await f.read()
|
||
config_dict = json.loads(content)
|
||
config = JiraMappingConfig(**config_dict)
|
||
logger.info(f"Конфигурация маппинга Jira загружена из {self.config_path}")
|
||
return config
|
||
except FileNotFoundError:
|
||
logger.warning(f"Файл конфигурации маппинга Jira не найден: {self.config_path}")
|
||
# Возвращаем пустую конфигурацию
|
||
return JiraMappingConfig()
|
||
except json.JSONDecodeError as e:
|
||
logger.error(f"Ошибка парсинга JSON в файле конфигурации маппинга: {e}")
|
||
return JiraMappingConfig()
|
||
except Exception as e:
|
||
logger.error(f"Ошибка загрузки конфигурации маппинга: {e}")
|
||
return JiraMappingConfig()
|
||
|
||
def _is_cache_valid(self) -> bool:
|
||
"""Проверить, валиден ли кэш."""
|
||
if self._cache is None or self._cache_time is None:
|
||
return False
|
||
return datetime.now() - self._cache_time < self._cache_ttl
|
||
|
||
async def get_config(self) -> JiraMappingConfig:
|
||
"""
|
||
Получить конфигурацию маппинга (с кэшированием).
|
||
|
||
Returns:
|
||
Конфигурация маппинга алертов в Jira тикеты.
|
||
"""
|
||
if not self._is_cache_valid():
|
||
self._cache = await self._load_config()
|
||
self._cache_time = datetime.now()
|
||
return self._cache
|
||
|
||
async def find_mapping(
|
||
self,
|
||
source: str,
|
||
alert_data: Dict[str, Any]
|
||
) -> Optional[JiraMapping]:
|
||
"""
|
||
Найти подходящий маппинг для алерта.
|
||
|
||
Args:
|
||
source: Источник алерта (alertmanager, grafana, zabbix).
|
||
alert_data: Данные алерта.
|
||
|
||
Returns:
|
||
Подходящий маппинг или None, если маппинг не найден.
|
||
"""
|
||
config = await self.get_config()
|
||
|
||
# Получаем конфигурацию для источника
|
||
source_mapping: Optional[JiraSourceMapping] = None
|
||
if source == "alertmanager" and config.alertmanager:
|
||
source_mapping = config.alertmanager
|
||
elif source == "grafana" and config.grafana:
|
||
source_mapping = config.grafana
|
||
elif source == "zabbix" and config.zabbix:
|
||
source_mapping = config.zabbix
|
||
|
||
if not source_mapping:
|
||
return None
|
||
|
||
# Ищем подходящий маппинг по условиям
|
||
for mapping in source_mapping.mappings:
|
||
if self._check_conditions(mapping.conditions, alert_data):
|
||
return mapping
|
||
|
||
# Если маппинг не найден, возвращаем дефолтный маппинг
|
||
return JiraMapping(
|
||
conditions=JiraMappingCondition(),
|
||
project=source_mapping.default_project,
|
||
assignee=source_mapping.default_assignee,
|
||
issue_type=source_mapping.default_issue_type,
|
||
priority=source_mapping.default_priority,
|
||
labels=[]
|
||
)
|
||
|
||
def _check_conditions(
|
||
self,
|
||
conditions: JiraMappingCondition,
|
||
alert_data: Dict[str, Any]
|
||
) -> bool:
|
||
"""
|
||
Проверить, соответствуют ли данные алерта условиям маппинга.
|
||
|
||
Args:
|
||
conditions: Условия маппинга.
|
||
alert_data: Данные алерта.
|
||
|
||
Returns:
|
||
True если условия выполнены, False в противном случае.
|
||
"""
|
||
# Проверяем severity
|
||
if conditions.severity:
|
||
if alert_data.get("severity") != conditions.severity:
|
||
return False
|
||
|
||
# Проверяем namespace
|
||
if conditions.namespace:
|
||
if alert_data.get("namespace") != conditions.namespace:
|
||
return False
|
||
|
||
# Проверяем state
|
||
if conditions.state:
|
||
if alert_data.get("state") != conditions.state:
|
||
return False
|
||
|
||
# Проверяем status
|
||
if conditions.status:
|
||
if alert_data.get("status") != conditions.status:
|
||
return False
|
||
|
||
# Проверяем event-severity
|
||
if conditions.event_severity:
|
||
if alert_data.get("event-severity") != conditions.event_severity:
|
||
return False
|
||
|
||
# Проверяем теги
|
||
if conditions.tags:
|
||
alert_tags = alert_data.get("tags", {})
|
||
for key, value in conditions.tags.items():
|
||
if alert_tags.get(key) != value:
|
||
return False
|
||
|
||
return True
|
||
|
||
async def refresh_cache(self) -> None:
|
||
"""Принудительно обновить кэш конфигурации."""
|
||
try:
|
||
self._cache = await self._load_config()
|
||
self._cache_time = datetime.now()
|
||
logger.info("Кэш конфигурации маппинга Jira обновлен")
|
||
except Exception as e:
|
||
logger.error(f"Ошибка обновления кэша: {e}")
|
||
|
||
|
||
# Глобальный экземпляр менеджера конфигурации маппинга (lazy initialization)
|
||
_jira_mapping_manager_instance = None
|
||
|
||
|
||
def get_jira_mapping_manager() -> JiraMappingManager:
|
||
"""
|
||
Получить экземпляр менеджера конфигурации маппинга Jira (lazy initialization).
|
||
|
||
Returns:
|
||
Экземпляр JiraMappingManager.
|
||
"""
|
||
global _jira_mapping_manager_instance
|
||
if _jira_mapping_manager_instance is None:
|
||
_jira_mapping_manager_instance = JiraMappingManager()
|
||
return _jira_mapping_manager_instance
|
||
|
||
|
||
# Для обратной совместимости (lazy initialization)
|
||
class _JiraMappingManagerProxy:
|
||
"""Прокси для ленивой инициализации jira_mapping_manager."""
|
||
|
||
def __getattr__(self, name):
|
||
"""Получить атрибут из jira_mapping_manager."""
|
||
return getattr(get_jira_mapping_manager(), name)
|
||
|
||
|
||
jira_mapping_manager = _JiraMappingManagerProxy()
|
||
|