import asyncio from dataclasses import dataclass, field from typing import Any from uuid import uuid4 from fastapi import FastAPI, WebSocket, WebSocketDisconnect from pydantic import BaseModel, Field from app.services.executors import ( run_ansible_playbook, run_molecule_test_for_playbook, run_molecule_test_for_role, ) from app.services.os_registry_pull import molecule_env_defaults_from_settings class PlaybookRunRequest(BaseModel): playbook_yaml: str inventory_text: str extra_vars: dict[str, Any] = Field(default_factory=dict) class MoleculePlaybookRunRequest(BaseModel): playbook_yaml: str hosts: list[dict[str, Any]] = Field(default_factory=list) extra_vars: dict[str, Any] = Field(default_factory=dict) class MoleculeRoleRunRequest(BaseModel): role_name: str role_tasks_yaml: str hosts: list[dict[str, Any]] = Field(default_factory=list) extra_vars: dict[str, Any] = Field(default_factory=dict) @dataclass class RunState: kind: str status: str = "running" return_code: int | None = None lines: list[str] = field(default_factory=list) queues: set[asyncio.Queue[str]] = field(default_factory=set) app = FastAPI(title="RoleForge Runner") runs: dict[str, RunState] = {} def _append_line(run_id: str, line: str) -> None: state = runs[run_id] state.lines.append(line) for q in state.queues: q.put_nowait(line) async def _execute(run_id: str, iterator) -> None: state = runs[run_id] code = 1 try: for line in iterator: if line.startswith("__RETURN_CODE__:"): code = int(line.split(":", maxsplit=1)[1]) continue _append_line(run_id, line) await asyncio.sleep(0) except Exception as exc: # noqa: BLE001 _append_line(run_id, f"[runner] execution error: {exc}") code = 1 state.return_code = code state.status = "success" if code == 0 else "failed" @app.get("/healthz") async def healthz() -> dict[str, str]: return {"status": "ok"} @app.post("/runs/playbook") async def create_playbook_run(payload: PlaybookRunRequest) -> dict[str, str]: run_id = str(uuid4()) runs[run_id] = RunState(kind="playbook") iterator = run_ansible_playbook(payload.playbook_yaml, payload.inventory_text, payload.extra_vars) asyncio.create_task(_execute(run_id, iterator)) return {"run_id": run_id} @app.post("/runs/molecule/playbook") async def create_molecule_playbook_run(payload: MoleculePlaybookRunRequest) -> dict[str, str]: run_id = str(uuid4()) runs[run_id] = RunState(kind="molecule_playbook") iterator = run_molecule_test_for_playbook( payload.playbook_yaml, payload.hosts, payload.extra_vars, molecule_extra_env=molecule_env_defaults_from_settings(), ) asyncio.create_task(_execute(run_id, iterator)) return {"run_id": run_id} @app.post("/runs/molecule/role") async def create_molecule_role_run(payload: MoleculeRoleRunRequest) -> dict[str, str]: run_id = str(uuid4()) runs[run_id] = RunState(kind="molecule_role") iterator = run_molecule_test_for_role( payload.role_name, payload.role_tasks_yaml, payload.hosts, payload.extra_vars, molecule_extra_env=molecule_env_defaults_from_settings(), ) asyncio.create_task(_execute(run_id, iterator)) return {"run_id": run_id} @app.get("/runs/{run_id}") async def get_run(run_id: str, offset: int = 0) -> dict[str, Any]: state = runs[run_id] return { "run_id": run_id, "kind": state.kind, "status": state.status, "return_code": state.return_code, "lines": state.lines[offset:], "next_offset": len(state.lines), } @app.websocket("/ws/runs/{run_id}") async def ws_run_logs(websocket: WebSocket, run_id: str) -> None: await websocket.accept() state = runs[run_id] queue: asyncio.Queue[str] = asyncio.Queue() state.queues.add(queue) try: for line in state.lines: await websocket.send_text(line) while True: if state.status in {"success", "failed"} and queue.empty(): break try: line = await asyncio.wait_for(queue.get(), timeout=1.0) except TimeoutError: # No log line this second — keep the socket open and re-check completion. continue await websocket.send_text(line) except WebSocketDisconnect: pass finally: state.queues.discard(queue)