""" API endpoints для управления playbook Автор: Сергей Антропов Сайт: https://devops.org.ru """ from fastapi import APIRouter, Request, HTTPException, Depends, status, Form, WebSocket from fastapi.responses import HTMLResponse, JSONResponse from fastapi.templating import Jinja2Templates from pathlib import Path from typing import List, Optional, Dict from pydantic import BaseModel from app.db.session import get_async_db from app.services.playbook_service import PlaybookService from app.auth.deps import get_current_user from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select from app.models.database import PlaybookTestRun, PlaybookDeployment router = APIRouter() templates_path = Path(__file__).parent.parent.parent.parent / "templates" templates = Jinja2Templates(directory=str(templates_path)) class PlaybookCreate(BaseModel): name: str description: Optional[str] = None roles: List[str] variables: Optional[Dict] = None inventory: Optional[str] = None class PlaybookUpdate(BaseModel): name: Optional[str] = None description: Optional[str] = None roles: Optional[List[str]] = None variables: Optional[Dict] = None inventory: Optional[str] = None content: Optional[str] = None @router.get("/playbooks", response_class=HTMLResponse) async def playbooks_list( request: Request, current_user: dict = Depends(get_current_user), db: AsyncSession = Depends(get_async_db) ): """Список всех playbook""" playbooks = await PlaybookService.list_playbooks(db) return templates.TemplateResponse( "pages/playbooks/list.html", { "request": request, "playbooks": playbooks } ) @router.get("/playbooks/create", response_class=HTMLResponse) async def playbook_create_page( request: Request, current_user: dict = Depends(get_current_user) ): """Страница создания playbook""" from app.core.config import settings # Получаем список доступных ролей roles_dir = settings.PROJECT_ROOT / "roles" roles = [] if roles_dir.exists(): roles = [d.name for d in roles_dir.iterdir() if d.is_dir() and (d / "tasks").exists()] return templates.TemplateResponse( "pages/playbooks/create.html", { "request": request, "roles": sorted(roles) } ) @router.get("/playbooks/{playbook_id}", response_class=HTMLResponse) async def playbook_detail( request: Request, playbook_id: int, current_user: dict = Depends(get_current_user), db: AsyncSession = Depends(get_async_db) ): """Детали playbook""" playbook = await PlaybookService.get_playbook(db, playbook_id) if not playbook: raise HTTPException(status_code=404, detail="Playbook не найден") # Получаем историю тестов и деплоев test_runs = await db.execute( select(PlaybookTestRun) .where(PlaybookTestRun.playbook_id == playbook_id) .order_by(PlaybookTestRun.started_at.desc()) .limit(10) ) deployments = await db.execute( select(PlaybookDeployment) .where(PlaybookDeployment.playbook_id == playbook_id) .order_by(PlaybookDeployment.started_at.desc()) .limit(10) ) return templates.TemplateResponse( "pages/playbooks/detail.html", { "request": request, "playbook": playbook, "test_runs": test_runs.scalars().all(), "deployments": deployments.scalars().all() } ) @router.get("/playbooks/{playbook_id}/edit", response_class=HTMLResponse) async def playbook_edit_page( request: Request, playbook_id: int, current_user: dict = Depends(get_current_user), db: AsyncSession = Depends(get_async_db) ): """Страница редактирования playbook""" playbook = await PlaybookService.get_playbook(db, playbook_id) if not playbook: raise HTTPException(status_code=404, detail="Playbook не найден") from app.core.config import settings # Получаем список доступных ролей roles_dir = settings.PROJECT_ROOT / "roles" all_roles = [] if roles_dir.exists(): all_roles = [d.name for d in roles_dir.iterdir() if d.is_dir() and (d / "tasks").exists()] return templates.TemplateResponse( "pages/playbooks/edit.html", { "request": request, "playbook": playbook, "all_roles": sorted(all_roles) } ) @router.post("/api/v1/playbooks") async def create_playbook( playbook: PlaybookCreate, current_user: dict = Depends(get_current_user), db: AsyncSession = Depends(get_async_db) ): """Создание нового playbook""" # Проверяем, что playbook с таким именем не существует existing = await PlaybookService.get_playbook_by_name(db, playbook.name) if existing: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Playbook с именем '{playbook.name}' уже существует" ) new_playbook = await PlaybookService.create_playbook( db=db, name=playbook.name, roles=playbook.roles, description=playbook.description, variables=playbook.variables, inventory=playbook.inventory, created_by=current_user.get("username") ) return {"id": new_playbook.id, "name": new_playbook.name, "message": "Playbook создан успешно"} @router.put("/api/v1/playbooks/{playbook_id}") async def update_playbook( playbook_id: int, playbook: PlaybookUpdate, current_user: dict = Depends(get_current_user), db: AsyncSession = Depends(get_async_db) ): """Обновление playbook""" updated = await PlaybookService.update_playbook( db=db, playbook_id=playbook_id, name=playbook.name, description=playbook.description, roles=playbook.roles, variables=playbook.variables, inventory=playbook.inventory, content=playbook.content, updated_by=current_user.get("username") ) if not updated: raise HTTPException(status_code=404, detail="Playbook не найден") return {"message": "Playbook обновлен успешно"} @router.delete("/api/v1/playbooks/{playbook_id}") async def delete_playbook( playbook_id: int, current_user: dict = Depends(get_current_user), db: AsyncSession = Depends(get_async_db) ): """Удаление playbook""" # Получаем имя playbook до удаления playbook = await PlaybookService.get_playbook(db, playbook_id) if not playbook: raise HTTPException(status_code=404, detail="Playbook не найден") playbook_name = playbook.name # Удаляем playbook deleted = await PlaybookService.delete_playbook(db, playbook_id) if not deleted: raise HTTPException(status_code=404, detail="Playbook не найден") return JSONResponse(content={ "success": True, "playbook_id": playbook_id, "playbook_name": playbook_name, "message": f"Playbook '{playbook_name}' успешно удален" }) @router.get("/api/v1/playbooks") async def list_playbooks_api( status_filter: Optional[str] = None, current_user: dict = Depends(get_current_user), db: AsyncSession = Depends(get_async_db) ): """API: Список всех playbook""" playbooks = await PlaybookService.list_playbooks(db, status=status_filter) return [ { "id": p.id, "name": p.name, "description": p.description, "roles": p.roles, "status": p.status, "created_at": p.created_at.isoformat() if p.created_at else None, "updated_at": p.updated_at.isoformat() if p.updated_at else None } for p in playbooks ] @router.post("/api/v1/playbooks/{playbook_id}/test") async def test_playbook( playbook_id: int, preset: Optional[str] = Form("default"), current_user: dict = Depends(get_current_user), db: AsyncSession = Depends(get_async_db) ): """Запуск тестирования playbook""" playbook = await PlaybookService.get_playbook(db, playbook_id) if not playbook: raise HTTPException(status_code=404, detail="Playbook не найден") # Сохраняем запись о тесте test_run = await PlaybookService.save_test_run( db=db, playbook_id=playbook_id, preset_name=preset, status="running", user=current_user.get("username") ) # Запускаем тест в фоне через Celery from app.tasks.celery_tasks import run_playbook_test task = run_playbook_test.delay(playbook_id, preset, test_run.id) return { "message": "Тест запущен", "test_run_id": test_run.id, "task_id": task.id, "websocket_url": f"/ws/playbook-test/{test_run.id}" } @router.post("/api/v1/playbooks/{playbook_id}/deploy") async def deploy_playbook( playbook_id: int, inventory: Optional[str] = Form(None), limit: Optional[str] = Form(None), tags: Optional[str] = Form(None), check: bool = Form(False), current_user: dict = Depends(get_current_user), db: AsyncSession = Depends(get_async_db) ): """Запуск деплоя playbook""" playbook = await PlaybookService.get_playbook(db, playbook_id) if not playbook: raise HTTPException(status_code=404, detail="Playbook не найден") # Используем inventory из playbook или переданный deploy_inventory = inventory or playbook.inventory if not deploy_inventory: raise HTTPException( status_code=400, detail="Inventory не указан. Укажите inventory в playbook или передайте его в запросе." ) # Сохраняем запись о деплое deployment = await PlaybookService.save_deployment( db=db, playbook_id=playbook_id, inventory=deploy_inventory, hosts=None, # Будет заполнено после парсинга inventory status="running", user=current_user.get("username") ) # Запускаем деплой в фоне через Celery from app.tasks.celery_tasks import run_playbook_deploy task = run_playbook_deploy.delay( playbook_id, deploy_inventory, limit, tags, check, deployment.id ) return { "message": "Деплой запущен", "deployment_id": deployment.id, "task_id": task.id, "websocket_url": f"/ws/playbook-deploy/{deployment.id}" } @router.websocket("/ws/playbook-test/{test_run_id}") async def playbook_test_websocket(websocket: WebSocket, test_run_id: int): """WebSocket для live логов тестирования playbook""" await websocket.accept() try: from app.db.session import get_async_db from app.services.playbook_service import PlaybookService from sqlalchemy.ext.asyncio import AsyncSession # Получаем информацию о тесте async for db in get_async_db(): test_run = await db.execute( select(PlaybookTestRun).where(PlaybookTestRun.id == test_run_id) ) test_run = test_run.scalar_one_or_none() if not test_run: await websocket.send_json({ "type": "error", "data": f"Тест #{test_run_id} не найден" }) await websocket.close() return playbook = await PlaybookService.get_playbook(db, test_run.playbook_id) if not playbook: await websocket.send_json({ "type": "error", "data": "Playbook не найден" }) await websocket.close() return # Подключаемся к Redis для получения логов из Celery # Пока отправляем заглушку await websocket.send_json({ "type": "info", "data": f"Тестирование playbook '{playbook.name}' запущено..." }) # TODO: Реализовать получение логов из Celery task # Пока просто ждем и отправляем статус import asyncio await asyncio.sleep(1) await websocket.send_json({ "type": "complete", "status": "running", "data": "Тест выполняется..." }) except Exception as e: await websocket.send_json({ "type": "error", "data": f"Ошибка: {str(e)}" }) await websocket.close() @router.websocket("/ws/playbook-deploy/{deployment_id}") async def playbook_deploy_websocket(websocket: WebSocket, deployment_id: int): """WebSocket для live логов деплоя playbook""" await websocket.accept() try: from app.db.session import get_async_db from app.services.playbook_service import PlaybookService from sqlalchemy import select from app.models.database import PlaybookDeployment # Получаем информацию о деплое async for db in get_async_db(): deployment = await db.execute( select(PlaybookDeployment).where(PlaybookDeployment.id == deployment_id) ) deployment = deployment.scalar_one_or_none() if not deployment: await websocket.send_json({ "type": "error", "data": f"Деплой #{deployment_id} не найден" }) await websocket.close() return playbook = await PlaybookService.get_playbook(db, deployment.playbook_id) if not playbook: await websocket.send_json({ "type": "error", "data": "Playbook не найден" }) await websocket.close() return # Подключаемся к Redis для получения логов из Celery # Пока отправляем заглушку await websocket.send_json({ "type": "info", "data": f"Деплой playbook '{playbook.name}' запущен..." }) # TODO: Реализовать получение логов из Celery task # Пока просто ждем и отправляем статус import asyncio await asyncio.sleep(1) await websocket.send_json({ "type": "complete", "status": "running", "data": "Деплой выполняется..." }) except Exception as e: await websocket.send_json({ "type": "error", "data": f"Ошибка: {str(e)}" }) await websocket.close()