""" Сервис для проверки синтаксиса ролей (ansible-lint) Автор: Сергей Антропов Сайт: https://devops.org.ru """ import asyncio import subprocess from pathlib import Path from typing import Optional, AsyncGenerator, Dict, List from datetime import datetime from app.core.config import settings class LintService: """Проверка синтаксиса ролей через ansible-lint""" def __init__(self): self.project_root = settings.PROJECT_ROOT self.roles_dir = self.project_root / "roles" self.lint_config = self.project_root / ".ansible-lint" async def lint_role( self, role_name: Optional[str] = None, stream: bool = False ) -> AsyncGenerator[str, None]: """ Проверка синтаксиса роли через ansible-lint Args: role_name: Имя роли (опционально, если None - проверяются все роли) stream: Если True, возвращает генератор строк Yields: Строки вывода команды """ # Расшифровка vault файлов перед линтингом yield "🔓 Расшифровка vault файлов...\n" await self._decrypt_vault_files() if role_name: role_path = self.roles_dir / role_name if not role_path.exists(): yield f"❌ Роль '{role_name}' не найдена\n" return yield f"🔍 Проверка синтаксиса роли: {role_name}\n" cmd = [ "ansible-lint", str(role_path), "--config-file", str(self.lint_config) ] else: yield "🔍 Проверка синтаксиса всех ролей...\n" cmd = [ "ansible-lint", str(self.roles_dir), "--config-file", str(self.lint_config) ] # Запуск в Docker контейнере для изоляции docker_cmd = [ "docker", "run", "--rm", "--name", f"ansible-lint-{datetime.now().strftime('%Y%m%d%H%M%S')}", "-v", f"{self.project_root}:/workspace", "-w", "/workspace", "-e", "ANSIBLE_FORCE_COLOR=1", "inecs/ansible-lab:ansible-controller-latest", "bash", "-c", " ".join(cmd) + " || true" ] process = await asyncio.create_subprocess_exec( *docker_cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, cwd=str(self.project_root) ) async for line in process.stdout: yield line.decode('utf-8', errors='replace') await process.wait() # Шифрование vault файлов после линтинга yield "\n🔒 Шифрование vault файлов...\n" await self._encrypt_vault_files() if process.returncode == 0: yield "\n✅ Линтинг завершен успешно\n" else: yield f"\n⚠️ Линтинг завершен с предупреждениями (код: {process.returncode})\n" async def _decrypt_vault_files(self) -> bool: """Расшифровка vault файлов""" vault_dir = self.project_root / "vault" if not vault_dir.exists(): return True vault_password_file = vault_dir / ".vault" if not vault_password_file.exists(): return True vault_files = list(vault_dir.glob("*.yml")) if not vault_files: return True try: for vault_file in vault_files: with open(vault_file, 'rb') as f: content = f.read(100) if b'$ANSIBLE_VAULT' not in content: continue cmd = [ "ansible-vault", "decrypt", str(vault_file), "--vault-password-file", str(vault_password_file) ] result = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=str(self.project_root) ) await result.wait() return True except Exception: return False async def _encrypt_vault_files(self) -> bool: """Шифрование vault файлов""" vault_dir = self.project_root / "vault" if not vault_dir.exists(): return True vault_password_file = vault_dir / ".vault" if not vault_password_file.exists(): return True vault_files = list(vault_dir.glob("*.yml")) if not vault_files: return True try: for vault_file in vault_files: with open(vault_file, 'rb') as f: content = f.read(100) if b'$ANSIBLE_VAULT' in content: continue cmd = [ "ansible-vault", "encrypt", str(vault_file), "--vault-password-file", str(vault_password_file) ] result = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=str(self.project_root) ) await result.wait() return True except Exception: return False def detect_log_level(self, line: str) -> str: """Определение уровня лога из строки""" line_lower = line.lower() if any(word in line_lower for word in ["error", "failed", "fatal"]): return "error" elif any(word in line_lower for word in ["warning", "warn"]): return "warning" elif any(word in line_lower for word in ["passed", "ok"]): return "info" else: return "debug"