- Добавлена колонка 'Тип' во все таблицы истории сборок - Для push операций отображается registry вместо платформ - Сохранение пользователя при создании push лога - Исправлена ошибка с logger в push_docker_image endpoint - Улучшено отображение истории сборок с визуальными индикаторами
437 lines
15 KiB
Python
437 lines
15 KiB
Python
"""
|
||
Celery задачи для фонового выполнения
|
||
Автор: Сергей Антропов
|
||
Сайт: https://devops.org.ru
|
||
"""
|
||
|
||
from celery import Celery
|
||
from app.core.config import settings
|
||
from typing import Optional
|
||
|
||
# Создание Celery приложения
|
||
celery_app = Celery(
|
||
"devopslab",
|
||
broker=settings.REDIS_URL,
|
||
backend=settings.REDIS_URL
|
||
)
|
||
|
||
# Конфигурация Celery
|
||
celery_app.conf.update(
|
||
task_serializer='json',
|
||
accept_content=['json'],
|
||
result_serializer='json',
|
||
timezone='UTC',
|
||
enable_utc=True,
|
||
task_track_started=True,
|
||
task_time_limit=3600, # 1 час максимум
|
||
task_soft_time_limit=3300, # 55 минут мягкий лимит
|
||
)
|
||
|
||
|
||
@celery_app.task(bind=True, name="devopslab.run_role_test")
|
||
def run_role_test(self, role_name: str, preset: str = "default"):
|
||
"""
|
||
Запуск теста роли в фоне
|
||
|
||
Args:
|
||
role_name: Имя роли
|
||
preset: Preset для тестирования
|
||
|
||
Returns:
|
||
Результат выполнения теста
|
||
"""
|
||
from app.core.make_executor import MakeExecutor
|
||
|
||
executor = MakeExecutor()
|
||
result = executor.execute(f"role test {preset}")
|
||
|
||
# Обновление статуса задачи
|
||
self.update_state(
|
||
state='SUCCESS' if result['success'] else 'FAILURE',
|
||
meta=result
|
||
)
|
||
|
||
return result
|
||
|
||
|
||
@celery_app.task(bind=True, name="devopslab.run_role_deploy")
|
||
def run_role_deploy(self, role_name: str, inventory: str, variables: dict = None):
|
||
"""
|
||
Запуск деплоя роли в фоне
|
||
|
||
Args:
|
||
role_name: Имя роли
|
||
inventory: Путь к inventory файлу
|
||
variables: Переменные для деплоя
|
||
|
||
Returns:
|
||
Результат выполнения деплоя
|
||
"""
|
||
from app.core.make_executor import MakeExecutor
|
||
|
||
executor = MakeExecutor()
|
||
# TODO: Реализовать деплой через make
|
||
result = executor.execute(f"role deploy")
|
||
|
||
self.update_state(
|
||
state='SUCCESS' if result['success'] else 'FAILURE',
|
||
meta=result
|
||
)
|
||
|
||
return result
|
||
|
||
|
||
@celery_app.task(bind=True, name="devopslab.build_docker_image")
|
||
def build_docker_image(self, image_name: str):
|
||
"""
|
||
Сборка Docker образа в фоне (старая версия через MakeExecutor)
|
||
|
||
Args:
|
||
image_name: Имя образа для сборки
|
||
|
||
Returns:
|
||
Результат сборки
|
||
"""
|
||
from app.core.make_executor import MakeExecutor
|
||
|
||
executor = MakeExecutor()
|
||
result = executor.execute(f"docker build-image IMAGE={image_name}")
|
||
|
||
self.update_state(
|
||
state='SUCCESS' if result['success'] else 'FAILURE',
|
||
meta=result
|
||
)
|
||
|
||
return result
|
||
|
||
|
||
@celery_app.task(bind=True, name="devopslab.build_dockerfile")
|
||
def build_dockerfile(
|
||
self,
|
||
build_log_id: int,
|
||
dockerfile_id: int = None, # ID Dockerfile в БД (приоритет)
|
||
dockerfile_content: str = None, # Для обратной совместимости
|
||
image_name: str = None,
|
||
tag: str = "latest",
|
||
platforms: list = None,
|
||
no_cache: bool = False
|
||
):
|
||
"""
|
||
Запуск сборки Dockerfile через Docker Builder API
|
||
|
||
Args:
|
||
build_log_id: ID записи лога в БД
|
||
dockerfile_content: Содержимое Dockerfile
|
||
image_name: Имя образа
|
||
tag: Тег образа
|
||
platforms: Список платформ для сборки
|
||
no_cache: Флаг сборки без кеша
|
||
|
||
Returns:
|
||
Результат сборки
|
||
"""
|
||
import asyncio
|
||
import tempfile
|
||
import os
|
||
import json
|
||
from datetime import datetime
|
||
from app.db.session import get_async_db
|
||
from app.models.database import DockerfileBuildLog
|
||
from app.services.docker_builder_service import DockerBuilderService
|
||
from app.core.config import settings
|
||
from pathlib import Path
|
||
|
||
if platforms is None:
|
||
platforms = ["linux/amd64"]
|
||
|
||
# Инициализируем builder service
|
||
builder_service = DockerBuilderService(base_url=settings.DOCKER_BUILDER_URL)
|
||
|
||
# Формируем webhook URL для получения логов
|
||
# В production это должен быть реальный URL приложения
|
||
webhook_url = f"{settings.API_HOST.replace('0.0.0.0', 'web')}:{settings.API_PORT}/api/v1/dockerfiles/build-logs/webhook"
|
||
# Для локальной разработки используем внутренний URL
|
||
if "localhost" in str(settings.API_HOST) or "127.0.0.1" in str(settings.API_HOST):
|
||
webhook_url = f"http://web:8000/api/v1/dockerfiles/build-logs/webhook"
|
||
else:
|
||
webhook_url = f"http://web:8000/api/v1/dockerfiles/build-logs/webhook"
|
||
|
||
# Запускаем сборку через Builder API
|
||
# Передаем dockerfile_id - builder сам получит содержимое из БД
|
||
async def start_build():
|
||
try:
|
||
result = await builder_service.start_build(
|
||
build_log_id=build_log_id,
|
||
dockerfile_id=dockerfile_id, # Передаем ID - builder получит содержимое из БД
|
||
dockerfile_content=dockerfile_content, # Для обратной совместимости
|
||
image_name=image_name,
|
||
tag=tag,
|
||
platforms=platforms,
|
||
no_cache=no_cache,
|
||
webhook_url=webhook_url
|
||
)
|
||
return result
|
||
except Exception as e:
|
||
import logging
|
||
logging.error(f"Error starting build via Builder API: {e}", exc_info=True)
|
||
# Обновляем статус в БД через async функцию
|
||
async def update_db_error():
|
||
async for db in get_async_db():
|
||
from sqlalchemy import update
|
||
build_log = await db.get(DockerfileBuildLog, build_log_id)
|
||
current_logs = (build_log.logs or "") if build_log else ""
|
||
await db.execute(
|
||
update(DockerfileBuildLog)
|
||
.where(DockerfileBuildLog.id == build_log_id)
|
||
.values(
|
||
status="failed",
|
||
finished_at=datetime.utcnow(),
|
||
returncode=-1,
|
||
logs=current_logs + f"\n❌ Ошибка запуска сборки: {e}"
|
||
)
|
||
)
|
||
await db.commit()
|
||
break
|
||
|
||
# Вызываем async функцию через event loop
|
||
try:
|
||
loop = asyncio.get_event_loop()
|
||
if loop.is_closed():
|
||
loop = asyncio.new_event_loop()
|
||
asyncio.set_event_loop(loop)
|
||
except RuntimeError:
|
||
loop = asyncio.new_event_loop()
|
||
asyncio.set_event_loop(loop)
|
||
|
||
try:
|
||
loop.run_until_complete(update_db_error())
|
||
except Exception as db_error:
|
||
logging.error(f"Failed to update build log in DB: {db_error}", exc_info=True)
|
||
|
||
raise
|
||
|
||
# Запускаем сборку
|
||
try:
|
||
# Создаем новый event loop для Celery worker
|
||
try:
|
||
loop = asyncio.get_event_loop()
|
||
if loop.is_closed():
|
||
loop = asyncio.new_event_loop()
|
||
asyncio.set_event_loop(loop)
|
||
except RuntimeError:
|
||
# Если нет event loop, создаем новый
|
||
loop = asyncio.new_event_loop()
|
||
asyncio.set_event_loop(loop)
|
||
|
||
import logging
|
||
logging.info(f"Starting build for build_log_id={build_log_id}, image={image_name}:{tag}")
|
||
|
||
result = loop.run_until_complete(start_build())
|
||
|
||
logging.info(f"Build started successfully, result: {result}")
|
||
|
||
return {
|
||
"success": True,
|
||
"build_id": result.get("build_id") if result else None,
|
||
"image_name": image_name,
|
||
"tag": tag,
|
||
"full_image_name": f"{image_name}:{tag}"
|
||
}
|
||
except Exception as e:
|
||
import logging
|
||
error_msg = f"Ошибка при запуске сборки: {str(e)}"
|
||
logging.error(error_msg, exc_info=True)
|
||
|
||
# Обновляем статус в БД через async функцию
|
||
async def update_db_error():
|
||
async for db in get_async_db():
|
||
from sqlalchemy import update
|
||
build_log = await db.get(DockerfileBuildLog, build_log_id)
|
||
current_logs = (build_log.logs or "") if build_log else ""
|
||
await db.execute(
|
||
update(DockerfileBuildLog)
|
||
.where(DockerfileBuildLog.id == build_log_id)
|
||
.values(
|
||
status="failed",
|
||
finished_at=datetime.utcnow(),
|
||
returncode=-1,
|
||
logs=current_logs + f"\n❌ {error_msg}"
|
||
)
|
||
)
|
||
await db.commit()
|
||
break
|
||
|
||
# Вызываем async функцию через event loop
|
||
try:
|
||
try:
|
||
loop = asyncio.get_event_loop()
|
||
if loop.is_closed():
|
||
loop = asyncio.new_event_loop()
|
||
asyncio.set_event_loop(loop)
|
||
except RuntimeError:
|
||
loop = asyncio.new_event_loop()
|
||
asyncio.set_event_loop(loop)
|
||
|
||
loop.run_until_complete(update_db_error())
|
||
except Exception as db_error:
|
||
logging.error(f"Failed to update build log in DB: {db_error}", exc_info=True)
|
||
|
||
# Возвращаем ошибку, но не выбрасываем исключение, чтобы Celery не пометил задачу как failed
|
||
return {
|
||
"success": False,
|
||
"error": error_msg,
|
||
"build_log_id": build_log_id
|
||
}
|
||
|
||
|
||
@celery_app.task(bind=True, name="devopslab.run_playbook_test")
|
||
def run_playbook_test(self, playbook_id: int, preset: str = "default", test_run_id: int = None):
|
||
"""
|
||
Запуск тестирования playbook в фоне
|
||
|
||
Args:
|
||
playbook_id: ID playbook
|
||
preset: Preset для тестирования
|
||
test_run_id: ID записи о тесте в БД
|
||
|
||
Returns:
|
||
Результат выполнения теста
|
||
"""
|
||
from app.core.make_executor import MakeExecutor
|
||
from app.db.session import get_async_db
|
||
from app.services.playbook_service import PlaybookService
|
||
from app.core.config import settings
|
||
import asyncio
|
||
import tempfile
|
||
import yaml
|
||
|
||
executor = MakeExecutor()
|
||
|
||
# Получаем playbook из БД
|
||
async def get_playbook():
|
||
async for db in get_async_db():
|
||
playbook = await PlaybookService.get_playbook(db, playbook_id)
|
||
return playbook
|
||
|
||
playbook = asyncio.run(get_playbook())
|
||
if not playbook:
|
||
return {"success": False, "error": "Playbook не найден"}
|
||
|
||
# Создаем временный файл playbook
|
||
with tempfile.NamedTemporaryFile(mode='w', suffix='.yml', delete=False) as f:
|
||
f.write(playbook.content)
|
||
temp_playbook_path = f.name
|
||
|
||
try:
|
||
# Запускаем тест через molecule с временным playbook
|
||
# TODO: Адаптировать команду make для работы с playbook
|
||
result = executor.execute(f"role test {preset}")
|
||
|
||
# Обновляем статус теста в БД
|
||
async def update_test_run():
|
||
async for db in get_async_db():
|
||
await PlaybookService.save_test_run(
|
||
db=db,
|
||
playbook_id=playbook_id,
|
||
preset_name=preset,
|
||
status="success" if result.get('success') else "failed",
|
||
output=result.get('output'),
|
||
error=result.get('error'),
|
||
returncode=result.get('returncode')
|
||
)
|
||
|
||
if test_run_id:
|
||
asyncio.run(update_test_run())
|
||
|
||
self.update_state(
|
||
state='SUCCESS' if result.get('success') else 'FAILURE',
|
||
meta=result
|
||
)
|
||
|
||
return result
|
||
finally:
|
||
import os
|
||
if os.path.exists(temp_playbook_path):
|
||
os.unlink(temp_playbook_path)
|
||
|
||
|
||
@celery_app.task(bind=True, name="devopslab.run_playbook_deploy")
|
||
def run_playbook_deploy(
|
||
self,
|
||
playbook_id: int,
|
||
inventory: str,
|
||
limit: Optional[str] = None,
|
||
tags: Optional[str] = None,
|
||
check: bool = False,
|
||
deployment_id: int = None
|
||
):
|
||
"""
|
||
Запуск деплоя playbook в фоне
|
||
|
||
Args:
|
||
playbook_id: ID playbook
|
||
inventory: Путь к inventory файлу или содержимое
|
||
limit: Лимит хостов
|
||
tags: Теги для выполнения
|
||
check: Dry-run режим
|
||
deployment_id: ID записи о деплое в БД
|
||
|
||
Returns:
|
||
Результат выполнения деплоя
|
||
"""
|
||
from app.core.make_executor import MakeExecutor
|
||
from app.db.session import get_async_db
|
||
from app.services.playbook_service import PlaybookService
|
||
from app.core.config import settings
|
||
import asyncio
|
||
import tempfile
|
||
import os
|
||
|
||
executor = MakeExecutor()
|
||
|
||
# Получаем playbook из БД
|
||
async def get_playbook():
|
||
async for db in get_async_db():
|
||
playbook = await PlaybookService.get_playbook(db, playbook_id)
|
||
return playbook
|
||
|
||
playbook = asyncio.run(get_playbook())
|
||
if not playbook:
|
||
return {"success": False, "error": "Playbook не найден"}
|
||
|
||
# Создаем временный файл playbook
|
||
with tempfile.NamedTemporaryFile(mode='w', suffix='.yml', delete=False) as f:
|
||
f.write(playbook.content)
|
||
temp_playbook_path = f.name
|
||
|
||
try:
|
||
# Запускаем деплой через make
|
||
# TODO: Адаптировать команду make для работы с playbook
|
||
result = executor.execute(f"role deploy")
|
||
|
||
# Обновляем статус деплоя в БД
|
||
async def update_deployment():
|
||
async for db in get_async_db():
|
||
await PlaybookService.save_deployment(
|
||
db=db,
|
||
playbook_id=playbook_id,
|
||
inventory=inventory,
|
||
status="success" if result.get('success') else "failed",
|
||
output=result.get('output'),
|
||
error=result.get('error'),
|
||
returncode=result.get('returncode')
|
||
)
|
||
|
||
if deployment_id:
|
||
asyncio.run(update_deployment())
|
||
|
||
self.update_state(
|
||
state='SUCCESS' if result.get('success') else 'FAILURE',
|
||
meta=result
|
||
)
|
||
|
||
return result
|
||
finally:
|
||
if os.path.exists(temp_playbook_path):
|
||
os.unlink(temp_playbook_path)
|