Files
RoleForge/app/routers/domain.py
Sergey Antropoff 01d598eea5 - Админка: настройка pull-реестра (Hub / Harbor / Nexus) в БД, шифрование секретов;
обновлён /admin/config и API для os_registry.
- Molecule/раннер: env из конфигурации, ensure roleforge-os (ensure_roleforge_os.yml),
  os_registry_pull и доработки executors / runner / create.yml.
- /admin/os-images: выбор реестра, buildx (в т.ч. split amd64+arm64 + imagetools),
  опция --no-cache, стрим логов; domain.py: план команд build, ретраи push.
- UI: брендинг (app_name, app_tagline) из app_config через get_ui_branding_context;
  base.xhtml, role-create / role-view, core.js, pages-main, стили.
- Dockerfiles: требование Python ≥3.9 (assert), доработки alt9/astra/debian9/ubuntu20
  и др.; новые Dockerfile.arm64 для centos7/centos8.
- Конфиг: .env.example, config.py, pyproject.toml.
2026-05-06 07:52:29 +03:00

3238 lines
120 KiB
Python

import asyncio
import base64
import io
import json
import re
from uuid import UUID
import subprocess
import time
import zipfile
from pathlib import Path
from urllib.parse import quote
import yaml
from collections.abc import AsyncIterator
from typing import Any, Literal, NamedTuple
from fastapi import APIRouter, Body, Depends, File, HTTPException, Query, Response, UploadFile, status
from fastapi.responses import StreamingResponse
from yamllint import linter as yaml_linter
from yamllint.config import YamlLintConfig
from app.deps.auth import get_current_admin_user_id, get_current_user_id
from app.db.pool import get_pool
from app.schemas.domain import (
AddRoleToPlaybookRequest,
InventoryCreate,
JobLaunchRequest,
LintRoleFileError,
LintRoleFileRequest,
LintRoleFileResponse,
PlaybookCreate,
PlaybookYamlImportRequest,
RoleCategoryCreate,
RoleCreate,
RoleFilePayload,
RoleForkRequest,
RoleFilesReplaceRequest,
RoleImportItem,
RoleImportResult,
RoleUpdate,
RoleYamlImportRequest,
RunnerStopRequest,
TestLaunchRequest,
)
from app.core.config import get_settings
from app.services.dynamic_worker import launch_ephemeral_worker, list_active_runners, terminate_ephemeral_worker
from app.services.jsonlint_rules import (
JSONLINT_RULE_META,
merge_jsonlint_saved,
run_json_lint,
serialize_rules_for_api as serialize_jsonlint_rules_for_api,
validate_jsonlint_put_payload,
)
from app.services.jsonlint_runtime import (
get_jsonlint_config_for_lint,
invalidate_jsonlint_config_cache,
refresh_jsonlint_config_cache,
)
from app.services.yamllint_rules import RULE_META, merge_yamllint_saved, serialize_rules_for_api, validate_put_payload
from app.services.yamllint_runtime import get_yamllint_config_for_lint, invalidate_yamllint_config_cache, refresh_yamllint_config_cache
from app.services.runner_client import (
create_molecule_playbook_run,
create_molecule_role_run,
create_playbook_run,
get_run_status,
)
from app.services.os_registry_pull import (
CONFIG_KEY as OS_REGISTRY_CONFIG_KEY,
encrypt_registry_secret,
parse_stored_os_registry,
)
# Single router for all domain endpoints; mounted by the FastAPI app elsewhere.
router = APIRouter(tags=["domain"])
# app_config keys that the project-settings admin endpoints are allowed to read/write.
_PROJECT_ADMIN_KEYS = frozenset(
    {
        "app_name",
        "app_tagline",
        "runner_image",
        "docker_network",
        "runner_timeout_sec",
        "runner_poll_interval_sec",
        "runner_max_poll_errors",
    }
)
# Debug switch: True = terminate the ephemeral test runner after run completion
# (normal auto-cleanup); set to False temporarily to keep it alive for inspection.
AUTO_TERMINATE_TEST_RUNNER = True
# <repo root>/dockerfiles — this file lives at app/routers/, two levels below the root.
DOCKERFILES_ROOT = Path(__file__).resolve().parents[2] / "dockerfiles"
# Named docker-container buildx builder reused across OS-image builds.
BUILDX_BUILDER_NAME = "roleforge-builder"
# Hub uploads often flake (broken pipe / resets); BuildKit also reports context canceled if the HTTP stream is torn down mid-push.
_OS_BUILDX_PUSH_RETRIES = 3
_OS_BUILDX_PUSH_RETRY_DELAY_SEC = 10
def _buildx_attestation_off_args() -> list[str]:
"""Skip provenance/SBOM attestations — fewer manifests and less push churn to Docker Hub."""
return ["--provenance=false", "--sbom=false"]
def _should_retry_os_registry_cmd(cmd: list[str]) -> bool:
"""Retry flaky registry operations (buildx --push and manifest creation)."""
if not cmd:
return False
if "buildx" in cmd and "build" in cmd and "--push" in cmd:
return True
if "buildx" in cmd and "imagetools" in cmd and "create" in cmd:
return True
return False
def _manifest_stage_tag(remote_ref: str, slug: str) -> str:
"""Temporary tag for per-arch pushes before `buildx imagetools create` merges the manifest."""
r = str(remote_ref or "").strip()
if not r:
return f"__invalid:{slug}"
if ":" not in r:
return f"{r}__rfstage_{slug}"
repo_part, tag_part = r.rsplit(":", 1)
return f"{repo_part}:{tag_part}__rfstage_{slug}"
def _paired_amd64_arm64_paths(dockerfile_abs: str) -> tuple[str | None, str | None]:
p = Path(dockerfile_abs)
if p.name != "Dockerfile":
return None, None
arm = p.parent / "Dockerfile.arm64"
if not arm.is_file():
return None, None
return str(p.resolve()), str(arm.resolve())
def _should_retry_buildx_push_output(blob: str) -> bool:
t = blob.lower()
needles = (
"broken pipe",
"connection reset",
"connection refused",
"context canceled",
"context cancelled",
"timeout",
"unexpected eof",
"eof",
"tls:",
"bad gateway",
"gateway timeout",
" 502 ",
" 503 ",
"use of closed network connection",
"failed to do request",
"temporary failure",
)
return any(n in t for n in needles)
def _run_platform_for_os_image(platform: str, build_platforms: str) -> str:
"""Architectural platform hint for Molecule (`docker_container.platform`).
Single-arch images: pin that arch (often linux/amd64). Multi-arch matrices: return empty so the
runner picks the host-native slice (Apple Silicon gets linux/arm64, no forced amd64 emulation).
"""
p = str(platform or "").strip()
if p:
return p
parts = [x.strip() for x in str(build_platforms or "").split(",") if x.strip()]
if len(parts) > 1:
return ""
return parts[0] if parts else ""
def _scan_os_options() -> list[dict[str, str]]:
    """Discover dockerfiles under dockerfiles/.
    When both ``Dockerfile`` and ``Dockerfile.arm64`` exist in the same OS directory, emit a single
    matrix row that builds/pushes one manifest tag (``roleforge-os:<os>``) with amd64 + arm64 slices
    using the two dockerfiles.
    """
    items: list[dict[str, str]] = []
    root = DOCKERFILES_ROOT
    if not root.exists():
        return items
    # Sorted for deterministic row order across calls.
    all_paths = sorted(root.rglob("Dockerfile*"))
    # Group dockerfiles by their OS directory (relative parent path).
    by_parent: dict[str, list[Path]] = {}
    for path in all_paths:
        rel = path.relative_to(root)
        key = str(rel.parent)
        by_parent.setdefault(key, []).append(path)
    # Directories that own both a plain Dockerfile and an arm64 variant get one merged row.
    paired_parents: set[str] = set()
    for parent_key, paths in by_parent.items():
        names = {p.name for p in paths}
        if "Dockerfile" in names and "Dockerfile.arm64" in names:
            paired_parents.add(parent_key)
    merged_parents: set[str] = set()
    for path in all_paths:
        rel = path.relative_to(root)
        parent_key = str(rel.parent)
        os_key = rel.parent.name
        if parent_key in paired_parents:
            # Emit the merged amd64+arm64 row exactly once, keyed off the plain Dockerfile;
            # skip the Dockerfile.arm64 entry (it is folded into this row).
            if path.name != "Dockerfile":
                continue
            if parent_key in merged_parents:
                continue
            merged_parents.add(parent_key)
            rel_arm = (root / rel.parent / "Dockerfile.arm64").relative_to(root)
            image_tag = f"roleforge-os:{os_key}"
            build_platforms = "linux/amd64,linux/arm64"
            # Empty platform => runner picks the host-native manifest slice.
            platform = ""
            run_platform = _run_platform_for_os_image(platform, build_platforms)
            items.append(
                {
                    "id": f"{os_key}:manifest",
                    "name": f"{os_key} (amd64 + arm64)",
                    "dockerfile": str(rel),
                    "dockerfile_arm64": str(rel_arm),
                    "image": image_tag,
                    "command": "/sbin/init",
                    "systemd": "true",
                    "platform": platform,
                    "build_platforms": build_platforms,
                    "run_platform": run_platform,
                }
            )
            continue
        # Unpaired dockerfiles: variant suffix comes from the filename ("Dockerfile.arm64" -> "arm64").
        variant = path.name.replace("Dockerfile", "").strip(".")
        suffix = f" ({variant})" if variant else ""
        image_tag = f"roleforge-os:{os_key}{('-' + variant) if variant else ''}"
        if variant == "arm64":
            platform = "linux/arm64"
            build_platforms = "linux/arm64"
        elif os_key in {"astra-linux", "redos", "centos7", "centos8"}:
            # These distros only ship amd64 base images.
            platform = "linux/amd64"
            build_platforms = "linux/amd64"
        else:
            platform = ""
            build_platforms = "linux/amd64,linux/arm64"
        run_platform = _run_platform_for_os_image(platform, build_platforms)
        items.append(
            {
                "id": f"{os_key}:{variant or 'default'}",
                "name": f"{os_key}{suffix}",
                "dockerfile": str(rel),
                "image": image_tag,
                "command": "/sbin/init",
                "systemd": "true",
                "platform": platform,
                "build_platforms": build_platforms,
                "run_platform": run_platform,
            }
        )
    return items
def _resolve_os_option_by_dockerfile(dockerfile_rel: str) -> dict[str, str] | None:
needle = str(dockerfile_rel or "").strip().replace("\\", "/")
if not needle:
return None
for item in _scan_os_options():
candidates: list[str] = []
df = str(item.get("dockerfile") or "").strip().replace("\\", "/")
if df:
candidates.append(df)
dfa = item.get("dockerfile_arm64")
if dfa:
candidates.append(str(dfa).strip().replace("\\", "/"))
for candidate in candidates:
if candidate == needle or candidate.endswith(needle) or needle.endswith(candidate):
return item
return None
def _infer_platform_defaults(dockerfile_rel: str) -> tuple[str, str]:
    """(platform, build_platforms) defaults for a dockerfile path.

    A match from the filesystem scan wins; otherwise fall back to a filename
    heuristic (``Dockerfile.arm64`` -> arm64-only, anything else -> dual-arch).
    """
    option = _resolve_os_option_by_dockerfile(str(dockerfile_rel))
    if option:
        pinned = str(option.get("platform") or "").strip()
        matrix = str(option.get("build_platforms") or "").strip()
        return (pinned, matrix)
    normalized = str(dockerfile_rel or "").replace("\\", "/").lower()
    if "dockerfile.arm64" in normalized:
        return ("linux/arm64", "linux/arm64")
    return ("", "linux/amd64,linux/arm64")
async def _ensure_docker_buildx_mode() -> tuple[str, str]:
    """Verify `docker buildx` works, attempting a package-manager install when missing.

    Returns ("local", note) on success or ("none", error note) when buildx
    is unavailable and could not be installed.
    """
    def _version_probe():
        return subprocess.run(
            ["docker", "buildx", "version"],
            capture_output=True,
            text=True,
        )

    check = await asyncio.to_thread(_version_probe)
    if check.returncode == 0:
        return "local", "docker buildx is available"
    # The plugin package name differs across distros; try both known variants.
    for install_cmd in (
        "apt-get update && apt-get install -y docker-buildx-plugin",
        "apt-get update && apt-get install -y docker-buildx",
    ):
        installed = await asyncio.to_thread(
            subprocess.run,
            ["sh", "-lc", install_cmd],
            capture_output=True,
            text=True,
        )
        if installed.returncode != 0:
            continue
        recheck = await asyncio.to_thread(_version_probe)
        if recheck.returncode == 0:
            return "local", "docker buildx installed via package manager"
    msg = (check.stderr or check.stdout or "").strip() or "unknown error"
    return "none", f"docker buildx unavailable and auto-install failed: {msg}"
async def _ensure_buildx_builder() -> tuple[bool, str]:
    """Create (if needed) and bootstrap the named docker-container buildx builder.

    Returns (ok, human-readable note).
    """
    async def _buildx(*args: str):
        return await asyncio.to_thread(
            subprocess.run,
            ["docker", "buildx", *args],
            capture_output=True,
            text=True,
        )

    inspect = await _buildx("inspect", BUILDX_BUILDER_NAME)
    if inspect.returncode != 0:
        created = await _buildx("create", "--name", BUILDX_BUILDER_NAME, "--driver", "docker-container")
        if created.returncode != 0:
            err = (created.stderr or created.stdout or "").strip() or "unknown error"
            return False, f"failed to create buildx builder '{BUILDX_BUILDER_NAME}': {err}"
    # --bootstrap starts the builder container so the first real build doesn't pay for it.
    boot = await _buildx("inspect", BUILDX_BUILDER_NAME, "--bootstrap")
    if boot.returncode != 0:
        err = (boot.stderr or boot.stdout or "").strip() or "unknown error"
        return False, f"failed to bootstrap buildx builder '{BUILDX_BUILDER_NAME}': {err}"
    return True, f"buildx builder '{BUILDX_BUILDER_NAME}' is ready"
def _normalize_os_registry_prefix(raw: str) -> str:
return (raw or "").strip().rstrip("/")
def _normalize_registry_host(raw: str) -> str:
"""Host[:port] only for docker login / push (strip scheme and path)."""
s = (raw or "").strip()
if not s:
return ""
s = s.replace("https://", "").replace("http://", "").strip()
return s.split("/", 1)[0].strip().rstrip("/")
class ResolvedOsPush(NamedTuple):
    """Resolved push destination for OS images (see _resolve_os_push_config)."""
    kind: str  # hub | registry | none
    target: str  # push target base (see legacy_full_image)
    docker_login_server: str | None  # docker login server arg
    legacy_full_image: bool  # True: target is prefix + full roleforge-os:tag image ref
def _resolve_os_push_config(registry_payload: dict[str, Any] | None) -> ResolvedOsPush:
    """Resolve where OS images get pushed; UI/registry_* fields override ROLEFORGE_OS_* env defaults.

    Raises HTTPException(400) on an unknown provider or when Harbor/Nexus is
    selected without host/repository fields.
    """
    payload = registry_payload or {}
    provider = str(payload.get("registry_provider") or "").strip().lower()
    if provider in ("harbor", "nexus"):
        host = _normalize_registry_host(str(payload.get("registry_host") or ""))
        raw_path = str(payload.get("repository_path") or "").replace("\\", "/")
        segments = [segment.strip() for segment in raw_path.split("/") if segment.strip()]
        repo_path = "/".join(segments)
        if not host or not repo_path:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="registry_host and repository_path are required for Harbor and Nexus.",
            )
        return ResolvedOsPush("registry", f"{host}/{repo_path}", host, False)
    if provider not in ("hub", "", "docker_hub"):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Unknown registry_provider: {provider or '(empty)'}",
        )
    hub_repo = str(payload.get("repository_path") or "").strip().rstrip("/")
    if hub_repo:
        return ResolvedOsPush("hub", hub_repo, None, False)
    # No explicit UI choice: fall back to env-derived defaults.
    env_kind, env_target = _os_push_kind_and_target()
    if env_kind == "hub":
        return ResolvedOsPush("hub", env_target, None, False)
    if env_kind == "registry":
        return ResolvedOsPush("registry", env_target, env_target, True)
    return ResolvedOsPush("none", "", None, False)
def _os_push_kind_and_target() -> tuple[str, str]:
    """('hub', 'user/repo') | ('registry', 'host[:port]') | ('none', '')."""
    settings = get_settings()
    # Docker Hub repository takes precedence over a plain registry prefix.
    hub_repository = (settings.roleforge_os_docker_hub_repository or "").strip().rstrip("/")
    if hub_repository:
        return "hub", hub_repository
    registry_prefix = _normalize_os_registry_prefix(settings.roleforge_os_image_registry or "")
    if registry_prefix:
        return "registry", registry_prefix
    return "none", ""
def _os_image_push_display() -> str:
    """Human-readable push destination string for the admin UI."""
    kind, target = _os_push_kind_and_target()
    return f"docker.io/{target}" if kind == "hub" else target
async def _stream_process_stdout_chunks(proc: asyncio.subprocess.Process) -> AsyncIterator[str]:
"""Yield decoded stdout; chunk reads (not readline) so Docker progress bars using \\r do not stall."""
if proc.stdout is None:
return
while True:
block = await proc.stdout.read(65536)
if not block:
break
yield block.decode(errors="replace")
async def _reap_subprocess_after_stream_disconnect(proc: asyncio.subprocess.Process | None) -> None:
"""Drain stdout and wait — avoids killing BuildKit on client disconnect (prevents spurious context canceled)."""
if proc is None:
return
try:
if proc.returncode is not None:
return
if proc.stdout is not None:
while await proc.stdout.read(65536):
pass
await asyncio.wait_for(proc.wait(), timeout=6 * 3600)
except asyncio.TimeoutError:
try:
proc.kill()
except ProcessLookupError:
pass
except Exception: # noqa: BLE001
try:
proc.kill()
except ProcessLookupError:
pass
async def _docker_login_stdin_password(
    *,
    username: str,
    password: str,
    registry_server: str | None,
) -> tuple[int, bytes]:
    """Run `docker login` feeding the password via stdin.

    Returns (exit code, combined stdout+stderr bytes). ``registry_server`` is
    omitted for Docker Hub.
    """
    argv = ["docker", "login", "--username", username, "--password-stdin"]
    if registry_server:
        argv.append(registry_server)
    proc = await asyncio.create_subprocess_exec(
        *argv,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT,
    )
    output, _ = await proc.communicate(input=password.encode())
    return int(proc.returncode or 0), output or b""
def _remote_os_image_ref(registry_prefix: str, image_tag: str) -> str:
    """Canonical tag roleforge-os:ubuntu20 -> localhost:5000/roleforge-os:ubuntu20."""
    prefix = _normalize_os_registry_prefix(registry_prefix)
    tag = str(image_tag or "").strip()
    return f"{prefix}/{tag}" if prefix else tag
def _remote_push_reference(
kind: str,
target: str,
image_tag: str,
*,
legacy_full_image: bool = False,
) -> str:
"""roleforge-os:ubuntu20 → hub user/repo:ubuntu20 or host/proj/img:ubuntu20."""
if legacy_full_image:
return _remote_os_image_ref(target, image_tag)
_, sep, suffix = str(image_tag).partition(":")
tag_suffix = (suffix.strip() if sep else str(image_tag).strip()) or "latest"
return f"{target}:{tag_suffix}"
def _manual_pull_and_tag_shell_commands(
    kind: str,
    push_target: str,
    image_tag: str,
    *,
    legacy_full_image: bool = False,
) -> list[str]:
    """Shell lines to run locally after a registry push: pull the remote digest,
    then alias it back to the canonical local tag Molecule expects."""
    remote_ref = _remote_push_reference(kind, push_target, image_tag, legacy_full_image=legacy_full_image)
    local_tag = str(image_tag).strip()
    return [
        f"docker pull {remote_ref}",
        f"docker tag {remote_ref} {local_tag}",
    ]
def _os_build_command_plan(
    *,
    dockerfile_abs: str,
    image_tag: str,
    platform: str,
    build_platforms_csv: str,
    repo_root: str,
    push_kind: str,
    push_target: str,
    legacy_full_image: bool = False,
    no_cache: bool = False,
) -> tuple[list[list[str]], str | None]:
    """Plan the docker commands for building one OS image.

    Returns (commands, error): ``commands`` is the ordered list of argv lists to
    execute; ``error`` is a human-readable reason when no plan could be made.

    Strategies, in priority order:
      * paired Dockerfile + Dockerfile.arm64 with a push target — two per-arch
        staged pushes merged by ``buildx imagetools create``;
      * multi-arch matrix with a push target — one multi-platform ``--push`` build;
      * single platform — ``--push`` when a target exists, else ``--load`` locally.

    The previous version built four nearly-identical buildx argv lists inline;
    they are now produced by one helper with identical token order.
    """
    matrix = [p.strip() for p in str(build_platforms_csv or "").split(",") if p.strip()]
    if not matrix and str(platform or "").strip():
        matrix = [str(platform).strip()]
    if not matrix:
        return [], "No platforms to build (set platform / build_platforms)."
    multi = len(matrix) > 1
    target_ok = push_kind in ("hub", "registry") and bool(_normalize_os_registry_prefix(push_target))

    def buildx_cmd(dockerfile: str, tag: str, plat: str, action: str) -> list[str]:
        """One `docker buildx build` argv; ``action`` is '--push' or '--load'."""
        cmd = ["docker", "buildx", "build", "--builder", BUILDX_BUILDER_NAME]
        if no_cache:
            cmd.append("--no-cache")
        cmd.extend(["-f", dockerfile, "-t", tag])
        cmd.extend(_buildx_attestation_off_args())
        if plat:
            cmd.extend(["--platform", plat])
        cmd.extend([action, repo_root])
        return cmd

    def remote_ref() -> str:
        return _remote_push_reference(push_kind, push_target, image_tag, legacy_full_image=legacy_full_image)

    amd_path, arm_path = _paired_amd64_arm64_paths(dockerfile_abs)
    use_split_manifest = (
        multi
        and target_ok
        and bool(amd_path and arm_path)
        and set(matrix) == {"linux/amd64", "linux/arm64"}
    )
    if use_split_manifest and amd_path and arm_path:
        # Push each arch under a staging tag, then merge both into one manifest tag.
        remote = remote_ref()
        stage_amd = _manifest_stage_tag(remote, "amd64")
        stage_arm = _manifest_stage_tag(remote, "arm64")
        return [
            buildx_cmd(amd_path, stage_amd, "linux/amd64", "--push"),
            buildx_cmd(arm_path, stage_arm, "linux/arm64", "--push"),
            ["docker", "buildx", "imagetools", "create", "-t", remote, stage_amd, stage_arm],
        ], None
    if multi:
        if not target_ok:
            # `docker load` cannot store a multi-arch tag locally, so a push target is mandatory.
            return [], (
                "Multi-arch OS images need ROLEFORGE_OS_DOCKER_HUB_REPOSITORY or ROLEFORGE_OS_IMAGE_REGISTRY "
                "(plain docker load cannot store one multi-arch tag locally)."
            )
        return [buildx_cmd(dockerfile_abs, remote_ref(), ",".join(matrix), "--push")], None
    single_plat = matrix[0]
    if target_ok:
        return [buildx_cmd(dockerfile_abs, remote_ref(), single_plat, "--push")], None
    # No registry: build single-arch and load into the local docker daemon.
    return [buildx_cmd(dockerfile_abs, image_tag, single_plat, "--load")], None
def _require_registry_credentials(kind: str, username: str | None, password: str | None) -> None:
if kind not in ("hub", "registry"):
return
if not str(username or "").strip() or not str(password or ""):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="docker_login_username and docker_login_password are required for registry pushes.",
)
def _registry_payload_slice(payload: dict[str, Any]) -> dict[str, Any]:
return {
"registry_provider": payload.get("registry_provider"),
"registry_host": payload.get("registry_host"),
"repository_path": payload.get("repository_path"),
}
def _request_bool_flag(payload: dict[str, Any], key: str = "no_cache") -> bool:
"""Parse UI/API booleans; accepts JSON bool or common string/int truthy forms."""
v = payload.get(key)
if isinstance(v, bool):
return v
if v is None:
return False
if isinstance(v, (int, float)):
return bool(v)
s = str(v).strip().lower()
return s in ("1", "true", "yes", "on")
async def _build_os_image(
    dockerfile_rel: str,
    image_tag: str,
    platform: str,
    build_platforms: str = "",
    *,
    docker_login_username: str | None = None,
    docker_login_password: str | None = None,
    registry_payload: dict[str, Any] | None = None,
    no_cache: bool = False,
) -> dict[str, Any]:
    """Build (and optionally push) one OS image; returns a status/logs payload for the UI.

    Steps: resolve the push destination, ensure buildx and the named builder,
    docker login when pushing, run the planned build commands (with retries for
    flaky pushes), then report "success"/"failed" plus collected log lines.
    """
    logs: list[str] = []

    def _failed_payload(code: str) -> dict[str, Any]:
        # Uniform failure envelope; `code` is a stringified exit code.
        return {
            "dockerfile": dockerfile_rel,
            "image": image_tag,
            "platform": platform,
            "status": "failed",
            "exit_code": code,
            "logs": logs,
        }

    # May raise HTTPException(400) on a bad registry payload.
    cfg = _resolve_os_push_config(registry_payload)
    push_kind, push_target = cfg.kind, cfg.target
    legacy_layout = cfg.legacy_full_image
    login_server = cfg.docker_login_server
    if push_kind == "hub":
        logs.append(f"[roleforge] Docker Hub repository: docker.io/{push_target}")
    elif push_kind == "registry":
        logs.append(
            f"[roleforge] Registry push target: {push_target}"
            + (" (legacy prefix layout)" if legacy_layout else "")
        )
    # buildx must exist before anything else; exit code 127 mirrors "command not found".
    buildx_mode, buildx_note = await _ensure_docker_buildx_mode()
    logs.append(f"[roleforge] {buildx_note}")
    if buildx_mode == "none":
        return {
            "dockerfile": dockerfile_rel,
            "image": image_tag,
            "platform": platform,
            "status": "failed",
            "exit_code": "127",
            "logs": logs,
        }
    builder_ok, builder_note = await _ensure_buildx_builder()
    logs.append(f"[roleforge] {builder_note}")
    if not builder_ok:
        return {
            "dockerfile": dockerfile_rel,
            "image": image_tag,
            "platform": platform,
            "status": "failed",
            "exit_code": "127",
            "logs": logs,
        }
    if no_cache:
        logs.append("[roleforge] Docker BuildKit cache disabled for this run (--no-cache).")
    du = str(docker_login_username or "").strip()
    dp = str(docker_login_password or "")
    # Pushing requires a docker login; password goes via stdin, never on the argv.
    if push_kind == "hub":
        if not du or not dp:
            logs.append("[roleforge] Registry credentials are required (docker_login_username / docker_login_password).")
            return _failed_payload("3")
        logs.append(f"$ docker login --username {du!r} --password-stdin (Docker Hub)")
        lg_code, lg_out = await _docker_login_stdin_password(username=du, password=dp, registry_server=None)
        # Keep only the tail of the login output to bound log size.
        logs.extend(lg_out.decode(errors="replace").splitlines()[-40:])
        if lg_code != 0:
            return _failed_payload(str(lg_code))
    elif push_kind == "registry":
        if not du or not dp:
            logs.append("[roleforge] Registry credentials are required (docker_login_username / docker_login_password).")
            return _failed_payload("3")
        srv = login_server or push_target.split("/")[0]
        logs.append(f"$ docker login --username {du!r} --password-stdin {srv}")
        lg_code, lg_out = await _docker_login_stdin_password(username=du, password=dp, registry_server=srv)
        logs.extend(lg_out.decode(errors="replace").splitlines()[-40:])
        if lg_code != 0:
            return _failed_payload(str(lg_code))
    # Build context is the repo root (parent of dockerfiles/).
    repo_root = str(DOCKERFILES_ROOT.parent)
    dockerfile_abs = str(DOCKERFILES_ROOT / dockerfile_rel)
    cmds, plan_err = _os_build_command_plan(
        dockerfile_abs=dockerfile_abs,
        image_tag=image_tag,
        platform=platform,
        build_platforms_csv=build_platforms,
        repo_root=repo_root,
        push_kind=push_kind,
        push_target=push_target,
        legacy_full_image=legacy_layout,
        no_cache=no_cache,
    )
    if plan_err:
        logs.append(f"[roleforge] {plan_err}")
        return _failed_payload("2")
    # Shell lines the operator can run to pull + re-tag the pushed image locally.
    manual_pull_commands: list[str] = []
    if push_kind in ("hub", "registry") and push_target.strip():
        manual_pull_commands = _manual_pull_and_tag_shell_commands(
            push_kind,
            push_target,
            image_tag,
            legacy_full_image=legacy_layout,
        )

    async def run_one(cmd: list[str]) -> int:
        """Run one planned command; retry flaky push/manifest ops on transient-looking output."""
        max_attempts = _OS_BUILDX_PUSH_RETRIES if _should_retry_os_registry_cmd(cmd) else 1
        last = 1
        for attempt in range(max_attempts):
            if attempt:
                logs.append(
                    f"[roleforge] buildx push retry {attempt + 1}/{max_attempts} "
                    f"after {_OS_BUILDX_PUSH_RETRY_DELAY_SEC}s…"
                )
                await asyncio.sleep(_OS_BUILDX_PUSH_RETRY_DELAY_SEC)
            logs.append(f"$ {' '.join(cmd)}")
            proc = await asyncio.to_thread(
                subprocess.run,
                cmd,
                capture_output=True,
                text=True,
                cwd=repo_root,
            )
            out = (proc.stdout or "").splitlines()
            err = (proc.stderr or "").splitlines()
            # Bound log growth: keep only the last 200 lines of each stream.
            if out:
                logs.extend(out[-200:])
            if err:
                logs.extend(err[-200:])
            last = int(proc.returncode or 0)
            if last == 0:
                return 0
            # Inspect only the tail of combined output for transient-failure markers.
            blob = f"{proc.stdout or ''}\n{proc.stderr or ''}"[-12000:]
            if attempt < max_attempts - 1 and _should_retry_buildx_push_output(blob):
                continue
            return last
        return last

    # Commands run sequentially; the first failure aborts the whole build.
    for cmd in cmds:
        code = await run_one(cmd)
        if code != 0:
            return {
                "dockerfile": dockerfile_rel,
                "image": image_tag,
                "platform": platform,
                "status": "failed",
                "exit_code": str(code),
                "logs": logs,
            }
    return {
        "dockerfile": dockerfile_rel,
        "image": image_tag,
        "platform": platform,
        "status": "success",
        "exit_code": "0",
        "logs": logs,
        "manual_pull_commands": manual_pull_commands,
    }
def _app_config_row_value_dict(row: Any) -> dict[str, Any]:
"""Decode `app_config.value` (JSONB) to a dict. Handles asyncpg objects and JSON stored as str."""
if not row:
return {}
try:
raw = row["value"]
except (KeyError, TypeError):
return {}
if raw is None:
return {}
if isinstance(raw, dict):
return dict(raw)
if isinstance(raw, str):
try:
parsed = json.loads(raw)
except (json.JSONDecodeError, TypeError, ValueError):
return {}
return dict(parsed) if isinstance(parsed, dict) else {}
return {}
# Binary uploads store body as this prefix + standard base64 (see pages-main.js ROLEFORGE_B64_PREFIX).
ROLEFORGE_B64_PREFIX = "ROLEFORGE_B64:"
LEGACY_FILES_UPLOAD_PREFIX = "files/_roleforge_uploads/"
def _role_file_body_zip_bytes(path: str, body: str) -> bytes:
"""Bytes for role archive: UTF-8 text, or base64-decoded legacy / prefixed binary."""
s = body if isinstance(body, str) else str(body)
if s.startswith(ROLEFORGE_B64_PREFIX):
raw_b64 = "".join(s[len(ROLEFORGE_B64_PREFIX) :].split())
try:
return base64.standard_b64decode(raw_b64, validate=False)
except Exception: # noqa: BLE001
return s.encode("utf-8")
if str(path).startswith(LEGACY_FILES_UPLOAD_PREFIX):
raw_b64 = "".join(s.split())
try:
return base64.standard_b64decode(raw_b64, validate=False)
except Exception: # noqa: BLE001
return s.encode("utf-8")
return s.encode("utf-8")
# Seed list of role categories offered on new installations.
DEFAULT_ROLE_CATEGORIES = (
    "Base OS Hardening",
    "User and SSH Access",
    "Package Management",
    "System Services",
    "Time and NTP",
    "Logging and Audit",
    "Monitoring and Observability",
    "Backup and Restore",
    "Disaster Recovery",
    "Network Baseline",
    "DNS and DHCP",
    "PKI and Certificates",
    "Secrets and Vault",
    "Firewall and Security",
    "Identity and SSO",
    "Container Runtime",
    "Kubernetes Bootstrap",
    "Kubernetes Add-ons",
    "Ingress and Load Balancing",
    "Databases",
    "Message Brokers",
    "Caching and KV Stores",
    "Web Servers",
    "Reverse Proxy",
    "CI/CD and Build Agents",
    "Artifact Registry",
    "Infrastructure Provisioning",
    "Cloud Integrations",
    "Storage and Filesystems",
    "NFS and SMB",
    "Virtualization",
    "High Availability",
    "Performance Tuning",
    "Compliance and Benchmarks",
    "Middleware and App Runtime",
    "Developer Tooling",
    "Automation Utilities",
    "Molecule Tests",
    "Migration and Upgrade",
    "Custom Business Logic",
)
# Top-level directories accepted in a role file tree (standard Ansible role layout).
ALLOWED_ROLE_ROOT_DIRS = {
    "tasks",
    "handlers",
    "templates",
    "files",
    "vars",
    "defaults",
    "meta",
    "library",
    "plugins",
    "module_utils",
    "lookup_plugins",
    "filter_plugins",
    "tests",
}
# New roles get these files only — no tests/ tree (Molecule uses its own layout on the runner).
DEFAULT_ROLE_SCAFFOLD_FILES = (
    ("tasks/main.yml", ""),
    ("handlers/main.yml", ""),
    ("defaults/main.yml", ""),
    ("vars/main.yml", ""),
    ("meta/main.yml", ""),
)
def _parse_json_object(value) -> dict:
if isinstance(value, dict):
return value
if isinstance(value, str):
try:
parsed = json.loads(value)
except Exception: # noqa: BLE001
return {}
return parsed if isinstance(parsed, dict) else {}
return {}
def _safe_role_archive_dir_name(name: str) -> str:
"""Filesystem-safe single segment for zip root (Ansible role folder name)."""
base = str(name or "").strip().lower() or "role"
base = re.sub(r"[^a-z0-9_.-]+", "_", base).strip("._-") or "role"
return base[:120]
def _sanitize_ansible_role_name(raw: str) -> str | None:
"""
Ansible-friendly role name: lowercase; spaces and any character outside [a-z0-9_] become '_'.
Collapses repeated underscores; trims leading/trailing underscores.
Returns None if nothing usable remains (caller should reject empty input).
"""
s = str(raw or "").strip().lower()
s = re.sub(r"[^a-z0-9_]+", "_", s)
s = re.sub(r"_+", "_", s).strip("_")
if not s:
return None
return s[:120]
ROLE_VISIBILITY_VALUES = frozenset({"public", "team", "personal"})
def _normalize_role_visibility(raw: str | None, default: str = "public") -> str:
v = str(raw if raw is not None else default).strip().lower() or default
if v not in ROLE_VISIBILITY_VALUES:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid role visibility")
return v
ROLE_OS_FAMILY_VALUES = frozenset({"rhel", "debian", "alpine", "suse", "arch", "bsd", "windows", "universal"})
def _normalize_role_tags(raw: object | None, *, max_tags: int = 32, max_len_each: int = 48) -> list[str]:
if raw is None:
return []
parts: list[str]
if isinstance(raw, str):
parts = [p.strip() for p in re.split(r"[,;]+", raw) if p.strip()]
elif isinstance(raw, (list, tuple)):
parts = [str(p).strip() for p in raw if str(p).strip()]
else:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="tags must be a list or comma-separated text")
out: list[str] = []
seen: set[str] = set()
for p in parts:
if len(p) > max_len_each:
p = p[:max_len_each].rstrip()
key = p.casefold()
if key in seen:
continue
seen.add(key)
out.append(p)
if len(out) >= max_tags:
break
return out
def _normalize_os_families(raw: object | None) -> list[str]:
if raw is None:
return []
if not isinstance(raw, (list, tuple)):
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="os_families must be a list")
out: list[str] = []
seen: set[str] = set()
for item in raw:
slug = str(item).strip().lower()
if slug not in ROLE_OS_FAMILY_VALUES:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid os_family {item!r}; allowed: {', '.join(sorted(ROLE_OS_FAMILY_VALUES))}",
)
if slug in seen:
continue
seen.add(slug)
out.append(slug)
return out
async def _allocate_unique_role_name(conn, user_id: str, base: str) -> str:
    """Find a free role name for the user: sanitized base, then base_copy1..base_copy79.

    Raises 409 when all 80 candidates are taken.
    """
    sanitized = _sanitize_ansible_role_name(base) or "role"
    for attempt in range(80):
        candidate = sanitized if attempt == 0 else f"{sanitized}_copy{attempt}"[:120]
        taken = await conn.fetchval(
            "select 1 from ansible_roles where owner_id=$1::uuid and name=$2",
            user_id,
            candidate,
        )
        if not taken:
            return candidate
    raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Could not allocate a unique role name")
async def _fork_role_copy(
    conn,
    source_role_id: str,
    user_id: str,
    target_visibility: str,
    target_name: str | None = None,
    *,
    source_type: str | None = None,
    source_ref: str | None = None,
    category_id: str | None = None,
    description: str | None = None,
) -> str:
    """Copy a role (metadata + files) into *user_id*'s account; return the new id.

    Two modes:
    - target_name is None (automatic fork): a unique "<name>_copyN" is
      allocated, source_type/category are inherited, source_ref is set to
      'fork'.
    - target_name given (explicit fork): the caller supplies name,
      description, source_type, source_ref and category; the new name must
      differ from the original and must not clash with one of the caller's
      existing roles.

    Raises 404 (role or category missing), 400 (bad name), 409 (name taken
    or none allocatable).
    """
    row = await conn.fetchrow(
        """
        select
            name,
            source_type,
            source_ref,
            category_id,
            content,
            coalesce(role_tags, array[]::text[]) as role_tags,
            coalesce(os_families, array[]::text[]) as os_families
        from ansible_roles where id=$1::uuid
        """,
        source_role_id,
    )
    if not row:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found")
    orig_name_s = _sanitize_ansible_role_name(str(row["name"])) or ""
    if target_name is not None:
        # Explicit fork: the caller-chosen name must sanitize to something
        # non-empty, differ from the source, and be free for this owner.
        unique_name = _sanitize_ansible_role_name(target_name)
        if not unique_name:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Role name is required")
        if unique_name == orig_name_s:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Fork name must differ from the original role name",
            )
        taken = await conn.fetchval(
            "select 1 from ansible_roles where owner_id=$1::uuid and name=$2",
            user_id,
            unique_name,
        )
        if taken:
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT, detail="You already have a role with this name"
            )
    else:
        # Automatic fork: derive a free "<name>_copyN" for this owner.
        unique_name = await _allocate_unique_role_name(conn, user_id, str(row["name"]))
    # `content` may arrive as a dict or as a JSON string depending on the
    # driver codec; normalize to a fresh mutable dict either way.
    content_obj = row["content"]
    if not isinstance(content_obj, dict):
        content_obj = _parse_json_object(content_obj)
    content_obj = dict(content_obj)
    if target_name is None:
        # Automatic fork inherits metadata; source_ref marks the lineage.
        st = str(row["source_type"] or "inline")
        sr = "fork"
        cat = row["category_id"]
    else:
        # Explicit fork takes the caller's metadata, overriding description.
        content_obj["description"] = str(description if description is not None else "")
        st = str(source_type or "inline").strip()
        sr = str(source_ref if source_ref is not None else "")
        cat = category_id
        if cat:
            cok = await conn.fetchval("select 1 from role_categories where id=$1::uuid", cat)
            if not cok:
                raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role category not found")
    tag_copy = [str(x) for x in (row["role_tags"] or [])]
    os_copy = [str(x) for x in (row["os_families"] or [])]
    # Forks never carry the team binding (team_id inserted as null);
    # forked_from_id records the source role.
    new_id = await conn.fetchval(
        """
        insert into ansible_roles (
            owner_id, name, source_type, source_ref, category_id, content, visibility,
            forked_from_id, team_id, role_tags, os_families
        )
        values ($1::uuid, $2, $3, $4, $5::uuid, $6::jsonb, $7, $8::uuid, null, $9::text[], $10::text[])
        returning id::text
        """,
        user_id,
        unique_name,
        st,
        sr,
        cat,
        json.dumps(content_obj),
        target_visibility,
        source_role_id,
        tag_copy,
        os_copy,
    )
    files = await conn.fetch(
        "select path, content from role_files where role_id=$1::uuid order by path",
        source_role_id,
    )
    if files:
        # Copy the source role's files verbatim.
        await conn.executemany(
            """
            insert into role_files (role_id, path, content)
            values ($1::uuid, $2, $3)
            """,
            [(new_id, str(fr["path"]), str(fr["content"])) for fr in files],
        )
    else:
        # Source had no stored files (legacy role): seed the default scaffold.
        await _upsert_role_files(conn, str(new_id), list(DEFAULT_ROLE_SCAFFOLD_FILES))
    return str(new_id)
async def _user_is_admin(conn, user_id: str) -> bool:
    """True when the user's stored role is 'admin' or 'root'."""
    stored_role = await conn.fetchval("select role from users where id=$1::uuid", user_id)
    return str(stored_role or "") in ("admin", "root")
async def _is_active_team_member(conn, team_id: str | None, user_id: str) -> bool:
    """True when *user_id* has an 'active' membership in *team_id*.

    A falsy team_id short-circuits to False without touching the DB.
    """
    if not team_id:
        return False
    hit = await conn.fetchval(
        """
        select 1 from team_memberships
        where team_id = $1::uuid and user_id = $2::uuid and status = 'active'
        """,
        team_id,
        user_id,
    )
    return hit is not None
async def _resolve_role_for_mutation(conn, role_id: str, user_id: str) -> tuple[str, bool]:
    """Return (effective_role_id, forked). Fork when editing another user's public role.

    Resolution order (first match wins):
    - owner or admin: edit in place (forked=False);
    - team role: active team members edit in place, others get 403;
    - someone else's personal role: 403;
    - someone else's public role: transparently forked into the caller's
      personal space; the copy's id is returned with forked=True.

    Raises 404 when the role does not exist, 403 on denial.
    """
    row = await conn.fetchrow(
        """
        select owner_id::text, visibility, team_id::text as team_id
        from ansible_roles where id=$1::uuid
        """,
        role_id,
    )
    if not row:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found")
    if row["owner_id"] == user_id:
        return role_id, False
    if await _user_is_admin(conn, user_id):
        return role_id, False
    # Missing visibility is treated as 'personal' (most restrictive).
    vis = str(row["visibility"] or "personal")
    tid = row["team_id"]
    if vis == "team" and tid:
        if await _is_active_team_member(conn, tid, user_id):
            return role_id, False
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Join the team to edit this team role",
        )
    if vis == "personal":
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Cannot edit another user's personal role")
    if vis == "public":
        # Copy-on-write: the caller edits a private fork, not the shared role.
        new_id = await _fork_role_copy(conn, role_id, user_id, "personal")
        return new_id, True
    # Unknown visibility value: deny.
    raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Role access denied")
async def _clamp_visibility_on_update(conn, role_id: str, user_id: str, requested: str) -> str:
    """Owner may change personal↔public↔team (subject to team_id validation). Public stays public for non-admins.

    Policy matrix (current stored visibility -> allowed outcome):
    - public:   admins may set anything; everyone else is clamped to 'public';
    - personal: owner/admin may promote to 'public' or 'team', else stays
      'personal';
    - team:     owner/admin may set anything; other callers keep 'team'.

    Raises 404 when the role does not exist.
    """
    req = _normalize_role_visibility(requested)
    row = await conn.fetchrow(
        """
        select owner_id::text, coalesce(visibility, 'personal') as visibility
        from ansible_roles where id=$1::uuid
        """,
        role_id,
    )
    if not row:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found")
    cur = str(row["visibility"])
    is_owner = str(row["owner_id"]) == user_id
    is_admin = await _user_is_admin(conn, user_id)
    if cur == "public":
        if is_admin:
            return req
        # Non-admins cannot take a shared public role private.
        return "public"
    if cur == "personal":
        if req == "public" and (is_owner or is_admin):
            return "public"
        if req == "team" and (is_owner or is_admin):
            return "team"
        return "personal"
    if cur == "team":
        if is_admin or is_owner:
            return req
        return cur
    # Unexpected stored value: fall back to a normalized request.
    return _normalize_role_visibility(requested, default="personal")
async def _assert_can_view_role(conn, role_id: str, user_id: str) -> dict[str, str]:
    """Raise 403/404 unless *user_id* may view the role; return its ACL fields.

    Viewing is allowed for admins, public roles, the owner, and active
    members of the role's team. The returned dict exposes owner_id,
    visibility and team_id for follow-up checks by the caller.
    """
    row = await conn.fetchrow(
        """
        select
            owner_id::text as owner_id,
            coalesce(visibility, 'public') as visibility,
            team_id::text as team_id
        from ansible_roles where id=$1::uuid
        """,
        role_id,
    )
    if not row:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found")
    # NOTE(review): a NULL visibility defaults to 'public' here but to
    # 'personal' in the mutation helpers — confirm which default is intended.
    out = {"owner_id": str(row["owner_id"]), "visibility": str(row["visibility"]), "team_id": row["team_id"]}
    if await _user_is_admin(conn, user_id):
        return out
    vis = str(row["visibility"])
    if vis == "public":
        return out
    if row["owner_id"] == user_id:
        return out
    if vis == "team" and row["team_id"] and await _is_active_team_member(conn, str(row["team_id"]), user_id):
        return out
    raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Role access denied")
def _validate_role_file_path(path: str) -> str:
    """Normalize a role-relative file path and enforce the allowed root dirs.

    Strips leading slashes; rejects (400) empty paths, directory paths
    (trailing slash) and paths whose first segment is not listed in
    ALLOWED_ROLE_ROOT_DIRS. Returns the normalized path.
    """
    cleaned = str(path or "").strip().lstrip("/")
    if not cleaned or cleaned.endswith("/"):
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Role file path is invalid")
    top_dir, _, _ = cleaned.partition("/")
    if top_dir not in ALLOWED_ROLE_ROOT_DIRS:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Role file path must start with one of: {', '.join(sorted(ALLOWED_ROLE_ROOT_DIRS))}",
        )
    return cleaned
def _zip_member_to_rel_paths(namelist: list[str]) -> tuple[list[tuple[str, str]], str | None]:
"""Map zip entry -> relative path under role; strip one shared top-level folder (Galaxy-style layout)."""
clean: list[str] = []
for n in namelist:
n = n.replace("\\", "/").strip()
if not n or n.endswith("/"):
continue
if "__MACOSX" in n or "/.DS_Store" in n or n.endswith(".DS_Store"):
continue
clean.append(n)
if not clean:
return [], None
parts = [c.split("/") for c in clean]
first0 = parts[0][0]
if len(parts[0]) > 1 and all(len(p) > 1 and p[0] == first0 for p in parts):
pairs = [(clean[i], "/".join(parts[i][1:])) for i in range(len(clean))]
return pairs, first0
return [(c, c) for c in clean], None
def _decode_import_bytes(rel_path: str, raw: bytes) -> tuple[str, str] | None:
    """Turn zip bytes into stored path + text or ROLEFORGE_B64 body; skip invalid paths."""
    cleaned = rel_path.replace("\\", "/").strip().lstrip("/")
    if not cleaned or cleaned.endswith("/"):
        return None
    try:
        text = raw.decode("utf-8")
    except UnicodeDecodeError:
        # Binary payload: force it under files/ with a sanitized basename,
        # then store it as a base64 envelope.
        target = cleaned
        if not target.startswith("files/"):
            basename = target.split("/")[-1]
            sanitized = re.sub(r"[^a-zA-Z0-9_.-]+", "_", basename).strip("._-")[:200] or "file"
            target = f"files/{sanitized}"
        try:
            stored_path = _validate_role_file_path(target)
        except HTTPException:
            return None
        return stored_path, ROLEFORGE_B64_PREFIX + base64.standard_b64encode(raw).decode("ascii")
    # UTF-8 text: keep the original path if it passes validation.
    try:
        stored_path = _validate_role_file_path(cleaned)
    except HTTPException:
        return None
    return stored_path, text
def _parse_role_zip_bytes(raw: bytes, filename_hint: str) -> tuple[str, list[tuple[str, str]]]:
    """Decode a role ZIP into (suggested_role_name, sorted (path, body) pairs).

    Raises 400 for a malformed archive or when no entry maps to an allowed
    role path. Unreadable members are skipped rather than failing the import.
    """
    try:
        archive = zipfile.ZipFile(io.BytesIO(raw))
    except zipfile.BadZipFile as exc:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid ZIP file") from exc
    pairs, root_folder = _zip_member_to_rel_paths(archive.namelist())
    # Prefer the archive's shared root folder as the role name; fall back to
    # the upload's file stem.
    fallback_stem = Path(filename_hint or "imported.zip").stem
    suggested = _safe_role_archive_dir_name(root_folder or fallback_stem)
    collected: dict[str, str] = {}
    for member, rel in pairs:
        try:
            payload = archive.read(member)
        except (KeyError, RuntimeError):
            continue
        decoded = _decode_import_bytes(rel, payload)
        if decoded is not None:
            collected[decoded[0]] = decoded[1]
    if not collected:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="No allowed role files found in archive (use paths under tasks/, handlers/, templates/, files/, …).",
        )
    return suggested, sorted(collected.items(), key=lambda kv: kv[0])
def _role_editor_lint_kind(path: str) -> Literal["yaml", "json"]:
lower = str(path or "").strip().lstrip("/").lower()
base = lower.split("/")[-1] if lower else ""
if base.endswith((".json", ".tfstate", ".ndjson")):
return "json"
return "yaml"
def _lint_yaml_role_file(content: str, pseudo_path: str, cfg: YamlLintConfig) -> list[LintRoleFileError]:
    """Run yamllint over *content* and adapt each problem to LintRoleFileError.

    Missing problem attributes degrade gracefully: level defaults to
    'error', the rule prefix is omitted when absent.
    """
    findings: list[LintRoleFileError] = []
    for problem in yaml_linter.run(content, cfg, pseudo_path):
        level = str(getattr(problem, "level", "") or "error").lower()
        rule_id = getattr(problem, "rule", None)
        description = getattr(problem, "desc", "") or ""
        message = f"{rule_id}: {description}" if rule_id else description
        findings.append(
            LintRoleFileError(
                line=getattr(problem, "line", None),
                column=getattr(problem, "column", None),
                level=level,
                message=message.strip(),
            )
        )
    return findings
async def _upsert_role_files(conn, role_id: str, files: list[tuple[str, str]]) -> None:
    """Insert-or-update the given (path, content) pairs for one role; no-op for an empty list."""
    if not files:
        return
    rows = [(role_id, *pair) for pair in files]
    await conn.executemany(
        """
        insert into role_files (role_id, path, content)
        values ($1::uuid, $2, $3)
        on conflict (role_id, path)
        do update set content=excluded.content, updated_at=now()
        """,
        rows,
    )
async def _ensure_default_role_categories(conn) -> None:
    """Seed the global (owner-less) default role categories.

    Idempotent: each category is inserted only when no category with the
    same name (case-insensitive) exists.
    """
    for category_name in DEFAULT_ROLE_CATEGORIES:
        await conn.execute(
            """
            insert into role_categories (owner_id, name)
            select null, $1
            where not exists (
                select 1 from role_categories where lower(name) = lower($1)
            )
            """,
            category_name,
        )
async def _prune_legacy_empty_tests_scaffold_for_user(conn, user_id: str) -> None:
    """Remove old default tests/test.yml (empty) left from earlier RoleForge scaffolds.

    Restricted to roles owned by *user_id*, and only when the file content is
    blank after trimming — user-authored test files are never touched.
    """
    await conn.execute(
        """
        delete from role_files rf
        using ansible_roles r
        where rf.role_id = r.id
          and r.owner_id = $1::uuid
          and rf.path = 'tests/test.yml'
          and btrim(coalesce(rf.content, '')) = ''
        """,
        user_id,
    )
async def _backfill_role_files_for_user(conn, user_id: str) -> None:
    """Create scaffold files for any of the user's roles that have no role_files rows.

    Legacy roles whose JSON content carries tasks_yaml also get that text as
    tasks/main.yml. Finishes by pruning the legacy empty tests/test.yml
    scaffold for the user.
    """
    role_rows = await conn.fetch(
        """
        select r.id::text as id, r.content
        from ansible_roles r
        where r.owner_id=$1::uuid
        """,
        user_id,
    )
    for role_row in role_rows:
        rid = str(role_row["id"])
        existing = await conn.fetchval(
            "select count(*)::int from role_files where role_id=$1::uuid",
            rid,
        )
        if int(existing or 0) > 0:
            continue  # role already has files; nothing to backfill
        legacy = _parse_json_object(role_row["content"])
        tasks_yaml = str(legacy.get("tasks_yaml") or "")
        scaffold = list(DEFAULT_ROLE_SCAFFOLD_FILES)
        if tasks_yaml.strip():
            scaffold.append(("tasks/main.yml", tasks_yaml))
        await _upsert_role_files(conn, rid, scaffold)
    await _prune_legacy_empty_tests_scaffold_for_user(conn, user_id)
async def _monitor_job(
    job_id: str, runner_url: str, runner_run_id: str, runner_name: str, runtime_mode: str
) -> None:
    """Poll an ephemeral runner until the job ends; stream its logs into the DB.

    Loop behavior:
    - gives up with status 'failed' once settings.app_runner_timeout_sec has
      elapsed;
    - tolerates up to settings.app_runner_max_poll_errors consecutive poll
      failures before giving up;
    - on each successful poll, refreshes the job heartbeat, appends new log
      lines, and stops when the runner reports 'success' or 'failed'.
    The finally block terminates the runner (only when
    AUTO_TERMINATE_TEST_RUNNER is set) and persists the final status.
    """
    pool = get_pool()
    settings = get_settings()
    started = time.monotonic()
    poll_errors = 0
    offset = 0  # log-line cursor into the runner's output stream
    final_status = "failed"
    try:
        while True:
            if time.monotonic() - started > settings.app_runner_timeout_sec:
                final_status = "failed"
                break
            try:
                snapshot = await get_run_status(runner_url, runner_run_id, offset)
                poll_errors = 0  # reset the consecutive-failure counter
            except Exception: # noqa: BLE001
                poll_errors += 1
                if poll_errors >= settings.app_runner_max_poll_errors:
                    final_status = "failed"
                    break
                await asyncio.sleep(settings.app_runner_poll_interval_sec)
                continue
            lines = snapshot.get("lines", [])
            offset = int(snapshot.get("next_offset", offset))
            async with pool.acquire() as conn:
                await conn.execute(
                    "update jobs set runner_last_heartbeat=now() where id=$1::uuid",
                    job_id,
                )
                if lines:
                    await conn.executemany(
                        "insert into job_logs (job_id, log_line) values ($1::uuid, $2)",
                        [(job_id, line) for line in lines],
                    )
            status_name = str(snapshot.get("status", "running"))
            if status_name in {"success", "failed"}:
                final_status = status_name
                break
            await asyncio.sleep(settings.app_runner_poll_interval_sec)
    finally:
        if AUTO_TERMINATE_TEST_RUNNER:
            await terminate_ephemeral_worker(runner_name, runtime_mode)
        # NOTE(review): unlike _monitor_test, this write has no status guard —
        # a concurrently finalized job would be overwritten; confirm intended.
        async with pool.acquire() as conn:
            await conn.execute(
                "update jobs set status=$1, finished_at=now() where id=$2::uuid",
                final_status,
                job_id,
            )
async def _monitor_test(
    test_id: str, runner_url: str, runner_run_id: str, runner_name: str, runtime_mode: str
) -> None:
    """Poll an ephemeral runner until the test run ends; stream logs into the DB.

    Same polling contract as _monitor_job (timeout, consecutive-error cap,
    heartbeat + log append per poll). Differences: the runner is always
    terminated in the finally block, and the final status update is guarded
    so an already-finalized test run is never overwritten.
    """
    pool = get_pool()
    settings = get_settings()
    started = time.monotonic()
    poll_errors = 0
    offset = 0  # log-line cursor into the runner's output stream
    final_status = "failed"
    try:
        while True:
            if time.monotonic() - started > settings.app_runner_timeout_sec:
                final_status = "failed"
                break
            try:
                snapshot = await get_run_status(runner_url, runner_run_id, offset)
                poll_errors = 0  # reset the consecutive-failure counter
            except Exception: # noqa: BLE001
                poll_errors += 1
                if poll_errors >= settings.app_runner_max_poll_errors:
                    final_status = "failed"
                    break
                await asyncio.sleep(settings.app_runner_poll_interval_sec)
                continue
            lines = snapshot.get("lines", [])
            offset = int(snapshot.get("next_offset", offset))
            async with pool.acquire() as conn:
                await conn.execute(
                    "update test_runs set runner_last_heartbeat=now() where id=$1::uuid",
                    test_id,
                )
                if lines:
                    await conn.executemany(
                        "insert into test_logs (test_run_id, log_line) values ($1::uuid, $2)",
                        [(test_id, line) for line in lines],
                    )
            status_name = str(snapshot.get("status", "running"))
            if status_name in {"success", "failed"}:
                final_status = status_name
                break
            await asyncio.sleep(settings.app_runner_poll_interval_sec)
    finally:
        await terminate_ephemeral_worker(runner_name, runtime_mode)
        async with pool.acquire() as conn:
            # Guarded update: never clobber a terminal status written elsewhere.
            await conn.execute(
                """
                update test_runs
                set status=$1, finished_at=now()
                where id=$2::uuid
                  and status not in ('success', 'failed')
                """,
                final_status,
                test_id,
            )
@router.post("/roles/lint-file", response_model=LintRoleFileResponse)
async def lint_role_file(
payload: LintRoleFileRequest,
user_id: str = Depends(get_current_user_id),
) -> LintRoleFileResponse:
"""Validate editor buffer: YAML via yamllint (DB `yamllint`); JSON via project JSONLint rules (DB `jsonlint`)."""
_ = user_id
_validate_role_file_path(payload.path)
kind = _role_editor_lint_kind(payload.path)
if kind == "json":
merged = await get_jsonlint_config_for_lint()
errors = run_json_lint(payload.content, merged)
blocking = [e for e in errors if (e.level or "").lower() == "error"]
return LintRoleFileResponse(ok=len(blocking) == 0, kind="json", errors=errors)
cfg = await get_yamllint_config_for_lint()
errors = _lint_yaml_role_file(payload.content, payload.path, cfg)
blocking = [e for e in errors if (e.level or "").lower() == "error"]
return LintRoleFileResponse(ok=len(blocking) == 0, kind="yaml", errors=errors)
@router.post("/roles/import-archive", response_model=RoleImportResult)
async def import_role_archive(
file: UploadFile = File(...),
user_id: str = Depends(get_current_user_id),
) -> RoleImportResult:
"""Parse an Ansible role ZIP (same layout as export); returns files for the create-role UI."""
_ = user_id
raw = await file.read()
max_bytes = 15 * 1024 * 1024
if len(raw) > max_bytes:
raise HTTPException(
status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
detail="ZIP is too large (max 15 MB).",
)
suggested, tuples = _parse_role_zip_bytes(raw, file.filename or "imported.zip")
return RoleImportResult(
suggested_name=suggested,
items=[RoleImportItem(path=p, content=c) for p, c in tuples],
)
@router.post("/roles")
async def create_role(payload: RoleCreate, user_id: str = Depends(get_current_user_id)) -> dict[str, str]:
name = _sanitize_ansible_role_name(payload.name)
if not name:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Role name is required")
_ = _normalize_role_visibility(payload.visibility)
visibility = "personal"
team_id_insert: str | None = None
if payload.team_id:
tid = str(payload.team_id).strip()
try:
UUID(tid)
except ValueError as exc:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid team_id") from exc
visibility = "team"
team_id_insert = tid
pool = get_pool()
async with pool.acquire() as conn:
if team_id_insert:
team_ok = await conn.fetchval("select 1 from teams where id=$1::uuid", team_id_insert)
if not team_ok:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Team not found")
if not await _is_active_team_member(conn, team_id_insert, user_id):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Must be an active team member to create a team role",
)
if payload.category_id:
exists = await conn.fetchval(
"select 1 from role_categories where id=$1::uuid",
payload.category_id,
)
if not exists:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role category not found")
tags_norm = _normalize_role_tags(payload.tags)
os_norm = _normalize_os_families(payload.os_families)
role_id = await conn.fetchval(
"""
insert into ansible_roles (
owner_id, name, source_type, source_ref, category_id, content, visibility, team_id,
role_tags, os_families
)
values ($1, $2, $3, $4, $5::uuid, $6::jsonb, $7, $8::uuid, $9::text[], $10::text[])
returning id::text
""",
user_id,
name,
payload.source_type,
payload.source_ref,
payload.category_id,
json.dumps({"description": payload.description, **(payload.content or {})}),
visibility,
team_id_insert,
tags_norm,
os_norm,
)
validated_files = [(_validate_role_file_path(item.path), item.content) for item in payload.files]
await _upsert_role_files(
conn,
str(role_id),
[*DEFAULT_ROLE_SCAFFOLD_FILES, *validated_files],
)
return {"id": role_id, "name": name}
@router.post("/roles/import-yaml")
async def import_role_yaml(
payload: RoleYamlImportRequest, user_id: str = Depends(get_current_user_id)
) -> dict[str, str]:
try:
parsed = yaml.safe_load(payload.tasks_yaml)
except yaml.YAMLError as exc:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid YAML") from exc
if not isinstance(parsed, list):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Role tasks YAML must be a list of Ansible tasks",
)
name = _sanitize_ansible_role_name(payload.name)
if not name:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Role name is required")
_ = _normalize_role_visibility(payload.visibility)
visibility = "personal"
pool = get_pool()
async with pool.acquire() as conn:
if payload.category_id:
exists = await conn.fetchval(
"select 1 from role_categories where id=$1::uuid",
payload.category_id,
)
if not exists:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role category not found")
tags_norm = _normalize_role_tags(payload.tags)
os_norm = _normalize_os_families(payload.os_families)
role_id = await conn.fetchval(
"""
insert into ansible_roles (
owner_id, name, source_type, source_ref, category_id, content, visibility,
role_tags, os_families
)
values ($1::uuid, $2, 'inline', $3, $4::uuid, $5::jsonb, $6, $7::text[], $8::text[])
returning id::text
""",
user_id,
name,
payload.source_ref,
payload.category_id,
json.dumps({"description": payload.description, "tasks_yaml": payload.tasks_yaml}),
visibility,
tags_norm,
os_norm,
)
validated_files = [(_validate_role_file_path(item.path), item.content) for item in payload.files]
await _upsert_role_files(
conn,
str(role_id),
[
*DEFAULT_ROLE_SCAFFOLD_FILES,
("tasks/main.yml", payload.tasks_yaml),
*validated_files,
],
)
return {"id": role_id, "name": name}
@router.get("/roles/{role_id}/export-yaml")
async def export_role_yaml(role_id: str, user_id: str = Depends(get_current_user_id)) -> Response:
pool = get_pool()
async with pool.acquire() as conn:
await _assert_can_view_role(conn, role_id, user_id)
row = await conn.fetchrow(
"""
select content
from ansible_roles
where id=$1::uuid
""",
role_id,
)
if not row:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found")
content = _parse_json_object(row["content"])
tasks_yaml = content.get("tasks_yaml")
if not tasks_yaml:
tasks_yaml = yaml.safe_dump(content, sort_keys=False, allow_unicode=False)
return Response(content=tasks_yaml, media_type="application/x-yaml")
@router.get("/roles/{role_id}/export-archive")
async def export_role_archive(role_id: str, user_id: str = Depends(get_current_user_id)) -> Response:
pool = get_pool()
async with pool.acquire() as conn:
await _assert_can_view_role(conn, role_id, user_id)
row = await conn.fetchrow(
"""
select r.name, r.content
from ansible_roles r
where r.id=$1::uuid
""",
role_id,
)
if not row:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found")
content = _parse_json_object(row["content"])
file_rows = await conn.fetch(
"select path, content from role_files where role_id=$1::uuid order by path",
role_id,
)
if file_rows:
paths_body: dict[str, str] = {}
for fr in file_rows:
paths_body[str(fr["path"])] = str(fr["content"])
files = sorted(paths_body.items(), key=lambda x: x[0])
else:
tasks_yaml = str(content.get("tasks_yaml") or "")
merged: dict[str, str] = {path: body for path, body in DEFAULT_ROLE_SCAFFOLD_FILES}
if tasks_yaml.strip():
merged["tasks/main.yml"] = tasks_yaml
files = sorted(merged.items(), key=lambda x: x[0])
root = _safe_role_archive_dir_name(row["name"])
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_DEFLATED) as zf:
for path, body in files:
arcname = f"{root}/{path}"
raw = _role_file_body_zip_bytes(path, body) if isinstance(body, str) else bytes(body)
zf.writestr(arcname, raw)
zip_bytes = buf.getvalue()
zip_filename = f"{root}.zip"
ascii_fallback = zip_filename.encode("ascii", "replace").decode("ascii")
disp = (
f'attachment; filename="{ascii_fallback}"; '
f"filename*=UTF-8''{quote(zip_filename)}"
)
return Response(
content=zip_bytes,
media_type="application/zip",
headers={"Content-Disposition": disp},
)
@router.get("/roles/{role_id}/files")
async def list_role_files(role_id: str, user_id: str = Depends(get_current_user_id)) -> dict[str, list[dict[str, str]]]:
pool = get_pool()
async with pool.acquire() as conn:
await _assert_can_view_role(conn, role_id, user_id)
rows = await conn.fetch(
"select path, content from role_files where role_id=$1::uuid order by path",
role_id,
)
return {"items": [{"path": row["path"], "content": row["content"]} for row in rows]}
@router.get("/roles/details/{role_id}")
async def get_role_details(role_id: str, user_id: str = Depends(get_current_user_id)) -> dict[str, Any]:
pool = get_pool()
async with pool.acquire() as conn:
await _assert_can_view_role(conn, role_id, user_id)
row = await conn.fetchrow(
"""
select
r.id::text as id,
r.owner_id::text as owner_id,
r.name,
r.source_type,
r.source_ref,
r.category_id::text as category_id,
r.created_at,
coalesce(r.visibility, 'personal') as visibility,
r.forked_from_id::text as forked_from_id,
r.team_id::text as team_id,
t.name as team_name,
c.name as category_name,
r.content,
coalesce(r.role_tags, array[]::text[]) as role_tags,
coalesce(r.os_families, array[]::text[]) as os_families
from ansible_roles r
left join role_categories c on c.id = r.category_id
left join teams t on t.id = r.team_id
where r.id=$1::uuid
""",
role_id,
)
if not row:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found")
is_admin = await _user_is_admin(conn, user_id)
content = _parse_json_object(row["content"])
is_owner = row["owner_id"] == user_id
vis = str(row["visibility"])
editable = bool(is_owner or is_admin)
deletable = (vis == "personal" and (is_owner or is_admin)) or (vis == "public" and is_admin)
return {
"id": row["id"],
"name": row["name"],
"source_type": row["source_type"],
"source_ref": row["source_ref"],
"category_id": row["category_id"],
"category_name": row["category_name"] or "Uncategorized",
"description": str(content.get("description") or ""),
"created_at": row["created_at"].isoformat() if row["created_at"] else "",
"visibility": vis,
"forked_from_id": row["forked_from_id"] or "",
"team_id": row["team_id"] or "",
"team_name": str(row["team_name"] or "") if row["team_id"] else "",
"tags": [str(t) for t in (row["role_tags"] or [])],
"os_families": [str(t) for t in (row["os_families"] or [])],
"is_owner": is_owner,
"is_admin": is_admin,
"editable": editable,
"deletable": deletable,
}
@router.post("/roles/{role_id}/fork")
async def fork_role(
role_id: str, payload: RoleForkRequest, user_id: str = Depends(get_current_user_id)
) -> dict[str, str]:
pool = get_pool()
async with pool.acquire() as conn:
await _assert_can_view_role(conn, role_id, user_id)
new_id = await _fork_role_copy(
conn,
role_id,
user_id,
"personal",
target_name=payload.name,
source_type=payload.source_type,
source_ref=payload.source_ref,
category_id=payload.category_id,
description=payload.description,
)
row = await conn.fetchrow("select name from ansible_roles where id=$1::uuid", new_id)
return {"id": str(new_id), "name": str(row["name"]) if row else payload.name}
@router.put("/roles/details/{role_id}")
async def update_role_details(
role_id: str,
payload: RoleUpdate,
user_id: str = Depends(get_current_user_id),
) -> dict[str, Any]:
name = _sanitize_ansible_role_name(payload.name)
if not name:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Role name is required")
pool = get_pool()
async with pool.acquire() as conn:
effective_id, forked = await _resolve_role_for_mutation(conn, role_id, user_id)
vis_payload = await _clamp_visibility_on_update(conn, effective_id, user_id, payload.visibility)
if payload.category_id:
exists = await conn.fetchval("select 1 from role_categories where id=$1::uuid", payload.category_id)
if not exists:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role category not found")
tags_norm = _normalize_role_tags(payload.tags)
os_norm = _normalize_os_families(payload.os_families)
tid_final: str | None = None
team_display_name = ""
if vis_payload == "team":
raw_new = str(payload.team_id or "").strip()
prev_row = await conn.fetchrow(
"select team_id::text as team_id from ansible_roles where id=$1::uuid",
effective_id,
)
prev_tid = str(prev_row["team_id"] or "").strip() if prev_row else ""
tid_use = raw_new or prev_tid
if not tid_use:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="team_id is required for team visibility",
)
try:
UUID(tid_use)
except ValueError as exc:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid team_id") from exc
team_exists = await conn.fetchval("select 1 from teams where id=$1::uuid", tid_use)
if not team_exists:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Team not found")
is_member = await _is_active_team_member(conn, tid_use, user_id)
if not is_member and not await _user_is_admin(conn, user_id):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Must be an active member of the selected team",
)
tid_final = tid_use
team_display_name = str(
await conn.fetchval("select name from teams where id=$1::uuid", tid_final) or ""
)
await conn.execute(
"""
update ansible_roles
set
name=$1,
source_type=$2,
source_ref=$3,
category_id=$4::uuid,
content=$5::jsonb,
visibility=$6,
role_tags=$7::text[],
os_families=$8::text[],
team_id=$9::uuid
where id=$10::uuid
""",
name,
payload.source_type,
payload.source_ref,
payload.category_id,
json.dumps({"description": payload.description}),
vis_payload,
tags_norm,
os_norm,
tid_final,
effective_id,
)
return {
"status": "updated",
"id": effective_id,
"name": name,
"forked": forked,
"visibility": vis_payload,
"team_id": tid_final or "",
"team_name": team_display_name if vis_payload == "team" else "",
}
@router.put("/roles/{role_id}/files")
async def replace_role_files(
role_id: str,
payload: RoleFilesReplaceRequest,
user_id: str = Depends(get_current_user_id),
) -> dict[str, Any]:
pool = get_pool()
async with pool.acquire() as conn:
effective_id, forked = await _resolve_role_for_mutation(conn, role_id, user_id)
normalized: list[tuple[str, str]] = []
for item in payload.files:
path = _validate_role_file_path(item.path)
normalized.append((path, item.content))
await conn.execute("delete from role_files where role_id=$1::uuid", effective_id)
await _upsert_role_files(conn, effective_id, normalized if normalized else list(DEFAULT_ROLE_SCAFFOLD_FILES))
return {"status": "updated", "role_id": effective_id, "forked": forked}
@router.delete("/roles/{role_id}")
async def delete_role(role_id: str, user_id: str = Depends(get_current_user_id)) -> dict[str, str]:
pool = get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
select
owner_id::text,
coalesce(visibility, 'personal') as visibility,
team_id::text as team_id
from ansible_roles where id=$1::uuid
""",
role_id,
)
if not row:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found")
is_admin = await _user_is_admin(conn, user_id)
vis = str(row["visibility"])
owner_id = str(row["owner_id"])
if vis == "public":
if not is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only an administrator can delete a public role",
)
elif vis == "team" and row["team_id"]:
if not (
is_admin
or owner_id == user_id
or await _is_active_team_member(conn, str(row["team_id"]), user_id)
):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Role delete denied")
elif owner_id != user_id and not is_admin:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Role delete denied")
await conn.execute(
"update playbooks set role_ids = array_remove(role_ids, $1::uuid) where $1::uuid = any(role_ids)",
role_id,
)
await conn.execute("delete from ansible_roles where id=$1::uuid", role_id)
return {"status": "deleted", "id": role_id}
@router.get("/roles")
async def list_roles(
    user_id: str = Depends(get_current_user_id),
    team_id: str | None = Query(None, description="List Ansible roles for this team (members only)."),
) -> dict[str, list[dict]]:
    """List Ansible roles visible to the caller.

    Without ``team_id``: public roles plus the caller's own roles.
    With ``team_id``: roles with 'team' visibility belonging to that team
    (active members only; 403 otherwise).

    Fix: the two branches previously duplicated the ~25-line select; they
    differed only in the where-clause and bind value, so the query is now a
    single template with a ``{where}`` slot.
    """
    # Shared select: role row + category name + per-role file count.
    # The template contains no other braces, so str.format is safe here.
    base_query = """
        select
            r.id::text as id,
            r.name,
            r.source_type,
            r.source_ref,
            r.created_at,
            r.category_id::text as category_id,
            c.name as category_name,
            r.content,
            coalesce(r.visibility, 'personal') as visibility,
            r.owner_id::text as owner_id,
            r.team_id::text as team_id,
            coalesce(fc.cnt, 0)::int as file_count,
            coalesce(r.role_tags, array[]::text[]) as role_tags,
            coalesce(r.os_families, array[]::text[]) as os_families
        from ansible_roles r
        left join role_categories c on c.id = r.category_id
        left join (
            select role_id, count(*)::int as cnt
            from role_files
            group by role_id
        ) fc on fc.role_id = r.id
        where {where}
        order by coalesce(c.name, 'Uncategorized'), r.name
    """
    pool = get_pool()
    async with pool.acquire() as conn:
        await _ensure_default_role_categories(conn)
        await _backfill_role_files_for_user(conn, user_id)
        if team_id:
            if not await _is_active_team_member(conn, team_id, user_id):
                raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not a member of this team")
            rows = await conn.fetch(
                base_query.format(where="r.team_id = $1::uuid and coalesce(r.visibility, 'personal') = 'team'"),
                team_id,
            )
        else:
            rows = await conn.fetch(
                base_query.format(where="coalesce(r.visibility, 'personal') = 'public' or r.owner_id = $1::uuid"),
                user_id,
            )
    items = [
        {
            "id": row["id"],
            "name": row["name"],
            "source_type": row["source_type"],
            "source_ref": row["source_ref"],
            "created_at": row["created_at"].isoformat() if row["created_at"] else "",
            "category_id": row["category_id"],
            "category_name": row["category_name"] or "Uncategorized",
            # 'description' lives inside the JSON 'content' column.
            "description": str(_parse_json_object(row["content"]).get("description") or ""),
            "file_count": int(row["file_count"] or 0),
            "visibility": str(row["visibility"]),
            "is_owner": row["owner_id"] == user_id,
            "tags": [str(t) for t in (row["role_tags"] or [])],
            "os_families": [str(t) for t in (row["os_families"] or [])],
        }
        for row in rows
    ]
    return {"items": items}
@router.get("/playbooks")
async def list_playbooks(user_id: str = Depends(get_current_user_id)) -> dict[str, list[dict]]:
    """Return the caller's playbooks (id, name, description, owner), ordered by name."""
    pool = get_pool()
    async with pool.acquire() as conn:
        records = await conn.fetch(
            """
            select id::text as id, name, description, owner_id::text as owner_id
            from playbooks
            where owner_id=$1::uuid
            order by name
            """,
            user_id,
        )
    items = []
    for rec in records:
        items.append(
            {
                "id": rec["id"],
                "name": rec["name"],
                "description": rec["description"],
                "owner_id": rec["owner_id"],
            }
        )
    return {"items": items}
@router.post("/playbooks/{playbook_id}/roles")
async def add_role_to_playbook(
    playbook_id: str,
    payload: AddRoleToPlaybookRequest,
    user_id: str = Depends(get_current_user_id),
) -> dict[str, str]:
    """Attach a role to one of the caller's playbooks.

    The caller must own the playbook; the role must be owned by the caller
    or be public. Duplicate role ids are collapsed by the update query.
    """
    pool = get_pool()
    async with pool.acquire() as conn:
        playbook_owner = await conn.fetchval(
            "select owner_id::text from playbooks where id=$1::uuid",
            playbook_id,
        )
        if not playbook_owner:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Playbook not found")
        if playbook_owner != user_id:
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Playbook access denied")
        role = await conn.fetchrow(
            "select owner_id::text, coalesce(visibility, 'public') as visibility from ansible_roles where id=$1::uuid",
            payload.role_id,
        )
        if role is None:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found")
        if role["owner_id"] != user_id and role["visibility"] != "public":
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Role access denied")
        # array_agg(distinct ...) over the concatenated array deduplicates role ids.
        await conn.execute(
            """
            update playbooks
            set role_ids = (
                select array_agg(distinct role_id)
                from unnest(role_ids || $1::uuid) as role_id
            )
            where id=$2::uuid
            """,
            payload.role_id,
            playbook_id,
        )
    return {"status": "added", "playbook_id": playbook_id, "role_id": payload.role_id}
@router.get("/roles/categories")
async def list_role_categories(user_id: str = Depends(get_current_user_id)) -> dict[str, list[dict]]:
    """Return every role category with its role count, ordered by name."""
    pool = get_pool()
    async with pool.acquire() as conn:
        # Seed the built-in categories first so a fresh install is never empty.
        await _ensure_default_role_categories(conn)
        records = await conn.fetch(
            """
            select
                c.id::text as id,
                c.name,
                c.created_at,
                count(r.id)::int as role_count
            from role_categories c
            left join ansible_roles r on r.category_id = c.id
            group by c.id
            order by c.name
            """
        )
    items = []
    for rec in records:
        created = rec["created_at"]
        items.append(
            {
                "id": rec["id"],
                "name": rec["name"],
                "role_count": rec["role_count"],
                "created_at": created.isoformat() if created else "",
            }
        )
    return {"items": items}
@router.post("/roles/categories")
async def create_role_category(
    payload: RoleCategoryCreate,
    _: str = Depends(get_current_admin_user_id),
) -> dict[str, str]:
    """Create a global role category (admin only); names are unique case-insensitively."""
    name = payload.name.strip()
    if not name:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Category name is required")
    pool = get_pool()
    async with pool.acquire() as conn:
        duplicate = await conn.fetchval(
            "select id::text from role_categories where lower(name)=lower($1)",
            name,
        )
        if duplicate:
            raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Category already exists")
        # owner_id is null: categories are global rather than per-user.
        new_id = await conn.fetchval(
            """
            insert into role_categories (owner_id, name)
            values (null, $1)
            returning id::text
            """,
            name,
        )
    return {"id": new_id}
@router.delete("/roles/categories/{category_id}")
async def delete_role_category(category_id: str, _: str = Depends(get_current_admin_user_id)) -> dict[str, str]:
    """Delete a role category (admin only), detaching any roles that reference it first."""
    pool = get_pool()
    async with pool.acquire() as conn:
        found = await conn.fetchval("select 1 from role_categories where id=$1::uuid", category_id)
        if not found:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Category not found")
        # Null out references before deleting so roles survive the removal.
        await conn.execute("update ansible_roles set category_id=null where category_id=$1::uuid", category_id)
        await conn.execute("delete from role_categories where id=$1::uuid", category_id)
    return {"status": "deleted", "id": category_id}
@router.patch("/roles/categories/{category_id}")
async def update_role_category(
    category_id: str,
    payload: RoleCategoryCreate,
    _: str = Depends(get_current_admin_user_id),
) -> dict[str, str]:
    """Rename a role category (admin only); the new name must be unique case-insensitively."""
    new_name = payload.name.strip()
    if not new_name:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Category name is required")
    pool = get_pool()
    async with pool.acquire() as conn:
        found = await conn.fetchval("select 1 from role_categories where id=$1::uuid", category_id)
        if not found:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Category not found")
        # Reject the rename when another category already carries this name.
        clash = await conn.fetchval(
            "select id::text from role_categories where lower(name)=lower($1) and id<>$2::uuid",
            new_name,
            category_id,
        )
        if clash:
            raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Category already exists")
        await conn.execute("update role_categories set name=$1 where id=$2::uuid", new_name, category_id)
    return {"status": "updated", "id": category_id}
@router.get("/admin/config")
async def get_admin_config(_: str = Depends(get_current_admin_user_id)) -> dict[str, dict]:
    """Return project-level admin settings: environment defaults overlaid with values saved in app_config."""
    settings = get_settings()
    defaults = {
        "app_name": settings.app_name,
        "app_tagline": settings.app_tagline,
        "runner_image": settings.app_runner_image,
        "docker_network": settings.app_docker_network,
        "runner_timeout_sec": settings.app_runner_timeout_sec,
        "runner_poll_interval_sec": settings.app_runner_poll_interval_sec,
        "runner_max_poll_errors": settings.app_runner_max_poll_errors,
    }
    pool = get_pool()
    async with pool.acquire() as conn:
        row = await conn.fetchrow("select value from app_config where key='project'")
    stored = _app_config_row_value_dict(row)
    # Only keys the admin UI owns may override environment defaults.
    overrides = {k: v for k, v in stored.items() if k in _PROJECT_ADMIN_KEYS}
    return {"items": {**defaults, **overrides}}
@router.put("/admin/config")
async def update_admin_config(payload: dict, _: str = Depends(get_current_admin_user_id)) -> dict[str, str]:
    """Merge the provided admin settings into the stored 'project' app_config document.

    Only keys in _PROJECT_ADMIN_KEYS are accepted; numeric runner settings are
    coerced to int (400 on failure), display/runner strings to str.
    """
    if not isinstance(payload, dict):
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Config payload must be an object")
    int_keys = ("runner_timeout_sec", "runner_poll_interval_sec", "runner_max_poll_errors")
    str_keys = ("app_name", "app_tagline", "runner_image", "docker_network")
    merged: dict[str, Any] = {}
    for key in _PROJECT_ADMIN_KEYS:
        if key not in payload:
            continue
        raw = payload[key]
        if key in int_keys:
            try:
                merged[key] = int(raw)
            except (TypeError, ValueError):
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=f"{key} must be an integer",
                ) from None
        elif key in str_keys:
            merged[key] = str(raw)
        else:
            merged[key] = raw
    pool = get_pool()
    async with pool.acquire() as conn:
        row = await conn.fetchrow("select value from app_config where key='project'")
        prev = _app_config_row_value_dict(row)
        # Keep previously saved admin keys that this request does not touch.
        combined = {k: v for k, v in prev.items() if k in _PROJECT_ADMIN_KEYS}
        combined.update(merged)
        await conn.execute(
            """
            insert into app_config (key, value, updated_at)
            values ('project', $1::jsonb, now())
            on conflict (key)
            do update set value=excluded.value, updated_at=now()
            """,
            json.dumps(combined),
        )
    return {"status": "updated"}
@router.get("/admin/os-registry-config")
async def get_admin_os_registry_config(_: str = Depends(get_current_admin_user_id)) -> dict[str, Any]:
    """Return the stored OS-image registry configuration, or Docker Hub defaults when nothing is saved."""
    pool = get_pool()
    async with pool.acquire() as conn:
        row = await conn.fetchrow("select value from app_config where key = $1", OS_REGISTRY_CONFIG_KEY)
    stored = _app_config_row_value_dict(row)
    if stored:
        return parse_stored_os_registry(stored)
    # Nothing configured yet: fall back to the Docker Hub repository from settings.
    fallback_repo = get_settings().roleforge_os_docker_hub_repository.strip() or "inecs/roleforge"
    return {
        "provider": "hub",
        "registry_host": "",
        "repository_path": fallback_repo,
        "username": "",
        "password_set": False,
    }
@router.put("/admin/os-registry-config")
async def put_admin_os_registry_config(payload: dict, _: str = Depends(get_current_admin_user_id)) -> dict[str, str]:
    """Persist OS-image registry settings; an omitted password keeps the previously stored secret."""
    if not isinstance(payload, dict):
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Payload must be an object")
    provider = str(payload.get("provider") or "hub").strip().lower()
    if provider not in ("hub", "harbor", "nexus"):
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid provider")
    registry_host = str(payload.get("registry_host") or "").strip()
    repository_path = str(payload.get("repository_path") or "").strip()
    username = str(payload.get("username") or "").strip()
    raw_password = payload.get("password")
    password_plain = "" if raw_password is None else str(raw_password).strip()
    # Self-hosted registries need an explicit host and repository.
    if provider in ("harbor", "nexus") and (not registry_host or not repository_path):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="registry_host and repository_path are required for Harbor and Nexus.",
        )
    if provider == "hub" and not repository_path:
        repository_path = get_settings().roleforge_os_docker_hub_repository.strip() or "inecs/roleforge"
    pool = get_pool()
    async with pool.acquire() as conn:
        row = await conn.fetchrow("select value from app_config where key = $1", OS_REGISTRY_CONFIG_KEY)
        prev = _app_config_row_value_dict(row)
        doc: dict[str, Any] = {
            "provider": provider,
            "registry_host": registry_host,
            "repository_path": repository_path,
            "username": username,
        }
        if password_plain:
            # A freshly supplied password is always stored encrypted.
            doc["password_ciphertext"] = encrypt_registry_secret(password_plain)
        else:
            prev_cipher = prev.get("password_ciphertext")
            if isinstance(prev_cipher, str) and prev_cipher.strip():
                doc["password_ciphertext"] = prev_cipher
        await conn.execute(
            """
            insert into app_config (key, value, updated_at)
            values ($1, $2::jsonb, now())
            on conflict (key)
            do update set value=excluded.value, updated_at=now()
            """,
            OS_REGISTRY_CONFIG_KEY,
            json.dumps(doc),
        )
    return {"status": "updated"}
@router.get("/admin/yaml-lint-config")
async def get_admin_yaml_lint_config(_: str = Depends(get_current_admin_user_id)) -> dict[str, Any]:
    """Return the effective yamllint rules (saved overrides merged onto defaults) plus rule metadata."""
    pool = get_pool()
    async with pool.acquire() as conn:
        row = await conn.fetchrow("select value from app_config where key = $1", "yamllint")
    stored = _app_config_row_value_dict(row)
    effective = merge_yamllint_saved(stored)
    return {"rules": serialize_rules_for_api(effective), "meta": list(RULE_META)}
@router.put("/admin/yaml-lint-config")
async def put_admin_yaml_lint_config(payload: dict, _: str = Depends(get_current_admin_user_id)) -> dict[str, str]:
    """Validate and persist the yamllint configuration, then refresh the in-process cache."""
    try:
        normalized = validate_put_payload(payload)
    except ValueError as exc:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
    pool = get_pool()
    async with pool.acquire() as conn:
        await conn.execute(
            """
            insert into app_config (key, value, updated_at)
            values ('yamllint', $1::jsonb, now())
            on conflict (key)
            do update set value=excluded.value, updated_at=now()
            """,
            json.dumps(normalized),
        )
        # Drop then rebuild the cached config so subsequent lint calls see the new rules.
        invalidate_yamllint_config_cache()
        await refresh_yamllint_config_cache()
    return {"status": "updated"}
@router.get("/admin/json-lint-config")
async def get_admin_json_lint_config(_: str = Depends(get_current_admin_user_id)) -> dict[str, Any]:
    """Return the effective jsonlint rules (saved overrides merged onto defaults) plus rule metadata."""
    pool = get_pool()
    async with pool.acquire() as conn:
        row = await conn.fetchrow("select value from app_config where key = $1", "jsonlint")
    stored = _app_config_row_value_dict(row)
    effective = merge_jsonlint_saved(stored)
    return {"rules": serialize_jsonlint_rules_for_api(effective), "meta": list(JSONLINT_RULE_META)}
@router.put("/admin/json-lint-config")
async def put_admin_json_lint_config(payload: dict, _: str = Depends(get_current_admin_user_id)) -> dict[str, str]:
    """Validate and persist the jsonlint configuration, then refresh the in-process cache."""
    try:
        normalized = validate_jsonlint_put_payload(payload)
    except ValueError as exc:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
    pool = get_pool()
    async with pool.acquire() as conn:
        await conn.execute(
            """
            insert into app_config (key, value, updated_at)
            values ('jsonlint', $1::jsonb, now())
            on conflict (key)
            do update set value=excluded.value, updated_at=now()
            """,
            json.dumps(normalized),
        )
        # Drop then rebuild the cached config so subsequent lint calls see the new rules.
        invalidate_jsonlint_config_cache()
        await refresh_jsonlint_config_cache()
    return {"status": "updated"}
@router.post("/inventories")
async def create_inventory(
    payload: InventoryCreate, user_id: str = Depends(get_current_user_id)
) -> dict[str, str]:
    """Create an inventory owned by the caller and return its id."""
    pool = get_pool()
    async with pool.acquire() as conn:
        new_id = await conn.fetchval(
            """
            insert into inventories (owner_id, name, inventory_text)
            values ($1, $2, $3)
            returning id::text
            """,
            user_id,
            payload.name,
            payload.inventory_text,
        )
    return {"id": new_id}
@router.post("/playbooks")
async def create_playbook(
    payload: PlaybookCreate, user_id: str = Depends(get_current_user_id)
) -> dict[str, str]:
    """Create a playbook owned by the caller and return its id.

    Raises 404 when the referenced inventory does not exist and 403 when it
    belongs to a different user.

    Fix: a missing inventory previously fell through to 403 "access denied"
    (fetchval returns None, which merely compares unequal to user_id); it now
    surfaces as 404. The owner_id bind also gains the explicit ::uuid cast
    used by the sibling import endpoint.
    """
    pool = get_pool()
    async with pool.acquire() as conn:
        inventory_owner = await conn.fetchval(
            "select owner_id::text from inventories where id=$1::uuid",
            payload.inventory_id,
        )
        if inventory_owner is None:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Inventory not found")
        if inventory_owner != user_id:
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Inventory access denied")
        playbook_id = await conn.fetchval(
            """
            insert into playbooks (owner_id, name, description, playbook_yaml, inventory_id, role_ids, is_shared)
            values ($1::uuid, $2, $3, $4, $5::uuid, $6::uuid[], $7)
            returning id::text
            """,
            user_id,
            payload.name,
            payload.description,
            payload.playbook_yaml,
            payload.inventory_id,
            payload.role_ids,
            payload.is_shared,
        )
    return {"id": playbook_id}
@router.post("/playbooks/import-yaml")
async def import_playbook_yaml(
    payload: PlaybookYamlImportRequest, user_id: str = Depends(get_current_user_id)
) -> dict[str, str]:
    """Import a playbook from raw YAML after validating it parses to a list of plays.

    Raises 400 for invalid YAML or a non-list document, 404 when the inventory
    does not exist, and 403 when the inventory belongs to a different user.

    Fix: a missing inventory previously fell through to 403 "access denied"
    (fetchval returns None); it now surfaces as 404.
    """
    try:
        # safe_load only: the YAML comes from an end user and must not construct objects.
        parsed = yaml.safe_load(payload.playbook_yaml)
    except yaml.YAMLError as exc:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid YAML") from exc
    if not isinstance(parsed, list):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Playbook YAML must be a list of plays",
        )
    pool = get_pool()
    async with pool.acquire() as conn:
        inventory_owner = await conn.fetchval(
            "select owner_id::text from inventories where id=$1::uuid",
            payload.inventory_id,
        )
        if inventory_owner is None:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Inventory not found")
        if inventory_owner != user_id:
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Inventory access denied")
        playbook_id = await conn.fetchval(
            """
            insert into playbooks (owner_id, name, description, playbook_yaml, inventory_id, role_ids, is_shared)
            values ($1::uuid, $2, $3, $4, $5::uuid, $6::uuid[], $7)
            returning id::text
            """,
            user_id,
            payload.name,
            payload.description,
            payload.playbook_yaml,
            payload.inventory_id,
            payload.role_ids,
            payload.is_shared,
        )
    return {"id": playbook_id}
@router.get("/playbooks/{playbook_id}/export-yaml")
async def export_playbook_yaml(playbook_id: str, user_id: str = Depends(get_current_user_id)) -> Response:
    """Return a playbook's raw YAML; allowed for the owner or when the playbook is shared."""
    pool = get_pool()
    async with pool.acquire() as conn:
        rec = await conn.fetchrow(
            """
            select owner_id::text as owner_id, is_shared, playbook_yaml
            from playbooks
            where id=$1::uuid
            """,
            playbook_id,
        )
    if rec is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Playbook not found")
    if rec["owner_id"] != user_id and not rec["is_shared"]:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Playbook access denied")
    return Response(content=rec["playbook_yaml"], media_type="application/x-yaml")
@router.post("/jobs/launch")
async def launch_job(
    payload: JobLaunchRequest, user_id: str = Depends(get_current_user_id)
) -> dict[str, str]:
    """Queue a job for a playbook, start an ephemeral runner for it and begin monitoring.

    Returns the job id, runner run id and worker container id. Raises 404/403
    for unknown/inaccessible playbooks, 503 when no worker can be started, and
    502 when the runner refuses the run. On any launch failure the queued
    ``jobs`` row is deleted again.

    Fix: previously a failure in ``create_playbook_run`` leaked the freshly
    started ephemeral worker and left the job stuck in 'queued'; both are now
    cleaned up before the error is reported.
    """
    pool = get_pool()
    async with pool.acquire() as conn:
        playbook = await conn.fetchrow(
            """
            select p.id::text as id, p.owner_id::text as owner_id, p.is_shared,
                   p.playbook_yaml, i.inventory_text
            from playbooks p
            join inventories i on i.id = p.inventory_id
            where p.id=$1::uuid
            """,
            payload.playbook_id,
        )
        if not playbook:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Playbook not found")
        if playbook["owner_id"] != user_id and not playbook["is_shared"]:
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Playbook access denied")
        job_id = await conn.fetchval(
            """
            insert into jobs (owner_id, playbook_id, status, extra_vars, runtime_mode)
            values ($1::uuid, $2::uuid, 'queued', $3::jsonb, $4)
            returning id::text
            """,
            user_id,
            payload.playbook_id,
            json.dumps(payload.extra_vars),
            payload.runtime_mode,
        )
    queue_name = f"job.{job_id}"
    try:
        worker_container_id, worker_container_name, runner_url = await launch_ephemeral_worker(
            queue_name,
            payload.runtime_mode,
        )
    except RuntimeError as exc:
        # Worker could not be started: drop the queued job row so it does not linger.
        async with pool.acquire() as conn:
            await conn.execute("delete from jobs where id=$1::uuid", job_id)
        raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=str(exc)) from exc
    try:
        runner_run_id = await create_playbook_run(
            runner_url,
            playbook_yaml=playbook["playbook_yaml"],
            inventory_text=playbook["inventory_text"],
            extra_vars=payload.extra_vars,
        )
    except Exception as exc:
        # Mirror the RuntimeError path above: tear down the worker and the job row
        # instead of leaking the container and leaving the job 'queued' forever.
        await terminate_ephemeral_worker(worker_container_name, payload.runtime_mode)
        async with pool.acquire() as conn:
            await conn.execute("delete from jobs where id=$1::uuid", job_id)
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail=f"Runner did not accept the playbook run: {exc}",
        ) from exc
    async with pool.acquire() as conn:
        await conn.execute(
            """
            update jobs
            set status='running',
                started_at=now(),
                runner_url=$1,
                runner_run_id=$2,
                runner_container_name=$3,
                runner_last_heartbeat=now()
            where id=$4::uuid
            """,
            runner_url,
            runner_run_id,
            worker_container_name,
            job_id,
        )
    # Fire-and-forget watcher that tracks the run and finalizes job state.
    asyncio.create_task(
        _monitor_job(
            job_id,
            runner_url,
            runner_run_id,
            worker_container_name,
            payload.runtime_mode,
        )
    )
    return {
        "job_id": job_id,
        "runner_run_id": runner_run_id,
        "worker_container_id": worker_container_id,
    }
@router.post("/tests/launch")
async def launch_test(
    payload: TestLaunchRequest, user_id: str = Depends(get_current_user_id)
) -> dict[str, str]:
    """Start a Molecule test run for a playbook or a role.

    At least one of ``playbook_id`` / ``role_id`` is required (400 otherwise);
    when both are given the playbook path is used for the actual run.
    A test_runs row is created, an ephemeral worker is launched per run, the
    run is submitted to it and a background monitor task is started.
    Raises 404/403 for missing/inaccessible targets and 503 when the worker
    cannot be started (the queued test_runs row is deleted in that case).
    """
    if not payload.playbook_id and not payload.role_id:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Either playbook_id or role_id must be provided",
        )
    pool = get_pool()
    async with pool.acquire() as conn:
        # Access checks: playbook must be owned or shared; role must be owned or public.
        if payload.playbook_id:
            playbook = await conn.fetchrow(
                "select owner_id::text as owner_id, is_shared from playbooks where id=$1::uuid",
                payload.playbook_id,
            )
            if not playbook:
                raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Playbook not found")
            if playbook["owner_id"] != user_id and not playbook["is_shared"]:
                raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Playbook access denied")
        if payload.role_id:
            role = await conn.fetchrow(
                "select owner_id::text as owner_id, coalesce(visibility, 'public') as visibility from ansible_roles where id=$1::uuid",
                payload.role_id,
            )
            if not role:
                raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found")
            if role["owner_id"] != user_id and role["visibility"] != "public":
                raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Role access denied")
        # Record the run as 'queued' before any worker exists; hosts/extra_vars
        # are stored as JSON for later inspection.
        test_id = await conn.fetchval(
            """
            insert into test_runs (owner_id, playbook_id, role_id, status, runtime_mode, hosts_blueprint, extra_vars)
            values ($1::uuid, $2::uuid, $3::uuid, 'queued', $4, $5::jsonb, $6::jsonb)
            returning id::text
            """,
            user_id,
            payload.playbook_id,
            payload.role_id,
            payload.runtime_mode,
            json.dumps([host.model_dump() for host in payload.hosts]),
            json.dumps(payload.extra_vars),
        )
    queue_name = f"test.{test_id}"
    try:
        worker_container_id, worker_container_name, runner_url = await launch_ephemeral_worker(
            queue_name,
            payload.runtime_mode,
        )
    except RuntimeError as exc:
        # Worker could not be started: remove the queued row so it does not linger.
        async with pool.acquire() as conn:
            await conn.execute("delete from test_runs where id=$1::uuid", test_id)
        raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=str(exc)) from exc
    if payload.playbook_id:
        # Playbook path: submit the stored playbook YAML to the Molecule runner.
        async with pool.acquire() as conn:
            row = await conn.fetchrow(
                "select playbook_yaml from playbooks where id=$1::uuid",
                payload.playbook_id,
            )
        runner_run_id = await create_molecule_playbook_run(
            runner_url,
            playbook_yaml=row["playbook_yaml"],
            hosts=[host.model_dump() for host in payload.hosts],
            extra_vars=payload.extra_vars,
        )
    else:
        # Role path: prefer the role_files 'tasks/main.yml' content, then the
        # legacy tasks_yaml stored in the role's JSON content, then a ping task.
        async with pool.acquire() as conn:
            row = await conn.fetchrow(
                "select name, content from ansible_roles where id=$1::uuid",
                payload.role_id,
            )
            file_tasks = await conn.fetchval(
                "select content from role_files where role_id=$1::uuid and path='tasks/main.yml'",
                payload.role_id,
            )
        role_content = _parse_json_object(row["content"])
        runner_run_id = await create_molecule_role_run(
            runner_url,
            role_name=row["name"],
            role_tasks_yaml=str(
                file_tasks
                or role_content.get("tasks_yaml")
                or "- name: ping\n  ansible.builtin.ping:\n"
            ),
            hosts=[host.model_dump() for host in payload.hosts],
            extra_vars=payload.extra_vars,
        )
    # Run accepted by the runner: flip the row to 'running' with runner metadata.
    async with pool.acquire() as conn:
        await conn.execute(
            """
            update test_runs
            set status='running',
                started_at=now(),
                runner_url=$1,
                runner_run_id=$2,
                runner_container_name=$3,
                runner_last_heartbeat=now()
            where id=$4::uuid
            """,
            runner_url,
            runner_run_id,
            worker_container_name,
            test_id,
        )
    # Fire-and-forget watcher that tracks the run and finalizes test state.
    asyncio.create_task(
        _monitor_test(
            test_id,
            runner_url,
            runner_run_id,
            worker_container_name,
            payload.runtime_mode,
        )
    )
    return {
        "test_id": test_id,
        "runner_run_id": runner_run_id,
        "worker_container_id": worker_container_id,
    }
@router.get("/tests/os-options")
async def list_test_os_options(_: str = Depends(get_current_user_id)) -> dict[str, list[dict[str, str]]]:
    """Expose the OS images available for Molecule test runs (any authenticated user)."""
    options = _scan_os_options()
    return {"items": options}
@router.get("/admin/os-images")
async def list_admin_os_images(_: str = Depends(get_current_admin_user_id)) -> dict[str, Any]:
    """Describe buildable OS images plus registry defaults and help links for the admin UI."""
    settings = get_settings()
    hub_repository = (settings.roleforge_os_docker_hub_repository or "").strip()
    registry_prefix = _normalize_os_registry_prefix(settings.roleforge_os_image_registry or "")
    # Pushing (and therefore `docker login`) only matters when a target is configured.
    needs_login = bool(hub_repository or registry_prefix)
    return {
        "items": _scan_os_options(),
        "os_image_registry": _os_image_push_display(),
        "push_requires_docker_login": needs_login,
        "registry_defaults": {
            "provider": "hub",
            "repository_path": hub_repository,
            "registry_host": "",
        },
        "registry_help_urls": {
            "hub": "https://hub.docker.com/",
            "harbor": "https://goharbor.io/docs/",
            "nexus": "https://help.sonatype.com/repomanager3/formats/docker-registry/docker-registry-repositories",
        },
    }
@router.post("/admin/os-images/build-all")
async def build_all_admin_os_images(
    body: dict[str, Any] = Body(default_factory=dict),
    _: str = Depends(get_current_admin_user_id),
) -> dict[str, Any]:
    """Build every known OS image in sequence, stopping at the first failure."""
    registry_payload = _registry_payload_slice(body)
    push_cfg = _resolve_os_push_config(registry_payload)
    _require_registry_credentials(push_cfg.kind, body.get("docker_login_username"), body.get("docker_login_password"))
    login_user = str(body.get("docker_login_username") or "").strip()
    login_pass = str(body.get("docker_login_password") or "")
    no_cache = _request_bool_flag(body)
    options = _scan_os_options()
    if not options:
        return {"status": "noop", "items": [], "logs": ["No dockerfiles found."]}
    all_logs: list[str] = []
    summaries: list[dict[str, str]] = []
    for option in options:
        outcome = await _build_os_image(
            dockerfile_rel=str(option["dockerfile"]),
            image_tag=str(option["image"]),
            platform=str(option.get("platform") or "").strip(),
            build_platforms=str(option.get("build_platforms") or "").strip(),
            docker_login_username=login_user,
            docker_login_password=login_pass,
            registry_payload=registry_payload,
            no_cache=no_cache,
        )
        all_logs.extend(outcome.get("logs") or [])
        summaries.append({k: v for k, v in outcome.items() if k != "logs"})
        if outcome.get("status") != "success":
            # Abort on the first failing image; partial results are still reported.
            return {"status": "failed", "items": summaries, "logs": all_logs}
    return {"status": "success", "items": summaries, "logs": all_logs}
@router.post("/admin/os-images/build-one")
async def build_one_admin_os_image(
    payload: dict[str, Any],
    _: str = Depends(get_current_admin_user_id),
) -> dict[str, Any]:
    """Build a single OS image; missing platform data is resolved from the scan, then inferred."""
    dockerfile_rel = str(payload.get("dockerfile") or "").strip()
    image_tag = str(payload.get("image") or "").strip()
    platform = str(payload.get("platform") or "").strip()
    build_platforms = str(payload.get("build_platforms") or "").strip()
    if dockerfile_rel and not (platform and build_platforms):
        # First preference: metadata from the scanned dockerfile catalogue.
        resolved = _resolve_os_option_by_dockerfile(dockerfile_rel)
        if resolved:
            platform = platform or str(resolved.get("platform") or "").strip()
            build_platforms = build_platforms or str(resolved.get("build_platforms") or "").strip()
        if not (platform and build_platforms):
            # Fallback: infer sensible defaults from the dockerfile path.
            inferred_platform, inferred_matrix = _infer_platform_defaults(dockerfile_rel)
            platform = platform or inferred_platform
            build_platforms = build_platforms or inferred_matrix
    if not dockerfile_rel or not image_tag:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="dockerfile and image are required")
    registry_payload = _registry_payload_slice(payload)
    push_cfg = _resolve_os_push_config(registry_payload)
    _require_registry_credentials(push_cfg.kind, payload.get("docker_login_username"), payload.get("docker_login_password"))
    return await _build_os_image(
        dockerfile_rel=dockerfile_rel,
        image_tag=image_tag,
        platform=platform,
        build_platforms=build_platforms,
        docker_login_username=str(payload.get("docker_login_username") or "").strip(),
        docker_login_password=str(payload.get("docker_login_password") or ""),
        registry_payload=registry_payload,
        no_cache=_request_bool_flag(payload),
    )
@router.post("/admin/os-images/build-one-stream")
async def build_one_admin_os_image_stream(
    payload: dict[str, Any],
    _: str = Depends(get_current_admin_user_id),
) -> StreamingResponse:
    """Build a single OS image via docker buildx, streaming the log as plain text.

    The stream embeds protocol markers the UI parses:
    ``__ROLEFORGE_BUILD_STATUS__:<success|failed>:<code>``, and on success with
    a push target ``__ROLEFORGE_MANUAL_PULL__:<json>`` with pull/tag commands.
    Missing platform data is resolved from the scanned dockerfile catalogue,
    then inferred from the dockerfile path. Registry/plan errors are reported
    in-stream rather than as HTTP errors so the client always gets a body.
    """
    dockerfile_rel = str(payload.get("dockerfile") or "").strip()
    image_tag = str(payload.get("image") or "").strip()
    platform = str(payload.get("platform") or "").strip()
    # Unlike the non-stream endpoint, the platform matrix is kept as a list here.
    build_platforms = [p.strip() for p in str(payload.get("build_platforms") or "").split(",") if p.strip()]
    if dockerfile_rel and (not platform or not build_platforms):
        resolved = _resolve_os_option_by_dockerfile(dockerfile_rel)
        if resolved:
            if not platform:
                platform = str(resolved.get("platform") or "").strip()
            if not build_platforms:
                build_platforms = [
                    p.strip() for p in str(resolved.get("build_platforms") or "").split(",") if p.strip()
                ]
        if not platform or not build_platforms:
            inf_platform, inf_matrix = _infer_platform_defaults(dockerfile_rel)
            if not platform:
                platform = inf_platform
            if not build_platforms:
                build_platforms = [p.strip() for p in inf_matrix.split(",") if p.strip()]
    if not dockerfile_rel or not image_tag:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="dockerfile and image are required")
    try:
        rp = _registry_payload_slice(payload)
        cfg = _resolve_os_push_config(rp)
    except HTTPException as exc:
        # Report registry-config errors through the stream so the UI log shows them.
        async def _err_stream() -> AsyncIterator[str]:
            yield f"[roleforge] {exc.detail}\n"
            yield "__ROLEFORGE_BUILD_STATUS__:failed:400\n"
        return StreamingResponse(_err_stream(), media_type="text/plain; charset=utf-8")
    repo_root = str(DOCKERFILES_ROOT.parent)
    dockerfile_abs = str(DOCKERFILES_ROOT / dockerfile_rel)
    docker_user = str(payload.get("docker_login_username") or "").strip()
    docker_pass = str(payload.get("docker_login_password") or "")
    no_cache = _request_bool_flag(payload)
    async def _stream() -> Any:
        # Main streaming generator: ensures buildx, logs in, runs the planned
        # build commands (with push retries) and emits the status markers.
        proc: asyncio.subprocess.Process | None = None
        try:
            buildx_mode, buildx_note = await _ensure_docker_buildx_mode()
            yield f"[roleforge] {buildx_note}\n"
            if buildx_mode == "none":
                yield "__ROLEFORGE_BUILD_STATUS__:failed:127\n"
                return
            builder_ok, builder_note = await _ensure_buildx_builder()
            yield f"[roleforge] {builder_note}\n"
            if not builder_ok:
                yield "__ROLEFORGE_BUILD_STATUS__:failed:127\n"
                return
            if no_cache:
                yield "[roleforge] Docker BuildKit cache disabled for this run (--no-cache).\n"
            push_kind, push_target = cfg.kind, cfg.target
            legacy_layout = cfg.legacy_full_image
            login_server = cfg.docker_login_server
            if push_kind == "hub":
                # Docker Hub push: credentials are mandatory; password goes via stdin.
                yield f"[roleforge] Docker Hub repository: docker.io/{push_target}\n"
                if not docker_user or not docker_pass:
                    yield "[roleforge] docker_login_username and docker_login_password are required.\n"
                    yield "__ROLEFORGE_BUILD_STATUS__:failed:3\n"
                    return
                yield f"$ docker login --username {docker_user!r} --password-stdin (Docker Hub)\n"
                lg_code, lg_out = await _docker_login_stdin_password(
                    username=docker_user,
                    password=docker_pass,
                    registry_server=None,
                )
                for lg_line in lg_out.decode(errors="replace").splitlines()[-40:]:
                    yield f"{lg_line}\n"
                if lg_code != 0:
                    yield f"__ROLEFORGE_BUILD_STATUS__:failed:{lg_code}\n"
                    return
            elif push_kind == "registry":
                # Self-hosted registry push (Harbor / Nexus / generic).
                yield (
                    f"[roleforge] Registry push target: {push_target}"
                    + (" (legacy prefix layout)" if legacy_layout else "")
                    + "\n"
                )
                if not docker_user or not docker_pass:
                    yield "[roleforge] docker_login_username and docker_login_password are required.\n"
                    yield "__ROLEFORGE_BUILD_STATUS__:failed:3\n"
                    return
                srv = login_server or push_target.split("/")[0]
                yield f"$ docker login --username {docker_user!r} --password-stdin {srv}\n"
                lg_code, lg_out = await _docker_login_stdin_password(
                    username=docker_user,
                    password=docker_pass,
                    registry_server=srv,
                )
                for lg_line in lg_out.decode(errors="replace").splitlines()[-40:]:
                    yield f"{lg_line}\n"
                if lg_code != 0:
                    yield f"__ROLEFORGE_BUILD_STATUS__:failed:{lg_code}\n"
                    return
            matrix_csv = ",".join(build_platforms) if build_platforms else ""
            cmds, plan_err = _os_build_command_plan(
                dockerfile_abs=dockerfile_abs,
                image_tag=image_tag,
                platform=platform,
                build_platforms_csv=matrix_csv,
                repo_root=repo_root,
                push_kind=push_kind,
                push_target=push_target,
                legacy_full_image=legacy_layout,
                no_cache=no_cache,
            )
            if plan_err:
                yield f"[roleforge] {plan_err}\n"
                yield "__ROLEFORGE_BUILD_STATUS__:failed:2\n"
                return
            exit_code = 0
            for step_cmd in cmds:
                # Only registry-facing commands are retried; local builds run once.
                max_attempts = _OS_BUILDX_PUSH_RETRIES if _should_retry_os_registry_cmd(step_cmd) else 1
                for attempt in range(max_attempts):
                    if attempt:
                        yield (
                            f"[roleforge] buildx push retry {attempt + 1}/{max_attempts} "
                            f"after {_OS_BUILDX_PUSH_RETRY_DELAY_SEC}s…\n"
                        )
                        await asyncio.sleep(_OS_BUILDX_PUSH_RETRY_DELAY_SEC)
                    yield f"$ {' '.join(step_cmd)}\n"
                    proc = await asyncio.create_subprocess_exec(
                        *step_cmd,
                        cwd=repo_root,
                        stdout=asyncio.subprocess.PIPE,
                        stderr=asyncio.subprocess.STDOUT,
                    )
                    # Stream the command output live while keeping a copy for retry analysis.
                    buf: list[str] = []
                    async for text_chunk in _stream_process_stdout_chunks(proc):
                        buf.append(text_chunk)
                        yield text_chunk
                    exit_code = await proc.wait()
                    if exit_code == 0:
                        break
                    # Inspect the tail of the output to decide whether a retry makes sense.
                    blob = "".join(buf)[-12000:]
                    if attempt < max_attempts - 1 and _should_retry_buildx_push_output(blob):
                        continue
                    yield f"__ROLEFORGE_BUILD_STATUS__:failed:{exit_code}\n"
                    return
            if push_kind in ("hub", "registry") and push_target.strip():
                # Give the UI ready-made pull/tag shell commands for the pushed image.
                mp = _manual_pull_and_tag_shell_commands(
                    push_kind,
                    push_target,
                    image_tag,
                    legacy_full_image=legacy_layout,
                )
                yield f"__ROLEFORGE_MANUAL_PULL__:{json.dumps({'commands': mp}, ensure_ascii=False)}\n"
            yield f"__ROLEFORGE_BUILD_STATUS__:success:{exit_code}\n"
        except asyncio.CancelledError:
            # Client disconnected mid-stream: reap the running subprocess in the background.
            asyncio.create_task(_reap_subprocess_after_stream_disconnect(proc))
            raise
    return StreamingResponse(_stream(), media_type="text/plain; charset=utf-8")
@router.get("/tests/{test_id}")
async def get_test_status(test_id: str, user_id: str = Depends(get_current_user_id)) -> dict[str, str]:
    """Return status and lifecycle timestamps for one of the caller's test runs."""
    pool = get_pool()
    async with pool.acquire() as conn:
        rec = await conn.fetchrow(
            """
            select id::text as id, owner_id::text as owner_id, status, created_at, started_at, finished_at
            from test_runs
            where id=$1::uuid
            """,
            test_id,
        )
    if rec is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Test run not found")
    if rec["owner_id"] != user_id:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Test run access denied")

    def _iso(ts: Any) -> str:
        # Unset timestamps serialize to an empty string.
        return ts.isoformat() if ts else ""

    return {
        "id": rec["id"],
        "status": rec["status"],
        "created_at": _iso(rec["created_at"]),
        "started_at": _iso(rec["started_at"]),
        "finished_at": _iso(rec["finished_at"]),
    }
@router.post("/tests/{test_id}/stop")
async def stop_test(test_id: str, user_id: str = Depends(get_current_user_id)) -> dict[str, str]:
    """Terminate the ephemeral runner for this test and mark the run as failed (user-initiated stop)."""
    pool = get_pool()
    async with pool.acquire() as conn:
        rec = await conn.fetchrow(
            """
            select owner_id::text as owner_id, runner_container_name, runtime_mode, status
            from test_runs
            where id=$1::uuid
            """,
            test_id,
        )
    if rec is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Test run not found")
    if rec["owner_id"] != user_id:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Test run access denied")
    if str(rec["status"] or "") in ("success", "failed"):
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Test run already finished")
    container_name = rec["runner_container_name"]
    if not container_name:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="No runner container for this test")
    await terminate_ephemeral_worker(container_name, str(rec["runtime_mode"] or "docker"))
    async with pool.acquire() as conn:
        # Record the stop in the test log, then flip any non-terminal status to failed.
        await conn.execute(
            """
            insert into test_logs (test_run_id, log_line)
            values ($1::uuid, $2)
            """,
            test_id,
            "[roleforge] Test stopped by user (runner container terminated).",
        )
        await conn.execute(
            """
            update test_runs
            set status='failed', finished_at=now()
            where id=$1::uuid
              and status not in ('success', 'failed')
            """,
            test_id,
        )
    return {"status": "stopped", "test_id": test_id}
@router.get("/tests/{test_id}/logs")
async def get_test_logs(
    test_id: str,
    after_id: int = 0,
    limit: int = 200,
    user_id: str = Depends(get_current_user_id),
) -> dict[str, Any]:
    """Return log lines after `after_id` (test_logs.id), chronological order — for live polling."""
    # Clamp client-supplied paging parameters to sane server-side bounds.
    page_size = min(max(limit, 1), 5000)
    cursor = after_id if after_id > 0 else 0

    async with get_pool().acquire() as conn:
        owner = await conn.fetchval("select owner_id::text from test_runs where id=$1::uuid", test_id)
        if not owner:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Test run not found")
        if owner != user_id:
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Test run access denied")
        lines = await conn.fetch(
            """
            select id, log_line
            from test_logs
            where test_run_id=$1::uuid and id > $2::bigint
            order by id asc
            limit $3
            """,
            test_id,
            cursor,
            page_size,
        )
        # Highest log id overall lets the client detect whether more lines exist
        # beyond this page.
        highest = await conn.fetchval(
            "select coalesce(max(id), 0)::bigint from test_logs where test_run_id=$1::uuid",
            test_id,
        )

    # When the page is empty the cursor stays put so the client keeps polling
    # from the same position.
    next_cursor = int(lines[-1]["id"]) if lines else cursor
    return {
        "items": [{"id": int(r["id"]), "line": r["log_line"]} for r in lines],
        "next_after_id": next_cursor,
        "max_log_id": int(highest or 0),
    }
@router.get("/runners/active")
async def active_runners(_: str = Depends(get_current_user_id)) -> dict[str, list[dict[str, str]]]:
    """List currently active ephemeral runners (visible to any authenticated user)."""
    runners = await list_active_runners()
    return {"items": runners}
@router.post("/runners/{runner_name}/stop")
async def stop_runner(
    runner_name: str,
    payload: RunnerStopRequest,
    _: str = Depends(get_current_user_id),
) -> dict[str, str]:
    """Force-terminate a named ephemeral runner in the runtime given by the payload."""
    mode = payload.runtime_mode
    await terminate_ephemeral_worker(runner_name, mode)
    return {"status": "stopped", "runner_name": runner_name, "runtime_mode": mode}