Files
DevOpsLab/app/services/vault_service.py
Сергей Антропов 1fbf9185a2 feat: добавлена пометка типа операции (Build/Push) в истории сборок Dockerfile
- Добавлена колонка 'Тип' во все таблицы истории сборок
- Для push операций отображается registry вместо платформ
- Сохранение пользователя при создании push лога
- Исправлена ошибка с logger в push_docker_image endpoint
- Улучшено отображение истории сборок с визуальными индикаторами
2026-02-15 22:59:02 +03:00

249 lines
9.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Сервис для работы с Ansible Vault
Автор: Сергей Антропов
Сайт: https://devops.org.ru
"""
import subprocess
from pathlib import Path
from typing import Dict, Optional
from app.core.config import settings
import logging
logger = logging.getLogger(__name__)
class VaultService:
"""Сервис для работы с Ansible Vault"""
def __init__(self):
self.project_root = settings.PROJECT_ROOT
self.vault_dir = self.project_root / "vault"
self.vault_password_file = self.vault_dir / ".vault"
def _get_vault_password(self) -> Optional[str]:
"""Получение пароля Vault из файла"""
if self.vault_password_file.exists():
try:
return self.vault_password_file.read_text().strip()
except Exception as e:
logger.error(f"Ошибка чтения пароля Vault: {e}")
return None
def encrypt_string(self, plaintext: str, vault_id: str = "default") -> Dict:
"""
Шифрование строки через Ansible Vault
Args:
plaintext: Текст для шифрования
vault_id: ID vault (по умолчанию default)
Returns:
Результат шифрования
"""
password = self._get_vault_password()
if not password:
raise ValueError("Пароль Vault не найден. Инициализируйте Vault сначала.")
try:
# Создаем временный файл с паролем
import tempfile
with tempfile.NamedTemporaryFile(mode='w', delete=False) as f:
f.write(password)
temp_password_file = f.name
try:
# Шифруем строку
result = subprocess.run(
[
"ansible-vault",
"encrypt_string",
"--vault-password-file", temp_password_file,
"--vault-id", vault_id,
plaintext
],
capture_output=True,
text=True,
timeout=10
)
if result.returncode == 0:
encrypted = result.stdout.strip()
return {
"success": True,
"encrypted": encrypted,
"plaintext": plaintext
}
else:
raise ValueError(f"Ошибка шифрования: {result.stderr}")
finally:
# Удаляем временный файл
Path(temp_password_file).unlink()
except Exception as e:
logger.error(f"Ошибка при шифровании: {e}")
raise ValueError(f"Ошибка шифрования: {str(e)}")
def decrypt_string(self, encrypted: str) -> Dict:
"""
Расшифровка строки из Ansible Vault
Args:
encrypted: Зашифрованный текст
Returns:
Результат расшифровки
"""
password = self._get_vault_password()
if not password:
raise ValueError("Пароль Vault не найден")
try:
import tempfile
with tempfile.NamedTemporaryFile(mode='w', delete=False) as f:
f.write(password)
temp_password_file = f.name
try:
# Создаем временный файл с зашифрованным текстом
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.yml') as f:
f.write(encrypted)
temp_vault_file = f.name
try:
# Расшифровываем
result = subprocess.run(
[
"ansible-vault",
"decrypt",
"--vault-password-file", temp_password_file,
temp_vault_file
],
capture_output=True,
text=True,
timeout=10
)
if result.returncode == 0:
decrypted = Path(temp_vault_file).read_text().strip()
# Удаляем маркеры Vault
decrypted = decrypted.replace("!vault |", "").strip()
return {
"success": True,
"decrypted": decrypted
}
else:
raise ValueError(f"Ошибка расшифровки: {result.stderr}")
finally:
Path(temp_vault_file).unlink()
finally:
Path(temp_password_file).unlink()
except Exception as e:
logger.error(f"Ошибка при расшифровке: {e}")
raise ValueError(f"Ошибка расшифровки: {str(e)}")
def view_vault_file(self, file_path: Path) -> str:
"""
Просмотр содержимого Vault файла
Args:
file_path: Путь к Vault файлу
Returns:
Расшифрованное содержимое
"""
password = self._get_vault_password()
if not password:
raise ValueError("Пароль Vault не найден")
try:
import tempfile
with tempfile.NamedTemporaryFile(mode='w', delete=False) as f:
f.write(password)
temp_password_file = f.name
try:
result = subprocess.run(
[
"ansible-vault",
"view",
"--vault-password-file", temp_password_file,
str(file_path)
],
capture_output=True,
text=True,
timeout=10
)
if result.returncode == 0:
return result.stdout
else:
raise ValueError(f"Ошибка просмотра файла: {result.stderr}")
finally:
Path(temp_password_file).unlink()
except Exception as e:
logger.error(f"Ошибка при просмотре файла: {e}")
raise ValueError(f"Ошибка просмотра: {str(e)}")
def edit_vault_file(self, file_path: Path, content: str) -> Dict:
"""
Редактирование Vault файла
Args:
file_path: Путь к Vault файлу
content: Новое содержимое
Returns:
Результат редактирования
"""
password = self._get_vault_password()
if not password:
raise ValueError("Пароль Vault не найден")
try:
# Создаем временный файл с новым содержимым
import tempfile
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.yml') as f:
f.write(content)
temp_content_file = f.name
try:
with tempfile.NamedTemporaryFile(mode='w', delete=False) as f:
f.write(password)
temp_password_file = f.name
try:
# Шифруем содержимое
result = subprocess.run(
[
"ansible-vault",
"encrypt",
"--vault-password-file", temp_password_file,
temp_content_file
],
capture_output=True,
text=True,
timeout=10
)
if result.returncode == 0:
# Копируем зашифрованный файл
encrypted_content = Path(temp_content_file).read_text()
file_path.write_text(encrypted_content)
return {
"success": True,
"message": "Файл успешно обновлен"
}
else:
raise ValueError(f"Ошибка шифрования: {result.stderr}")
finally:
Path(temp_password_file).unlink()
finally:
Path(temp_content_file).unlink()
except Exception as e:
logger.error(f"Ошибка при редактировании файла: {e}")
raise ValueError(f"Ошибка редактирования: {str(e)}")
def is_vault_encrypted(self, content: str) -> bool:
"""Проверка, зашифрован ли контент через Vault"""
return "$ANSIBLE_VAULT" in content or "!vault |" in content