Files
MessageGateway/app/core/messenger_factory.py
Sergey Antropov b90def35ed Initial commit: Message Gateway project
- FastAPI приложение для отправки мониторинговых алертов в мессенджеры
- Поддержка Telegram и MAX/VK
- Интеграция с Grafana, Zabbix, AlertManager
- Автоматическое создание тикетов в Jira
- Управление группами мессенджеров через API
- Декораторы для авторизации и скрытия эндпоинтов
- Подробная документация в папке docs/

Автор: Сергей Антропов
Сайт: https://devops.org.ru
2025-11-12 20:25:11 +03:00

103 lines
4.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Фабрика для создания клиентов мессенджеров.
Автор: Сергей Антропов
Сайт: https://devops.org.ru
"""
import logging
from typing import Optional, Dict, Any
from app.core.messengers.base import MessengerClient
from app.core.messengers.telegram import TelegramMessengerClient
from app.core.messengers.max import MaxMessengerClient
from app.core.config import get_settings
logger = logging.getLogger(__name__)
class MessengerFactory:
"""Фабрика для создания клиентов мессенджеров."""
@staticmethod
def create(messenger_type: str, **kwargs) -> MessengerClient:
"""
Создать клиент мессенджера.
Args:
messenger_type: Тип мессенджера (telegram, max).
**kwargs: Дополнительные параметры для инициализации клиента.
Returns:
Экземпляр MessengerClient.
Raises:
ValueError: Если тип мессенджера неизвестен.
"""
messenger_type = messenger_type.lower()
if messenger_type == "telegram":
bot_token = kwargs.get("bot_token")
return TelegramMessengerClient(bot_token=bot_token)
elif messenger_type == "max":
access_token = kwargs.get("access_token")
api_version = kwargs.get("api_version", "5.131")
return MaxMessengerClient(access_token=access_token, api_version=api_version)
else:
raise ValueError(f"Неизвестный тип мессенджера: {messenger_type}")
@staticmethod
def create_from_config(group_config: Dict[str, Any]) -> MessengerClient:
"""
Создать клиент мессенджера из конфигурации группы.
Args:
group_config: Конфигурация группы с полями:
- messenger: Тип мессенджера (telegram, max)
- config: Дополнительная конфигурация для мессенджера
Returns:
Экземпляр MessengerClient.
"""
messenger_type = group_config.get("messenger", "telegram")
config = group_config.get("config", {})
# Если конфигурация не указана, используем настройки из переменных окружения
settings = get_settings()
if messenger_type == "telegram":
bot_token = config.get("bot_token") or settings.telegram_bot_token
return TelegramMessengerClient(bot_token=bot_token)
elif messenger_type == "max":
access_token = config.get("access_token") or settings.max_access_token
api_version = config.get("api_version", settings.max_api_version or "5.131")
return MaxMessengerClient(access_token=access_token, api_version=api_version)
else:
raise ValueError(f"Неизвестный тип мессенджера: {messenger_type}")
# Глобальный кэш клиентов мессенджеров
_messenger_clients_cache: Dict[str, MessengerClient] = {}
def get_messenger_client(messenger_type: str, **kwargs) -> MessengerClient:
"""
Получить клиент мессенджера (с кэшированием).
Args:
messenger_type: Тип мессенджера (telegram, max).
**kwargs: Дополнительные параметры для инициализации клиента.
Returns:
Экземпляр MessengerClient.
"""
# Для Telegram используем глобальный экземпляр, если bot_token не указан
if messenger_type == "telegram" and "bot_token" not in kwargs:
cache_key = "telegram_default"
if cache_key not in _messenger_clients_cache:
_messenger_clients_cache[cache_key] = MessengerFactory.create(messenger_type, **kwargs)
return _messenger_clients_cache[cache_key]
# Для других мессенджеров создаем новый экземпляр
return MessengerFactory.create(messenger_type, **kwargs)