""" Сервис для работы с Ansible Vault Автор: Сергей Антропов Сайт: https://devops.org.ru """ import subprocess from pathlib import Path from typing import Dict, Optional from app.core.config import settings import logging logger = logging.getLogger(__name__) class VaultService: """Сервис для работы с Ansible Vault""" def __init__(self): self.project_root = settings.PROJECT_ROOT self.vault_dir = self.project_root / "vault" self.vault_password_file = self.vault_dir / ".vault" def _get_vault_password(self) -> Optional[str]: """Получение пароля Vault из файла""" if self.vault_password_file.exists(): try: return self.vault_password_file.read_text().strip() except Exception as e: logger.error(f"Ошибка чтения пароля Vault: {e}") return None def encrypt_string(self, plaintext: str, vault_id: str = "default") -> Dict: """ Шифрование строки через Ansible Vault Args: plaintext: Текст для шифрования vault_id: ID vault (по умолчанию default) Returns: Результат шифрования """ password = self._get_vault_password() if not password: raise ValueError("Пароль Vault не найден. Инициализируйте Vault сначала.") try: # Создаем временный файл с паролем import tempfile with tempfile.NamedTemporaryFile(mode='w', delete=False) as f: f.write(password) temp_password_file = f.name try: # Шифруем строку result = subprocess.run( [ "ansible-vault", "encrypt_string", "--vault-password-file", temp_password_file, "--vault-id", vault_id, plaintext ], capture_output=True, text=True, timeout=10 ) if result.returncode == 0: encrypted = result.stdout.strip() return { "success": True, "encrypted": encrypted, "plaintext": plaintext } else: raise ValueError(f"Ошибка шифрования: {result.stderr}") finally: # Удаляем временный файл Path(temp_password_file).unlink() except Exception as e: logger.error(f"Ошибка при шифровании: {e}") raise ValueError(f"Ошибка шифрования: {str(e)}") def decrypt_string(self, encrypted: str) -> Dict: """ Расшифровка строки из Ansible Vault Args: encrypted: Зашифрованный текст Returns: Результат расшифровки """ password = self._get_vault_password() if not password: raise ValueError("Пароль Vault не найден") try: import tempfile with tempfile.NamedTemporaryFile(mode='w', delete=False) as f: f.write(password) temp_password_file = f.name try: # Создаем временный файл с зашифрованным текстом with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.yml') as f: f.write(encrypted) temp_vault_file = f.name try: # Расшифровываем result = subprocess.run( [ "ansible-vault", "decrypt", "--vault-password-file", temp_password_file, temp_vault_file ], capture_output=True, text=True, timeout=10 ) if result.returncode == 0: decrypted = Path(temp_vault_file).read_text().strip() # Удаляем маркеры Vault decrypted = decrypted.replace("!vault |", "").strip() return { "success": True, "decrypted": decrypted } else: raise ValueError(f"Ошибка расшифровки: {result.stderr}") finally: Path(temp_vault_file).unlink() finally: Path(temp_password_file).unlink() except Exception as e: logger.error(f"Ошибка при расшифровке: {e}") raise ValueError(f"Ошибка расшифровки: {str(e)}") def view_vault_file(self, file_path: Path) -> str: """ Просмотр содержимого Vault файла Args: file_path: Путь к Vault файлу Returns: Расшифрованное содержимое """ password = self._get_vault_password() if not password: raise ValueError("Пароль Vault не найден") try: import tempfile with tempfile.NamedTemporaryFile(mode='w', delete=False) as f: f.write(password) temp_password_file = f.name try: result = subprocess.run( [ "ansible-vault", "view", "--vault-password-file", temp_password_file, str(file_path) ], capture_output=True, text=True, timeout=10 ) if result.returncode == 0: return result.stdout else: raise ValueError(f"Ошибка просмотра файла: {result.stderr}") finally: Path(temp_password_file).unlink() except Exception as e: logger.error(f"Ошибка при просмотре файла: {e}") raise ValueError(f"Ошибка просмотра: {str(e)}") def edit_vault_file(self, file_path: Path, content: str) -> Dict: """ Редактирование Vault файла Args: file_path: Путь к Vault файлу content: Новое содержимое Returns: Результат редактирования """ password = self._get_vault_password() if not password: raise ValueError("Пароль Vault не найден") try: # Создаем временный файл с новым содержимым import tempfile with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.yml') as f: f.write(content) temp_content_file = f.name try: with tempfile.NamedTemporaryFile(mode='w', delete=False) as f: f.write(password) temp_password_file = f.name try: # Шифруем содержимое result = subprocess.run( [ "ansible-vault", "encrypt", "--vault-password-file", temp_password_file, temp_content_file ], capture_output=True, text=True, timeout=10 ) if result.returncode == 0: # Копируем зашифрованный файл encrypted_content = Path(temp_content_file).read_text() file_path.write_text(encrypted_content) return { "success": True, "message": "Файл успешно обновлен" } else: raise ValueError(f"Ошибка шифрования: {result.stderr}") finally: Path(temp_password_file).unlink() finally: Path(temp_content_file).unlink() except Exception as e: logger.error(f"Ошибка при редактировании файла: {e}") raise ValueError(f"Ошибка редактирования: {str(e)}") def is_vault_encrypted(self, content: str) -> bool: """Проверка, зашифрован ли контент через Vault""" return "$ANSIBLE_VAULT" in content or "!vault |" in content