Files
DevOpsLab/app/tasks/celery_tasks.py
Сергей Антропов 1fbf9185a2 feat: добавлена пометка типа операции (Build/Push) в истории сборок Dockerfile
- Добавлена колонка 'Тип' во все таблицы истории сборок
- Для push операций отображается registry вместо платформ
- Сохранение пользователя при создании push лога
- Исправлена ошибка с logger в push_docker_image endpoint
- Улучшено отображение истории сборок с визуальными индикаторами
2026-02-15 22:59:02 +03:00

437 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Celery задачи для фонового выполнения
Автор: Сергей Антропов
Сайт: https://devops.org.ru
"""
from celery import Celery
from app.core.config import settings
from typing import Optional
# Создание Celery приложения
celery_app = Celery(
"devopslab",
broker=settings.REDIS_URL,
backend=settings.REDIS_URL
)
# Конфигурация Celery
celery_app.conf.update(
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='UTC',
enable_utc=True,
task_track_started=True,
task_time_limit=3600, # 1 час максимум
task_soft_time_limit=3300, # 55 минут мягкий лимит
)
@celery_app.task(bind=True, name="devopslab.run_role_test")
def run_role_test(self, role_name: str, preset: str = "default"):
"""
Запуск теста роли в фоне
Args:
role_name: Имя роли
preset: Preset для тестирования
Returns:
Результат выполнения теста
"""
from app.core.make_executor import MakeExecutor
executor = MakeExecutor()
result = executor.execute(f"role test {preset}")
# Обновление статуса задачи
self.update_state(
state='SUCCESS' if result['success'] else 'FAILURE',
meta=result
)
return result
@celery_app.task(bind=True, name="devopslab.run_role_deploy")
def run_role_deploy(self, role_name: str, inventory: str, variables: dict = None):
"""
Запуск деплоя роли в фоне
Args:
role_name: Имя роли
inventory: Путь к inventory файлу
variables: Переменные для деплоя
Returns:
Результат выполнения деплоя
"""
from app.core.make_executor import MakeExecutor
executor = MakeExecutor()
# TODO: Реализовать деплой через make
result = executor.execute(f"role deploy")
self.update_state(
state='SUCCESS' if result['success'] else 'FAILURE',
meta=result
)
return result
@celery_app.task(bind=True, name="devopslab.build_docker_image")
def build_docker_image(self, image_name: str):
"""
Сборка Docker образа в фоне (старая версия через MakeExecutor)
Args:
image_name: Имя образа для сборки
Returns:
Результат сборки
"""
from app.core.make_executor import MakeExecutor
executor = MakeExecutor()
result = executor.execute(f"docker build-image IMAGE={image_name}")
self.update_state(
state='SUCCESS' if result['success'] else 'FAILURE',
meta=result
)
return result
@celery_app.task(bind=True, name="devopslab.build_dockerfile")
def build_dockerfile(
self,
build_log_id: int,
dockerfile_id: int = None, # ID Dockerfile в БД (приоритет)
dockerfile_content: str = None, # Для обратной совместимости
image_name: str = None,
tag: str = "latest",
platforms: list = None,
no_cache: bool = False
):
"""
Запуск сборки Dockerfile через Docker Builder API
Args:
build_log_id: ID записи лога в БД
dockerfile_content: Содержимое Dockerfile
image_name: Имя образа
tag: Тег образа
platforms: Список платформ для сборки
no_cache: Флаг сборки без кеша
Returns:
Результат сборки
"""
import asyncio
import tempfile
import os
import json
from datetime import datetime
from app.db.session import get_async_db
from app.models.database import DockerfileBuildLog
from app.services.docker_builder_service import DockerBuilderService
from app.core.config import settings
from pathlib import Path
if platforms is None:
platforms = ["linux/amd64"]
# Инициализируем builder service
builder_service = DockerBuilderService(base_url=settings.DOCKER_BUILDER_URL)
# Формируем webhook URL для получения логов
# В production это должен быть реальный URL приложения
webhook_url = f"{settings.API_HOST.replace('0.0.0.0', 'web')}:{settings.API_PORT}/api/v1/dockerfiles/build-logs/webhook"
# Для локальной разработки используем внутренний URL
if "localhost" in str(settings.API_HOST) or "127.0.0.1" in str(settings.API_HOST):
webhook_url = f"http://web:8000/api/v1/dockerfiles/build-logs/webhook"
else:
webhook_url = f"http://web:8000/api/v1/dockerfiles/build-logs/webhook"
# Запускаем сборку через Builder API
# Передаем dockerfile_id - builder сам получит содержимое из БД
async def start_build():
try:
result = await builder_service.start_build(
build_log_id=build_log_id,
dockerfile_id=dockerfile_id, # Передаем ID - builder получит содержимое из БД
dockerfile_content=dockerfile_content, # Для обратной совместимости
image_name=image_name,
tag=tag,
platforms=platforms,
no_cache=no_cache,
webhook_url=webhook_url
)
return result
except Exception as e:
import logging
logging.error(f"Error starting build via Builder API: {e}", exc_info=True)
# Обновляем статус в БД через async функцию
async def update_db_error():
async for db in get_async_db():
from sqlalchemy import update
build_log = await db.get(DockerfileBuildLog, build_log_id)
current_logs = (build_log.logs or "") if build_log else ""
await db.execute(
update(DockerfileBuildLog)
.where(DockerfileBuildLog.id == build_log_id)
.values(
status="failed",
finished_at=datetime.utcnow(),
returncode=-1,
logs=current_logs + f"\n❌ Ошибка запуска сборки: {e}"
)
)
await db.commit()
break
# Вызываем async функцию через event loop
try:
loop = asyncio.get_event_loop()
if loop.is_closed():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(update_db_error())
except Exception as db_error:
logging.error(f"Failed to update build log in DB: {db_error}", exc_info=True)
raise
# Запускаем сборку
try:
# Создаем новый event loop для Celery worker
try:
loop = asyncio.get_event_loop()
if loop.is_closed():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
except RuntimeError:
# Если нет event loop, создаем новый
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
import logging
logging.info(f"Starting build for build_log_id={build_log_id}, image={image_name}:{tag}")
result = loop.run_until_complete(start_build())
logging.info(f"Build started successfully, result: {result}")
return {
"success": True,
"build_id": result.get("build_id") if result else None,
"image_name": image_name,
"tag": tag,
"full_image_name": f"{image_name}:{tag}"
}
except Exception as e:
import logging
error_msg = f"Ошибка при запуске сборки: {str(e)}"
logging.error(error_msg, exc_info=True)
# Обновляем статус в БД через async функцию
async def update_db_error():
async for db in get_async_db():
from sqlalchemy import update
build_log = await db.get(DockerfileBuildLog, build_log_id)
current_logs = (build_log.logs or "") if build_log else ""
await db.execute(
update(DockerfileBuildLog)
.where(DockerfileBuildLog.id == build_log_id)
.values(
status="failed",
finished_at=datetime.utcnow(),
returncode=-1,
logs=current_logs + f"\n{error_msg}"
)
)
await db.commit()
break
# Вызываем async функцию через event loop
try:
try:
loop = asyncio.get_event_loop()
if loop.is_closed():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(update_db_error())
except Exception as db_error:
logging.error(f"Failed to update build log in DB: {db_error}", exc_info=True)
# Возвращаем ошибку, но не выбрасываем исключение, чтобы Celery не пометил задачу как failed
return {
"success": False,
"error": error_msg,
"build_log_id": build_log_id
}
@celery_app.task(bind=True, name="devopslab.run_playbook_test")
def run_playbook_test(self, playbook_id: int, preset: str = "default", test_run_id: int = None):
"""
Запуск тестирования playbook в фоне
Args:
playbook_id: ID playbook
preset: Preset для тестирования
test_run_id: ID записи о тесте в БД
Returns:
Результат выполнения теста
"""
from app.core.make_executor import MakeExecutor
from app.db.session import get_async_db
from app.services.playbook_service import PlaybookService
from app.core.config import settings
import asyncio
import tempfile
import yaml
executor = MakeExecutor()
# Получаем playbook из БД
async def get_playbook():
async for db in get_async_db():
playbook = await PlaybookService.get_playbook(db, playbook_id)
return playbook
playbook = asyncio.run(get_playbook())
if not playbook:
return {"success": False, "error": "Playbook не найден"}
# Создаем временный файл playbook
with tempfile.NamedTemporaryFile(mode='w', suffix='.yml', delete=False) as f:
f.write(playbook.content)
temp_playbook_path = f.name
try:
# Запускаем тест через molecule с временным playbook
# TODO: Адаптировать команду make для работы с playbook
result = executor.execute(f"role test {preset}")
# Обновляем статус теста в БД
async def update_test_run():
async for db in get_async_db():
await PlaybookService.save_test_run(
db=db,
playbook_id=playbook_id,
preset_name=preset,
status="success" if result.get('success') else "failed",
output=result.get('output'),
error=result.get('error'),
returncode=result.get('returncode')
)
if test_run_id:
asyncio.run(update_test_run())
self.update_state(
state='SUCCESS' if result.get('success') else 'FAILURE',
meta=result
)
return result
finally:
import os
if os.path.exists(temp_playbook_path):
os.unlink(temp_playbook_path)
@celery_app.task(bind=True, name="devopslab.run_playbook_deploy")
def run_playbook_deploy(
self,
playbook_id: int,
inventory: str,
limit: Optional[str] = None,
tags: Optional[str] = None,
check: bool = False,
deployment_id: int = None
):
"""
Запуск деплоя playbook в фоне
Args:
playbook_id: ID playbook
inventory: Путь к inventory файлу или содержимое
limit: Лимит хостов
tags: Теги для выполнения
check: Dry-run режим
deployment_id: ID записи о деплое в БД
Returns:
Результат выполнения деплоя
"""
from app.core.make_executor import MakeExecutor
from app.db.session import get_async_db
from app.services.playbook_service import PlaybookService
from app.core.config import settings
import asyncio
import tempfile
import os
executor = MakeExecutor()
# Получаем playbook из БД
async def get_playbook():
async for db in get_async_db():
playbook = await PlaybookService.get_playbook(db, playbook_id)
return playbook
playbook = asyncio.run(get_playbook())
if not playbook:
return {"success": False, "error": "Playbook не найден"}
# Создаем временный файл playbook
with tempfile.NamedTemporaryFile(mode='w', suffix='.yml', delete=False) as f:
f.write(playbook.content)
temp_playbook_path = f.name
try:
# Запускаем деплой через make
# TODO: Адаптировать команду make для работы с playbook
result = executor.execute(f"role deploy")
# Обновляем статус деплоя в БД
async def update_deployment():
async for db in get_async_db():
await PlaybookService.save_deployment(
db=db,
playbook_id=playbook_id,
inventory=inventory,
status="success" if result.get('success') else "failed",
output=result.get('output'),
error=result.get('error'),
returncode=result.get('returncode')
)
if deployment_id:
asyncio.run(update_deployment())
self.update_state(
state='SUCCESS' if result.get('success') else 'FAILURE',
meta=result
)
return result
finally:
if os.path.exists(temp_playbook_path):
os.unlink(temp_playbook_path)