Files
MessageGateway/app/core/jira_mapping.py
Sergey Antropov b90def35ed Initial commit: Message Gateway project
- FastAPI приложение для отправки мониторинговых алертов в мессенджеры
- Поддержка Telegram и MAX/VK
- Интеграция с Grafana, Zabbix, AlertManager
- Автоматическое создание тикетов в Jira
- Управление группами мессенджеров через API
- Декораторы для авторизации и скрытия эндпоинтов
- Подробная документация в папке docs/

Автор: Сергей Антропов
Сайт: https://devops.org.ru
2025-11-12 20:25:11 +03:00

213 lines
8.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Управление конфигурацией маппинга алертов в Jira тикеты.
Автор: Сергей Антропов
Сайт: https://devops.org.ru
"""
import json
import logging
from typing import Dict, Any, Optional, List
import aiofiles
from datetime import datetime, timedelta
from app.models.jira import JiraMappingConfig, JiraSourceMapping, JiraMapping, JiraMappingCondition
from app.core.config import get_settings
logger = logging.getLogger(__name__)
class JiraMappingManager:
"""Менеджер конфигурации маппинга алертов в Jira тикеты с кэшированием."""
def __init__(self, config_path: Optional[str] = None):
"""
Инициализация менеджера конфигурации.
Args:
config_path: Путь к файлу конфигурации маппинга.
"""
settings = get_settings()
self.config_path = config_path or settings.jira_mapping_config_path
self._cache: Optional[JiraMappingConfig] = None
self._cache_time: Optional[datetime] = None
self._cache_ttl = timedelta(minutes=10) # Кэш на 10 минут
async def _load_config(self) -> JiraMappingConfig:
"""
Загрузить конфигурацию маппинга из файла.
Returns:
Конфигурация маппинга алертов в Jira тикеты.
Raises:
FileNotFoundError: Если файл конфигурации не найден.
json.JSONDecodeError: Если файл содержит некорректный JSON.
"""
try:
async with aiofiles.open(self.config_path, 'r', encoding='utf-8') as f:
content = await f.read()
config_dict = json.loads(content)
config = JiraMappingConfig(**config_dict)
logger.info(f"Конфигурация маппинга Jira загружена из {self.config_path}")
return config
except FileNotFoundError:
logger.warning(f"Файл конфигурации маппинга Jira не найден: {self.config_path}")
# Возвращаем пустую конфигурацию
return JiraMappingConfig()
except json.JSONDecodeError as e:
logger.error(f"Ошибка парсинга JSON в файле конфигурации маппинга: {e}")
return JiraMappingConfig()
except Exception as e:
logger.error(f"Ошибка загрузки конфигурации маппинга: {e}")
return JiraMappingConfig()
def _is_cache_valid(self) -> bool:
"""Проверить, валиден ли кэш."""
if self._cache is None or self._cache_time is None:
return False
return datetime.now() - self._cache_time < self._cache_ttl
async def get_config(self) -> JiraMappingConfig:
"""
Получить конфигурацию маппинга (с кэшированием).
Returns:
Конфигурация маппинга алертов в Jira тикеты.
"""
if not self._is_cache_valid():
self._cache = await self._load_config()
self._cache_time = datetime.now()
return self._cache
async def find_mapping(
self,
source: str,
alert_data: Dict[str, Any]
) -> Optional[JiraMapping]:
"""
Найти подходящий маппинг для алерта.
Args:
source: Источник алерта (alertmanager, grafana, zabbix).
alert_data: Данные алерта.
Returns:
Подходящий маппинг или None, если маппинг не найден.
"""
config = await self.get_config()
# Получаем конфигурацию для источника
source_mapping: Optional[JiraSourceMapping] = None
if source == "alertmanager" and config.alertmanager:
source_mapping = config.alertmanager
elif source == "grafana" and config.grafana:
source_mapping = config.grafana
elif source == "zabbix" and config.zabbix:
source_mapping = config.zabbix
if not source_mapping:
return None
# Ищем подходящий маппинг по условиям
for mapping in source_mapping.mappings:
if self._check_conditions(mapping.conditions, alert_data):
return mapping
# Если маппинг не найден, возвращаем дефолтный маппинг
return JiraMapping(
conditions=JiraMappingCondition(),
project=source_mapping.default_project,
assignee=source_mapping.default_assignee,
issue_type=source_mapping.default_issue_type,
priority=source_mapping.default_priority,
labels=[]
)
def _check_conditions(
self,
conditions: JiraMappingCondition,
alert_data: Dict[str, Any]
) -> bool:
"""
Проверить, соответствуют ли данные алерта условиям маппинга.
Args:
conditions: Условия маппинга.
alert_data: Данные алерта.
Returns:
True если условия выполнены, False в противном случае.
"""
# Проверяем severity
if conditions.severity:
if alert_data.get("severity") != conditions.severity:
return False
# Проверяем namespace
if conditions.namespace:
if alert_data.get("namespace") != conditions.namespace:
return False
# Проверяем state
if conditions.state:
if alert_data.get("state") != conditions.state:
return False
# Проверяем status
if conditions.status:
if alert_data.get("status") != conditions.status:
return False
# Проверяем event-severity
if conditions.event_severity:
if alert_data.get("event-severity") != conditions.event_severity:
return False
# Проверяем теги
if conditions.tags:
alert_tags = alert_data.get("tags", {})
for key, value in conditions.tags.items():
if alert_tags.get(key) != value:
return False
return True
async def refresh_cache(self) -> None:
"""Принудительно обновить кэш конфигурации."""
try:
self._cache = await self._load_config()
self._cache_time = datetime.now()
logger.info("Кэш конфигурации маппинга Jira обновлен")
except Exception as e:
logger.error(f"Ошибка обновления кэша: {e}")
# Глобальный экземпляр менеджера конфигурации маппинга (lazy initialization)
_jira_mapping_manager_instance = None
def get_jira_mapping_manager() -> JiraMappingManager:
"""
Получить экземпляр менеджера конфигурации маппинга Jira (lazy initialization).
Returns:
Экземпляр JiraMappingManager.
"""
global _jira_mapping_manager_instance
if _jira_mapping_manager_instance is None:
_jira_mapping_manager_instance = JiraMappingManager()
return _jira_mapping_manager_instance
# Для обратной совместимости (lazy initialization)
class _JiraMappingManagerProxy:
"""Прокси для ленивой инициализации jira_mapping_manager."""
def __getattr__(self, name):
"""Получить атрибут из jira_mapping_manager."""
return getattr(get_jira_mapping_manager(), name)
jira_mapping_manager = _JiraMappingManagerProxy()