Files
DevOpsLab/app/services/role_service.py
Сергей Антропов 1fbf9185a2 feat: добавлена пометка типа операции (Build/Push) в истории сборок Dockerfile
- Добавлена колонка 'Тип' во все таблицы истории сборок
- Для push операций отображается registry вместо платформ
- Сохранение пользователя при создании push лога
- Исправлена ошибка с logger в push_docker_image endpoint
- Улучшено отображение истории сборок с визуальными индикаторами
2026-02-15 22:59:02 +03:00

578 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Сервис для работы с ролями
Автор: Сергей Антропов
Сайт: https://devops.org.ru
"""
from pathlib import Path
from typing import Dict, List, Optional
import yaml
import json
from datetime import datetime
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, or_, and_, func
from app.core.config import settings
from app.models.database import Role
from app.core.make_executor import MakeExecutor
import logging
logger = logging.getLogger(__name__)
class RoleService:
"""Сервис для управления ролями"""
def __init__(self):
self.project_root = settings.PROJECT_ROOT
# Папка для хранения ролей (для экспорта и работы с файлами)
self.roles_dir_alembic = Path(__file__).parent.parent / "alembic" / "roles"
self.roles_dir_alembic.mkdir(exist_ok=True)
self.executor = MakeExecutor()
@staticmethod
async def get_role(db: AsyncSession, role_id: int) -> Optional[Role]:
"""Получение роли по ID"""
result = await db.execute(select(Role).where(Role.id == role_id))
return result.scalar_one_or_none()
@staticmethod
async def get_role_by_name(db: AsyncSession, name: str) -> Optional[Role]:
"""Получение роли по имени"""
result = await db.execute(select(Role).where(Role.name == name))
return result.scalar_one_or_none()
@staticmethod
async def list_roles(
db: AsyncSession,
user_id: Optional[int] = None,
is_global: Optional[bool] = None,
is_personal: Optional[bool] = None,
groups: Optional[List[str]] = None,
status: Optional[str] = None,
search: Optional[str] = None,
page: int = 1,
per_page: int = 10
) -> tuple[List[Role], int]:
"""
Список ролей с фильтрацией и пагинацией
Args:
db: Сессия БД
user_id: ID пользователя (для фильтрации личных ролей)
is_global: Фильтр по глобальным ролям
is_personal: Фильтр по личным ролям
groups: Список групп пользователя (для фильтрации групповых ролей)
status: Фильтр по статусу
search: Поиск по имени/описанию
page: Номер страницы
per_page: Количество на странице
Returns:
Кортеж (список ролей, общее количество)
"""
query = select(Role)
# Фильтры доступа
conditions = []
# Глобальные роли доступны всем
if is_global is None or is_global:
conditions.append(Role.is_global == True)
# Личные роли доступны только владельцу
if user_id:
conditions.append(
and_(
Role.is_personal == True,
Role.user_id == user_id
)
)
# Групповые роли доступны пользователям из соответствующих групп
if groups:
for group in groups:
conditions.append(
and_(
Role.is_global == False,
Role.is_personal == False,
Role.groups.contains([group])
)
)
if conditions:
query = query.where(or_(*conditions))
# Фильтр по статусу
if status:
query = query.where(Role.status == status)
else:
query = query.where(Role.status == "active")
# Поиск
if search:
search_pattern = f"%{search}%"
query = query.where(
or_(
Role.name.ilike(search_pattern),
Role.description.ilike(search_pattern)
)
)
# Подсчет общего количества
count_query = select(func.count()).select_from(query.subquery())
count_result = await db.execute(count_query)
total = count_result.scalar() or 0
# Пагинация
offset = (page - 1) * per_page
query = query.order_by(Role.name).offset(offset).limit(per_page)
result = await db.execute(query)
roles = result.scalars().all()
return roles, total
@staticmethod
async def create_role(
db: AsyncSession,
name: str,
template: str = "default",
description: str = "",
platforms: List[str] = None,
variables: List[Dict] = None,
is_global: bool = True,
is_personal: bool = False,
groups: Optional[List[str]] = None,
user_id: Optional[int] = None,
created_by: Optional[str] = None
) -> Role:
"""
Создание новой роли в БД
Args:
db: Сессия БД
name: Имя роли
template: Тип шаблона (default, service, package, config, etc.)
description: Описание роли
platforms: Список поддерживаемых платформ
variables: Список переменных для defaults/main.yml
is_global: Глобальная роль (доступна всем)
is_personal: Личная роль пользователя
groups: Список групп, которым доступна роль
user_id: ID пользователя-владельца (если is_personal=True)
created_by: Имя пользователя, создавшего роль
Returns:
Созданная роль
"""
# Проверка существования роли
existing = await RoleService.get_role_by_name(db, name)
if existing:
raise ValueError(f"Роль '{name}' уже существует")
# Генерация содержимого роли
role_content = RoleService._generate_role_content(name, template, variables or [])
# Генерация метаданных
author = "Сергей Антропов"
galaxy_info = {
"galaxy_info": {
"author": author,
"description": description or f"Роль {name}",
"platforms": RoleService._format_platforms(platforms or []),
"company": "https://devops.org.ru",
"license": "MIT",
"min_ansible_version": "2.9"
}
}
# Создание роли
role = Role(
name=name,
description=description or f"Роль {name}",
content=role_content,
is_global=is_global,
is_personal=is_personal,
groups=groups if groups else None,
user_id=user_id if is_personal else None,
author=author,
platforms=platforms if platforms else None,
galaxy_info=galaxy_info,
status="active",
created_by=created_by,
updated_by=created_by
)
db.add(role)
await db.commit()
await db.refresh(role)
# Экспорт роли в файловую систему для совместимости
await RoleService.export_role_to_filesystem(role)
return role
@staticmethod
async def update_role(
db: AsyncSession,
role_id: int,
name: Optional[str] = None,
description: Optional[str] = None,
content: Optional[Dict] = None,
is_global: Optional[bool] = None,
is_personal: Optional[bool] = None,
groups: Optional[List[str]] = None,
user_id: Optional[int] = None,
updated_by: Optional[str] = None
) -> Optional[Role]:
"""Обновление роли"""
role = await RoleService.get_role(db, role_id)
if not role:
return None
if name is not None:
# Проверка уникальности имени
existing = await RoleService.get_role_by_name(db, name)
if existing and existing.id != role_id:
raise ValueError(f"Роль '{name}' уже существует")
role.name = name
if description is not None:
role.description = description
if content is not None:
role.content = content
if is_global is not None:
role.is_global = is_global
if is_personal is not None:
role.is_personal = is_personal
if is_personal and user_id:
role.user_id = user_id
elif not is_personal:
role.user_id = None
if groups is not None:
role.groups = groups
if updated_by:
role.updated_by = updated_by
role.updated_at = datetime.utcnow()
await db.commit()
await db.refresh(role)
# Экспорт обновленной роли в файловую систему
await RoleService.export_role_to_filesystem(role)
return role
@staticmethod
async def update_role_file(
db: AsyncSession,
role_id: int,
file_path: str,
content: str,
updated_by: Optional[str] = None
) -> Optional[Role]:
"""Обновление конкретного файла роли"""
role = await RoleService.get_role(db, role_id)
if not role:
return None
# Обновляем содержимое роли
role_content = role.content if isinstance(role.content, dict) else {}
role_content[file_path] = content
role.content = role_content
if updated_by:
role.updated_by = updated_by
role.updated_at = datetime.utcnow()
await db.commit()
await db.refresh(role)
# Экспорт обновленной роли в файловую систему
await RoleService.export_role_to_filesystem(role)
return role
@staticmethod
async def delete_role(db: AsyncSession, role_id: int) -> bool:
"""Удаление роли"""
role = await RoleService.get_role(db, role_id)
if not role:
return False
await db.delete(role)
await db.commit()
# Удаление из файловой системы
role_dir = RoleService().roles_dir_alembic / role.name
if role_dir.exists():
import shutil
shutil.rmtree(role_dir)
return True
@staticmethod
async def export_role_to_filesystem(role: Role) -> Path:
"""
Экспорт роли из БД в файловую систему
Args:
role: Роль из БД
Returns:
Путь к директории роли
"""
service = RoleService()
role_dir = service.roles_dir_alembic / role.name
role_dir.mkdir(exist_ok=True)
# Записываем все файлы роли
role_content = role.content if isinstance(role.content, dict) else {}
for file_path, content in role_content.items():
target_file = role_dir / file_path
target_file.parent.mkdir(parents=True, exist_ok=True)
target_file.write_text(content, encoding='utf-8')
return role_dir
@staticmethod
def _generate_role_content(role_name: str, template: str, variables: List[Dict]) -> Dict[str, str]:
"""Генерация содержимого роли в виде словаря {file_path: content}"""
content = {}
# Tasks
content["tasks/main.yml"] = RoleService._generate_tasks_content(role_name, template)
# Defaults
content["defaults/main.yml"] = RoleService._generate_defaults_content(role_name, variables)
# Handlers
content["handlers/main.yml"] = RoleService._generate_handlers_content(role_name)
# Meta
content["meta/main.yml"] = RoleService._generate_meta_content(role_name, "", [])
# README
content["README.md"] = RoleService._generate_readme_content(role_name, "")
return content
@staticmethod
def _generate_tasks_content(role_name: str, template: str) -> str:
"""Генерация содержимого tasks/main.yml"""
base_content = f"""---
# Задачи для роли {role_name}
# Автор: Сергей Антропов
# Сайт: https://devops.org.ru
- name: Пример задачи для роли {role_name}
debug:
msg: "Роль {role_name} выполнена"
"""
templates_content = {
"service": f"""---
# Задачи для роли {role_name} (сервис)
# Автор: Сергей Антропов
# Сайт: https://devops.org.ru
- name: Установка пакетов
package:
name: "{{{{ {role_name}_packages | default([]) }}}}"
state: present
when: {role_name}_enabled | default(true)
- name: Настройка конфигурации
template:
src: {role_name}.conf.j2
dest: /etc/{role_name}/{role_name}.conf
notify: restart {role_name}
- name: Запуск сервиса
systemd:
name: {role_name}
enabled: true
state: started
when: {role_name}_enabled | default(true)
""",
"package": f"""---
# Задачи для роли {role_name} (пакеты)
# Автор: Сергей Антропов
# Сайт: https://devops.org.ru
- name: Установка пакетов
package:
name: "{{{{ {role_name}_packages }}}}"
state: present
when: {role_name}_enabled | default(true)
""",
"config": f"""---
# Задачи для роли {role_name} (конфигурация)
# Автор: Сергей Антропов
# Сайт: https://devops.org.ru
- name: Создание директорий
file:
path: "{{{{ item }}}}"
state: directory
mode: '0755'
loop: "{{{{ {role_name}_directories | default([]) }}}}"
when: {role_name}_enabled | default(true)
- name: Копирование конфигурационных файлов
copy:
src: "{{{{ item.src }}}}"
dest: "{{{{ item.dest }}}}"
mode: '0644'
loop: "{{{{ {role_name}_config_files | default([]) }}}}"
when: {role_name}_enabled | default(true)
"""
}
return templates_content.get(template, base_content)
@staticmethod
def _generate_defaults_content(role_name: str, variables: List[Dict]) -> str:
"""Генерация содержимого defaults/main.yml"""
content = f"""---
# Переменные по умолчанию для роли {role_name}
# Автор: Сергей Антропов
# Сайт: https://devops.org.ru
{role_name}_enabled: true
"""
for var in variables:
var_name = var.get("name", "")
var_value = var.get("value", "")
var_type = var.get("type", "string")
if var_type == "bool":
content += f"{var_name}: {var_value.lower()}\n"
elif var_type == "int":
content += f"{var_name}: {var_value}\n"
elif var_type == "list":
content += f"{var_name}: []\n"
elif var_type == "dict":
content += f"{var_name}: {{}}\n"
else:
content += f"{var_name}: \"{var_value}\"\n"
return content
@staticmethod
def _generate_handlers_content(role_name: str) -> str:
"""Генерация содержимого handlers/main.yml"""
return f"""---
# Обработчики для роли {role_name}
# Автор: Сергей Антропов
# Сайт: https://devops.org.ru
- name: restart {role_name}
systemd:
name: {role_name}
state: restarted
when: ansible_facts['service_mgr'] == 'systemd'
"""
@staticmethod
def _generate_meta_content(role_name: str, description: str, platforms: List[str]) -> str:
"""Генерация содержимого meta/main.yml"""
platform_map = {
"ubuntu": {"name": "Ubuntu", "versions": ["focal", "jammy", "noble"]},
"debian": {"name": "Debian", "versions": ["bullseye", "bookworm"]},
"centos": {"name": "CentOS", "versions": ["8", "9"]},
"rhel": {"name": "RHEL", "versions": ["8", "9"]},
"almalinux": {"name": "AlmaLinux", "versions": ["8", "9"]},
"rocky": {"name": "Rocky", "versions": ["8", "9"]},
}
platforms_yaml = []
for platform in platforms:
if platform in platform_map:
platforms_yaml.append(platform_map[platform])
if not platforms_yaml:
platforms_yaml = [
{"name": "Ubuntu", "versions": ["focal", "jammy"]},
{"name": "Debian", "versions": ["bullseye", "bookworm"]},
]
return f"""---
galaxy_info:
author: Сергей Антропов
description: {description or f"Роль {role_name}"}
company: https://devops.org.ru
license: MIT
min_ansible_version: "2.9"
platforms:
{yaml.dump(platforms_yaml, default_flow_style=False, indent=4, allow_unicode=True)}
galaxy_tags:
- {role_name}
dependencies: []
"""
@staticmethod
def _generate_readme_content(role_name: str, description: str) -> str:
"""Генерация содержимого README.md"""
return f"""# Роль {role_name}
{description or f"Роль для настройки и конфигурации {role_name}."}
## Описание
{description or "Описание роли"}
## Требования
- Ansible >= 2.9
- Поддерживаемые ОС: Ubuntu, Debian, CentOS, RHEL, AlmaLinux, Rocky Linux
## Переменные
| Переменная | По умолчанию | Описание |
|------------|--------------|----------|
| `{role_name}_enabled` | `true` | Включить роль |
## Примеры использования
```yaml
- hosts: all
roles:
- {role_name}
```
## Автор
Сергей Антропов - https://devops.org.ru
"""
@staticmethod
def _format_platforms(platforms: List[str]) -> List[Dict]:
"""Форматирование списка платформ для Galaxy"""
platform_map = {
"ubuntu": {"name": "Ubuntu", "versions": ["focal", "jammy", "noble"]},
"debian": {"name": "Debian", "versions": ["bullseye", "bookworm"]},
"centos": {"name": "CentOS", "versions": ["8", "9"]},
"rhel": {"name": "RHEL", "versions": ["8", "9"]},
"almalinux": {"name": "AlmaLinux", "versions": ["8", "9"]},
"rocky": {"name": "Rocky", "versions": ["8", "9"]},
}
result = []
for platform in platforms:
if platform in platform_map:
result.append(platform_map[platform])
return result if result else [
{"name": "Ubuntu", "versions": ["focal", "jammy"]},
{"name": "Debian", "versions": ["bullseye", "bookworm"]},
]