""" Celery задачи для фонового выполнения Автор: Сергей Антропов Сайт: https://devops.org.ru """ from celery import Celery from app.core.config import settings from typing import Optional # Создание Celery приложения celery_app = Celery( "devopslab", broker=settings.REDIS_URL, backend=settings.REDIS_URL ) # Конфигурация Celery celery_app.conf.update( task_serializer='json', accept_content=['json'], result_serializer='json', timezone='UTC', enable_utc=True, task_track_started=True, task_time_limit=3600, # 1 час максимум task_soft_time_limit=3300, # 55 минут мягкий лимит ) @celery_app.task(bind=True, name="devopslab.run_role_test") def run_role_test(self, role_name: str, preset: str = "default"): """ Запуск теста роли в фоне Args: role_name: Имя роли preset: Preset для тестирования Returns: Результат выполнения теста """ from app.core.make_executor import MakeExecutor executor = MakeExecutor() result = executor.execute(f"role test {preset}") # Обновление статуса задачи self.update_state( state='SUCCESS' if result['success'] else 'FAILURE', meta=result ) return result @celery_app.task(bind=True, name="devopslab.run_role_deploy") def run_role_deploy(self, role_name: str, inventory: str, variables: dict = None): """ Запуск деплоя роли в фоне Args: role_name: Имя роли inventory: Путь к inventory файлу variables: Переменные для деплоя Returns: Результат выполнения деплоя """ from app.core.make_executor import MakeExecutor executor = MakeExecutor() # TODO: Реализовать деплой через make result = executor.execute(f"role deploy") self.update_state( state='SUCCESS' if result['success'] else 'FAILURE', meta=result ) return result @celery_app.task(bind=True, name="devopslab.build_docker_image") def build_docker_image(self, image_name: str): """ Сборка Docker образа в фоне (старая версия через MakeExecutor) Args: image_name: Имя образа для сборки Returns: Результат сборки """ from app.core.make_executor import MakeExecutor executor = MakeExecutor() result = executor.execute(f"docker build-image IMAGE={image_name}") self.update_state( state='SUCCESS' if result['success'] else 'FAILURE', meta=result ) return result @celery_app.task(bind=True, name="devopslab.build_dockerfile") def build_dockerfile( self, build_log_id: int, dockerfile_id: int = None, # ID Dockerfile в БД (приоритет) dockerfile_content: str = None, # Для обратной совместимости image_name: str = None, tag: str = "latest", platforms: list = None, no_cache: bool = False ): """ Запуск сборки Dockerfile через Docker Builder API Args: build_log_id: ID записи лога в БД dockerfile_content: Содержимое Dockerfile image_name: Имя образа tag: Тег образа platforms: Список платформ для сборки no_cache: Флаг сборки без кеша Returns: Результат сборки """ import asyncio import tempfile import os import json from datetime import datetime from app.db.session import get_async_db from app.models.database import DockerfileBuildLog from app.services.docker_builder_service import DockerBuilderService from app.core.config import settings from pathlib import Path if platforms is None: platforms = ["linux/amd64"] # Инициализируем builder service builder_service = DockerBuilderService(base_url=settings.DOCKER_BUILDER_URL) # Формируем webhook URL для получения логов # В production это должен быть реальный URL приложения webhook_url = f"{settings.API_HOST.replace('0.0.0.0', 'web')}:{settings.API_PORT}/api/v1/dockerfiles/build-logs/webhook" # Для локальной разработки используем внутренний URL if "localhost" in str(settings.API_HOST) or "127.0.0.1" in str(settings.API_HOST): webhook_url = f"http://web:8000/api/v1/dockerfiles/build-logs/webhook" else: webhook_url = f"http://web:8000/api/v1/dockerfiles/build-logs/webhook" # Запускаем сборку через Builder API # Передаем dockerfile_id - builder сам получит содержимое из БД async def start_build(): try: result = await builder_service.start_build( build_log_id=build_log_id, dockerfile_id=dockerfile_id, # Передаем ID - builder получит содержимое из БД dockerfile_content=dockerfile_content, # Для обратной совместимости image_name=image_name, tag=tag, platforms=platforms, no_cache=no_cache, webhook_url=webhook_url ) return result except Exception as e: import logging logging.error(f"Error starting build via Builder API: {e}", exc_info=True) # Обновляем статус в БД через async функцию async def update_db_error(): async for db in get_async_db(): from sqlalchemy import update build_log = await db.get(DockerfileBuildLog, build_log_id) current_logs = (build_log.logs or "") if build_log else "" await db.execute( update(DockerfileBuildLog) .where(DockerfileBuildLog.id == build_log_id) .values( status="failed", finished_at=datetime.utcnow(), returncode=-1, logs=current_logs + f"\n❌ Ошибка запуска сборки: {e}" ) ) await db.commit() break # Вызываем async функцию через event loop try: loop = asyncio.get_event_loop() if loop.is_closed(): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: loop.run_until_complete(update_db_error()) except Exception as db_error: logging.error(f"Failed to update build log in DB: {db_error}", exc_info=True) raise # Запускаем сборку try: # Создаем новый event loop для Celery worker try: loop = asyncio.get_event_loop() if loop.is_closed(): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) except RuntimeError: # Если нет event loop, создаем новый loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) import logging logging.info(f"Starting build for build_log_id={build_log_id}, image={image_name}:{tag}") result = loop.run_until_complete(start_build()) logging.info(f"Build started successfully, result: {result}") return { "success": True, "build_id": result.get("build_id") if result else None, "image_name": image_name, "tag": tag, "full_image_name": f"{image_name}:{tag}" } except Exception as e: import logging error_msg = f"Ошибка при запуске сборки: {str(e)}" logging.error(error_msg, exc_info=True) # Обновляем статус в БД через async функцию async def update_db_error(): async for db in get_async_db(): from sqlalchemy import update build_log = await db.get(DockerfileBuildLog, build_log_id) current_logs = (build_log.logs or "") if build_log else "" await db.execute( update(DockerfileBuildLog) .where(DockerfileBuildLog.id == build_log_id) .values( status="failed", finished_at=datetime.utcnow(), returncode=-1, logs=current_logs + f"\n❌ {error_msg}" ) ) await db.commit() break # Вызываем async функцию через event loop try: try: loop = asyncio.get_event_loop() if loop.is_closed(): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(update_db_error()) except Exception as db_error: logging.error(f"Failed to update build log in DB: {db_error}", exc_info=True) # Возвращаем ошибку, но не выбрасываем исключение, чтобы Celery не пометил задачу как failed return { "success": False, "error": error_msg, "build_log_id": build_log_id } @celery_app.task(bind=True, name="devopslab.run_playbook_test") def run_playbook_test(self, playbook_id: int, preset: str = "default", test_run_id: int = None): """ Запуск тестирования playbook в фоне Args: playbook_id: ID playbook preset: Preset для тестирования test_run_id: ID записи о тесте в БД Returns: Результат выполнения теста """ from app.core.make_executor import MakeExecutor from app.db.session import get_async_db from app.services.playbook_service import PlaybookService from app.core.config import settings import asyncio import tempfile import yaml executor = MakeExecutor() # Получаем playbook из БД async def get_playbook(): async for db in get_async_db(): playbook = await PlaybookService.get_playbook(db, playbook_id) return playbook playbook = asyncio.run(get_playbook()) if not playbook: return {"success": False, "error": "Playbook не найден"} # Создаем временный файл playbook with tempfile.NamedTemporaryFile(mode='w', suffix='.yml', delete=False) as f: f.write(playbook.content) temp_playbook_path = f.name try: # Запускаем тест через molecule с временным playbook # TODO: Адаптировать команду make для работы с playbook result = executor.execute(f"role test {preset}") # Обновляем статус теста в БД async def update_test_run(): async for db in get_async_db(): await PlaybookService.save_test_run( db=db, playbook_id=playbook_id, preset_name=preset, status="success" if result.get('success') else "failed", output=result.get('output'), error=result.get('error'), returncode=result.get('returncode') ) if test_run_id: asyncio.run(update_test_run()) self.update_state( state='SUCCESS' if result.get('success') else 'FAILURE', meta=result ) return result finally: import os if os.path.exists(temp_playbook_path): os.unlink(temp_playbook_path) @celery_app.task(bind=True, name="devopslab.run_playbook_deploy") def run_playbook_deploy( self, playbook_id: int, inventory: str, limit: Optional[str] = None, tags: Optional[str] = None, check: bool = False, deployment_id: int = None ): """ Запуск деплоя playbook в фоне Args: playbook_id: ID playbook inventory: Путь к inventory файлу или содержимое limit: Лимит хостов tags: Теги для выполнения check: Dry-run режим deployment_id: ID записи о деплое в БД Returns: Результат выполнения деплоя """ from app.core.make_executor import MakeExecutor from app.db.session import get_async_db from app.services.playbook_service import PlaybookService from app.core.config import settings import asyncio import tempfile import os executor = MakeExecutor() # Получаем playbook из БД async def get_playbook(): async for db in get_async_db(): playbook = await PlaybookService.get_playbook(db, playbook_id) return playbook playbook = asyncio.run(get_playbook()) if not playbook: return {"success": False, "error": "Playbook не найден"} # Создаем временный файл playbook with tempfile.NamedTemporaryFile(mode='w', suffix='.yml', delete=False) as f: f.write(playbook.content) temp_playbook_path = f.name try: # Запускаем деплой через make # TODO: Адаптировать команду make для работы с playbook result = executor.execute(f"role deploy") # Обновляем статус деплоя в БД async def update_deployment(): async for db in get_async_db(): await PlaybookService.save_deployment( db=db, playbook_id=playbook_id, inventory=inventory, status="success" if result.get('success') else "failed", output=result.get('output'), error=result.get('error'), returncode=result.get('returncode') ) if deployment_id: asyncio.run(update_deployment()) self.update_state( state='SUCCESS' if result.get('success') else 'FAILURE', meta=result ) return result finally: if os.path.exists(temp_playbook_path): os.unlink(temp_playbook_path)