Files
RoleForge/app/runner/main.py
Sergey Antropoff 01d598eea5 - Админка: настройка pull-реестра (Hub / Harbor / Nexus) в БД, шифрование секретов;
обновлён /admin/config и API для os_registry.
- Molecule/раннер: env из конфигурации, ensure roleforge-os (ensure_roleforge_os.yml),
  os_registry_pull и доработки executors / runner / create.yml.
- /admin/os-images: выбор реестра, buildx (в т.ч. split amd64+arm64 + imagetools),
  опция --no-cache, стрим логов; domain.py: план команд build, ретраи push.
- UI: брендинг (app_name, app_tagline) из app_config через get_ui_branding_context;
  base.xhtml, role-create / role-view, core.js, pages-main, стили.
- Dockerfiles: требование Python ≥3.9 (assert), доработки alt9/astra/debian9/ubuntu20
  и др.; новые Dockerfile.arm64 для centos7/centos8.
- Конфиг: .env.example, config.py, pyproject.toml.
2026-05-06 07:52:29 +03:00

151 lines
4.5 KiB
Python

import asyncio
from dataclasses import dataclass, field
from typing import Any
from uuid import uuid4
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from pydantic import BaseModel, Field
from app.services.executors import (
run_ansible_playbook,
run_molecule_test_for_playbook,
run_molecule_test_for_role,
)
from app.services.os_registry_pull import molecule_env_defaults_from_settings
class PlaybookRunRequest(BaseModel):
playbook_yaml: str
inventory_text: str
extra_vars: dict[str, Any] = Field(default_factory=dict)
class MoleculePlaybookRunRequest(BaseModel):
playbook_yaml: str
hosts: list[dict[str, Any]] = Field(default_factory=list)
extra_vars: dict[str, Any] = Field(default_factory=dict)
class MoleculeRoleRunRequest(BaseModel):
role_name: str
role_tasks_yaml: str
hosts: list[dict[str, Any]] = Field(default_factory=list)
extra_vars: dict[str, Any] = Field(default_factory=dict)
@dataclass
class RunState:
kind: str
status: str = "running"
return_code: int | None = None
lines: list[str] = field(default_factory=list)
queues: set[asyncio.Queue[str]] = field(default_factory=set)
app = FastAPI(title="RoleForge Runner")
runs: dict[str, RunState] = {}
def _append_line(run_id: str, line: str) -> None:
state = runs[run_id]
state.lines.append(line)
for q in state.queues:
q.put_nowait(line)
async def _execute(run_id: str, iterator) -> None:
state = runs[run_id]
code = 1
try:
for line in iterator:
if line.startswith("__RETURN_CODE__:"):
code = int(line.split(":", maxsplit=1)[1])
continue
_append_line(run_id, line)
await asyncio.sleep(0)
except Exception as exc: # noqa: BLE001
_append_line(run_id, f"[runner] execution error: {exc}")
code = 1
state.return_code = code
state.status = "success" if code == 0 else "failed"
@app.get("/healthz")
async def healthz() -> dict[str, str]:
return {"status": "ok"}
@app.post("/runs/playbook")
async def create_playbook_run(payload: PlaybookRunRequest) -> dict[str, str]:
run_id = str(uuid4())
runs[run_id] = RunState(kind="playbook")
iterator = run_ansible_playbook(payload.playbook_yaml, payload.inventory_text, payload.extra_vars)
asyncio.create_task(_execute(run_id, iterator))
return {"run_id": run_id}
@app.post("/runs/molecule/playbook")
async def create_molecule_playbook_run(payload: MoleculePlaybookRunRequest) -> dict[str, str]:
run_id = str(uuid4())
runs[run_id] = RunState(kind="molecule_playbook")
iterator = run_molecule_test_for_playbook(
payload.playbook_yaml,
payload.hosts,
payload.extra_vars,
molecule_extra_env=molecule_env_defaults_from_settings(),
)
asyncio.create_task(_execute(run_id, iterator))
return {"run_id": run_id}
@app.post("/runs/molecule/role")
async def create_molecule_role_run(payload: MoleculeRoleRunRequest) -> dict[str, str]:
run_id = str(uuid4())
runs[run_id] = RunState(kind="molecule_role")
iterator = run_molecule_test_for_role(
payload.role_name,
payload.role_tasks_yaml,
payload.hosts,
payload.extra_vars,
molecule_extra_env=molecule_env_defaults_from_settings(),
)
asyncio.create_task(_execute(run_id, iterator))
return {"run_id": run_id}
@app.get("/runs/{run_id}")
async def get_run(run_id: str, offset: int = 0) -> dict[str, Any]:
state = runs[run_id]
return {
"run_id": run_id,
"kind": state.kind,
"status": state.status,
"return_code": state.return_code,
"lines": state.lines[offset:],
"next_offset": len(state.lines),
}
@app.websocket("/ws/runs/{run_id}")
async def ws_run_logs(websocket: WebSocket, run_id: str) -> None:
await websocket.accept()
state = runs[run_id]
queue: asyncio.Queue[str] = asyncio.Queue()
state.queues.add(queue)
try:
for line in state.lines:
await websocket.send_text(line)
while True:
if state.status in {"success", "failed"} and queue.empty():
break
try:
line = await asyncio.wait_for(queue.get(), timeout=1.0)
except TimeoutError:
# No log line this second — keep the socket open and re-check completion.
continue
await websocket.send_text(line)
except WebSocketDisconnect:
pass
finally:
state.queues.discard(queue)