Files
RoleForge/app/routers/domain.py
Sergey Antropoff 9727ff6402 Molecule и Docker-тесты: vendored create playbook и явная платформа образа
- Добавлен molecule docker create playbook (create.yml + tasks/create_network.yml)
  с правкой tmpfs: словарь из molecule-plugins приводится к списку строк для
  community.docker.docker_container; сценарии копируют playbook и задают
  provisioner.playbooks.create.
- Для systemd-платформ tmpfs задаётся списком строк вместо mounts.
- В опциях ОС — run_platform (каноническая архитектура после build); в
  TestHostSpec и hosts теста передаётся platform в molecule/docker_container,
  чтобы на ARM не падал /sbin/init из-за amd64 без --platform.
- Страницы роли (просмотр и создание): одна dashboard-карточка на всю ширину,
  вкладки Role details / Role file catalog в
2026-05-05 08:56:54 +03:00

2482 lines
93 KiB
Python

import asyncio
import base64
import io
import json
import re
from uuid import UUID
import subprocess
import time
import zipfile
from pathlib import Path
from urllib.parse import quote
import yaml
from typing import Any, Literal
from fastapi import APIRouter, Depends, File, HTTPException, Query, Response, UploadFile, status
from fastapi.responses import StreamingResponse
from yamllint import linter as yaml_linter
from yamllint.config import YamlLintConfig
from app.deps.auth import get_current_admin_user_id, get_current_user_id
from app.db.pool import get_pool
from app.schemas.domain import (
AddRoleToPlaybookRequest,
InventoryCreate,
JobLaunchRequest,
LintRoleFileError,
LintRoleFileRequest,
LintRoleFileResponse,
PlaybookCreate,
PlaybookYamlImportRequest,
RoleCategoryCreate,
RoleCreate,
RoleFilePayload,
RoleForkRequest,
RoleFilesReplaceRequest,
RoleImportItem,
RoleImportResult,
RoleUpdate,
RoleYamlImportRequest,
RunnerStopRequest,
TestLaunchRequest,
)
from app.core.config import get_settings
from app.services.dynamic_worker import launch_ephemeral_worker, list_active_runners, terminate_ephemeral_worker
from app.services.jsonlint_rules import (
JSONLINT_RULE_META,
merge_jsonlint_saved,
run_json_lint,
serialize_rules_for_api as serialize_jsonlint_rules_for_api,
validate_jsonlint_put_payload,
)
from app.services.jsonlint_runtime import (
get_jsonlint_config_for_lint,
invalidate_jsonlint_config_cache,
refresh_jsonlint_config_cache,
)
from app.services.yamllint_rules import RULE_META, merge_yamllint_saved, serialize_rules_for_api, validate_put_payload
from app.services.yamllint_runtime import get_yamllint_config_for_lint, invalidate_yamllint_config_cache, refresh_yamllint_config_cache
from app.services.runner_client import (
create_molecule_playbook_run,
create_molecule_role_run,
create_playbook_run,
get_run_status,
)
# Shared router: all domain endpoints (roles, playbooks, jobs, tests) hang off it.
router = APIRouter(tags=["domain"])
# Temporary debug switch: keep ephemeral test runner alive after run completion.
# Flip back to True when you want normal auto-cleanup again.
AUTO_TERMINATE_TEST_RUNNER = True
# Repo-relative directory holding per-OS Dockerfiles (…/dockerfiles, two levels above this file).
DOCKERFILES_ROOT = Path(__file__).resolve().parents[2] / "dockerfiles"
# Name of the dedicated docker buildx builder instance used for OS image builds.
BUILDX_BUILDER_NAME = "roleforge-builder"
def _run_platform_for_os_image(platform: str, build_platforms: str) -> str:
"""Architectural platform for `docker run --platform` / molecule docker_container.platform.
Matches the canonical single-arch tag produced by build-all (first matrix entry when build allows multi-arch).
Without this on Apple Silicon, an amd64-only layer triggers exec format error on /sbin/init.
"""
p = str(platform or "").strip()
if p:
return p
parts = [x.strip() for x in str(build_platforms or "").split(",") if x.strip()]
return parts[0] if parts else "linux/amd64"
def _scan_os_options() -> list[dict[str, str]]:
    """Enumerate buildable OS images by walking DOCKERFILES_ROOT for Dockerfile* files.

    Each option records the image tag, the systemd entrypoint, the build
    platform matrix, and the pinned run platform (canonical arch after build).
    """
    options: list[dict[str, str]] = []
    if not DOCKERFILES_ROOT.exists():
        return options
    for dockerfile in sorted(DOCKERFILES_ROOT.rglob("Dockerfile*")):
        rel = dockerfile.relative_to(DOCKERFILES_ROOT)
        os_key = rel.parent.name
        variant = dockerfile.name.replace("Dockerfile", "").strip(".")
        suffix = f" ({variant})" if variant else ""
        image_tag = f"roleforge-os:{os_key}{('-' + variant) if variant else ''}"
        if variant == "arm64":
            platform = "linux/arm64"
            build_platforms = "linux/arm64"
        elif os_key in {"astra-linux", "redos", "centos7", "centos8"}:
            # Vendor base images are amd64-only unless we explicitly use *.arm64 Dockerfiles.
            platform = "linux/amd64"
            build_platforms = "linux/amd64"
        else:
            platform = ""
            build_platforms = "linux/amd64,linux/arm64"
        options.append(
            {
                "id": f"{os_key}:{variant or 'default'}",
                "name": f"{os_key}{suffix}",
                "dockerfile": str(rel),
                "image": image_tag,
                "command": "/sbin/init",
                "systemd": "true",
                "platform": platform,
                "build_platforms": build_platforms,
                "run_platform": _run_platform_for_os_image(platform, build_platforms),
            }
        )
    return options
def _resolve_os_option_by_dockerfile(dockerfile_rel: str) -> dict[str, str] | None:
    """Find the scanned OS option matching a dockerfile path (either side may be a suffix)."""
    wanted = str(dockerfile_rel or "").strip().replace("\\", "/")
    if not wanted:
        return None
    for option in _scan_os_options():
        have = str(option.get("dockerfile") or "").replace("\\", "/")
        if have == wanted or have.endswith(wanted) or wanted.endswith(have):
            return option
    return None
def _infer_platform_defaults(dockerfile_rel: str) -> tuple[str, str]:
s = str(dockerfile_rel or "").replace("\\", "/").lower()
if "dockerfile.arm64" in s:
return ("linux/arm64", "linux/arm64")
if "/astra-linux/" in s or "/redos/" in s or "/centos7/" in s or "/centos8/" in s:
return ("linux/amd64", "linux/amd64")
return ("", "linux/amd64,linux/arm64")
async def _ensure_docker_buildx_mode() -> tuple[str, str]:
    """Ensure the `docker buildx` plugin exists, attempting an apt auto-install if missing.

    Returns ("local", note) when buildx is usable, otherwise ("none", reason).
    """
    async def probe(argv: list[str]):
        return await asyncio.to_thread(subprocess.run, argv, capture_output=True, text=True)

    check = await probe(["docker", "buildx", "version"])
    if check.returncode == 0:
        return "local", "docker buildx is available"
    for install_cmd in (
        "apt-get update && apt-get install -y docker-buildx-plugin",
        "apt-get update && apt-get install -y docker-buildx",
    ):
        installed = await probe(["sh", "-lc", install_cmd])
        if installed.returncode != 0:
            continue
        recheck = await probe(["docker", "buildx", "version"])
        if recheck.returncode == 0:
            return "local", "docker buildx installed via package manager"
    msg = (check.stderr or check.stdout or "").strip() or "unknown error"
    return "none", f"docker buildx unavailable and auto-install failed: {msg}"
async def _ensure_buildx_builder() -> tuple[bool, str]:
    """Create (if missing) and bootstrap the dedicated buildx builder; returns (ok, note)."""
    async def docker(argv: list[str]):
        return await asyncio.to_thread(subprocess.run, argv, capture_output=True, text=True)

    def failure_text(proc) -> str:
        return (proc.stderr or proc.stdout or "").strip() or "unknown error"

    inspect = await docker(["docker", "buildx", "inspect", BUILDX_BUILDER_NAME])
    if inspect.returncode != 0:
        created = await docker(
            ["docker", "buildx", "create", "--name", BUILDX_BUILDER_NAME, "--driver", "docker-container"]
        )
        if created.returncode != 0:
            return False, f"failed to create buildx builder '{BUILDX_BUILDER_NAME}': {failure_text(created)}"
    boot = await docker(["docker", "buildx", "inspect", BUILDX_BUILDER_NAME, "--bootstrap"])
    if boot.returncode != 0:
        return False, f"failed to bootstrap buildx builder '{BUILDX_BUILDER_NAME}': {failure_text(boot)}"
    return True, f"buildx builder '{BUILDX_BUILDER_NAME}' is ready"
async def _build_os_image(
    dockerfile_rel: str,
    image_tag: str,
    platform: str,
    build_platforms: str = "",
) -> dict[str, Any]:
    """Build an OS image with docker buildx, loading the result into the local engine.

    When a multi-arch matrix is given, one ``<image_tag>-<arch>`` tag is built
    per platform, then the canonical ``image_tag`` is rebuilt pinned to the
    effective platform. Returns a dict with keys dockerfile/image/platform,
    ``status`` ("success"/"failed"), ``exit_code`` (stringified) and ``logs``.
    """
    logs: list[str] = []
    # Make sure the buildx CLI plugin is present (auto-installs via apt when possible).
    buildx_mode, buildx_note = await _ensure_docker_buildx_mode()
    logs.append(f"[roleforge] {buildx_note}")
    if buildx_mode == "none":
        # exit_code "127" mirrors "command not found": buildx itself is missing.
        return {
            "dockerfile": dockerfile_rel,
            "image": image_tag,
            "platform": platform,
            "status": "failed",
            "exit_code": "127",
            "logs": logs,
        }
    builder_ok, builder_note = await _ensure_buildx_builder()
    logs.append(f"[roleforge] {builder_note}")
    if not builder_ok:
        return {
            "dockerfile": dockerfile_rel,
            "image": image_tag,
            "platform": platform,
            "status": "failed",
            "exit_code": "127",
            "logs": logs,
        }
    # Build context is the repo root (one level above dockerfiles/).
    repo_root = str(DOCKERFILES_ROOT.parent)
    dockerfile_abs = str(DOCKERFILES_ROOT / dockerfile_rel)
    matrix = [p.strip() for p in str(build_platforms or "").split(",") if p.strip()]
    if not matrix and platform:
        matrix = [platform]

    async def run_one(cmd: list[str]) -> int:
        # Run one docker command; echo the command and tail of both streams into logs.
        logs.append(f"$ {' '.join(cmd)}")
        proc = await asyncio.to_thread(
            subprocess.run,
            cmd,
            capture_output=True,
            text=True,
            cwd=repo_root,
        )
        out = (proc.stdout or "").splitlines()
        err = (proc.stderr or "").splitlines()
        if out:
            # Keep only the last 200 lines per stream to bound log size.
            logs.extend(out[-200:])
        if err:
            logs.extend(err[-200:])
        return int(proc.returncode)

    def buildx_cmd(tag: str, platform_value: str) -> list[str]:
        # Assemble `docker buildx build` argv; platform flag only when pinned.
        core = ["buildx", "build", "--builder", BUILDX_BUILDER_NAME, "-f", dockerfile_abs, "-t", tag]
        if platform_value:
            core.extend(["--platform", platform_value])
        # We build for local usage in RoleForge, so load the result into local Docker engine.
        core.extend(["--load", repo_root])
        return ["docker", *core]

    if matrix:
        # Per-arch tags first (e.g. roleforge-os:foo-amd64); abort on first failure.
        for p in matrix:
            arch = p.split("/")[-1]
            tag = f"{image_tag}-{arch}"
            code = await run_one(buildx_cmd(tag, p))
            if code != 0:
                return {
                    "dockerfile": dockerfile_rel,
                    "image": image_tag,
                    "platform": platform,
                    "status": "failed",
                    "exit_code": str(code),
                    "logs": logs,
                }
        # Keep canonical tag; always pin platform to avoid host-arch ambiguity on ARM hosts.
        effective_platform = (platform or "").strip() or (matrix[0] if matrix else "")
        final_cmd = buildx_cmd(image_tag, effective_platform)
        code = await run_one(final_cmd)
        if code != 0:
            return {
                "dockerfile": dockerfile_rel,
                "image": image_tag,
                "platform": platform,
                "status": "failed",
                "exit_code": str(code),
                "logs": logs,
            }
    else:
        # No matrix at all: single build of the canonical tag.
        code = await run_one(buildx_cmd(image_tag, (platform or "").strip()))
        if code != 0:
            return {
                "dockerfile": dockerfile_rel,
                "image": image_tag,
                "platform": platform,
                "status": "failed",
                "exit_code": str(code),
                "logs": logs,
            }
    return {
        "dockerfile": dockerfile_rel,
        "image": image_tag,
        "platform": platform,
        "status": "success",
        "exit_code": "0",
        "logs": logs,
    }
def _app_config_row_value_dict(row: Any) -> dict[str, Any]:
"""Decode `app_config.value` (JSONB) to a dict. Handles asyncpg objects and JSON stored as str."""
if not row:
return {}
try:
raw = row["value"]
except (KeyError, TypeError):
return {}
if raw is None:
return {}
if isinstance(raw, dict):
return dict(raw)
if isinstance(raw, str):
try:
parsed = json.loads(raw)
except (json.JSONDecodeError, TypeError, ValueError):
return {}
return dict(parsed) if isinstance(parsed, dict) else {}
return {}
# Binary uploads store body as this prefix + standard base64 (see pages-main.js ROLEFORGE_B64_PREFIX).
ROLEFORGE_B64_PREFIX = "ROLEFORGE_B64:"
LEGACY_FILES_UPLOAD_PREFIX = "files/_roleforge_uploads/"
def _role_file_body_zip_bytes(path: str, body: str) -> bytes:
"""Bytes for role archive: UTF-8 text, or base64-decoded legacy / prefixed binary."""
s = body if isinstance(body, str) else str(body)
if s.startswith(ROLEFORGE_B64_PREFIX):
raw_b64 = "".join(s[len(ROLEFORGE_B64_PREFIX) :].split())
try:
return base64.standard_b64decode(raw_b64, validate=False)
except Exception: # noqa: BLE001
return s.encode("utf-8")
if str(path).startswith(LEGACY_FILES_UPLOAD_PREFIX):
raw_b64 = "".join(s.split())
try:
return base64.standard_b64decode(raw_b64, validate=False)
except Exception: # noqa: BLE001
return s.encode("utf-8")
return s.encode("utf-8")
# Global role categories seeded once (case-insensitively) by _ensure_default_role_categories.
DEFAULT_ROLE_CATEGORIES = (
    "Base OS Hardening",
    "User and SSH Access",
    "Package Management",
    "System Services",
    "Time and NTP",
    "Logging and Audit",
    "Monitoring and Observability",
    "Backup and Restore",
    "Disaster Recovery",
    "Network Baseline",
    "DNS and DHCP",
    "PKI and Certificates",
    "Secrets and Vault",
    "Firewall and Security",
    "Identity and SSO",
    "Container Runtime",
    "Kubernetes Bootstrap",
    "Kubernetes Add-ons",
    "Ingress and Load Balancing",
    "Databases",
    "Message Brokers",
    "Caching and KV Stores",
    "Web Servers",
    "Reverse Proxy",
    "CI/CD and Build Agents",
    "Artifact Registry",
    "Infrastructure Provisioning",
    "Cloud Integrations",
    "Storage and Filesystems",
    "NFS and SMB",
    "Virtualization",
    "High Availability",
    "Performance Tuning",
    "Compliance and Benchmarks",
    "Middleware and App Runtime",
    "Developer Tooling",
    "Automation Utilities",
    "Molecule Tests",
    "Migration and Upgrade",
    "Custom Business Logic",
)
# Permitted top-level directories for role file paths (enforced in _validate_role_file_path).
ALLOWED_ROLE_ROOT_DIRS = {
    "tasks",
    "handlers",
    "templates",
    "files",
    "vars",
    "defaults",
    "meta",
    "library",
    "plugins",
    "module_utils",
    "lookup_plugins",
    "filter_plugins",
    "tests",
}
# New roles get these files only — no tests/ tree (Molecule uses its own layout on the runner).
DEFAULT_ROLE_SCAFFOLD_FILES = (
    ("tasks/main.yml", ""),
    ("handlers/main.yml", ""),
    ("defaults/main.yml", ""),
    ("vars/main.yml", ""),
    ("meta/main.yml", ""),
)
def _parse_json_object(value) -> dict:
if isinstance(value, dict):
return value
if isinstance(value, str):
try:
parsed = json.loads(value)
except Exception: # noqa: BLE001
return {}
return parsed if isinstance(parsed, dict) else {}
return {}
def _safe_role_archive_dir_name(name: str) -> str:
"""Filesystem-safe single segment for zip root (Ansible role folder name)."""
base = str(name or "").strip().lower() or "role"
base = re.sub(r"[^a-z0-9_.-]+", "_", base).strip("._-") or "role"
return base[:120]
def _sanitize_ansible_role_name(raw: str) -> str | None:
"""
Ansible-friendly role name: lowercase; spaces and any character outside [a-z0-9_] become '_'.
Collapses repeated underscores; trims leading/trailing underscores.
Returns None if nothing usable remains (caller should reject empty input).
"""
s = str(raw or "").strip().lower()
s = re.sub(r"[^a-z0-9_]+", "_", s)
s = re.sub(r"_+", "_", s).strip("_")
if not s:
return None
return s[:120]
ROLE_VISIBILITY_VALUES = frozenset({"public", "team", "personal"})
def _normalize_role_visibility(raw: str | None, default: str = "public") -> str:
v = str(raw if raw is not None else default).strip().lower() or default
if v not in ROLE_VISIBILITY_VALUES:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid role visibility")
return v
# Allowed OS family slugs for ansible_roles.os_families (validated in _normalize_os_families).
ROLE_OS_FAMILY_VALUES = frozenset({"rhel", "debian", "alpine", "suse", "arch", "bsd", "windows", "universal"})
def _normalize_role_tags(raw: object | None, *, max_tags: int = 32, max_len_each: int = 48) -> list[str]:
if raw is None:
return []
parts: list[str]
if isinstance(raw, str):
parts = [p.strip() for p in re.split(r"[,;]+", raw) if p.strip()]
elif isinstance(raw, (list, tuple)):
parts = [str(p).strip() for p in raw if str(p).strip()]
else:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="tags must be a list or comma-separated text")
out: list[str] = []
seen: set[str] = set()
for p in parts:
if len(p) > max_len_each:
p = p[:max_len_each].rstrip()
key = p.casefold()
if key in seen:
continue
seen.add(key)
out.append(p)
if len(out) >= max_tags:
break
return out
def _normalize_os_families(raw: object | None) -> list[str]:
    """Validate and dedupe OS family slugs; 400 on non-list input or unknown slugs."""
    if raw is None:
        return []
    if not isinstance(raw, (list, tuple)):
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="os_families must be a list")
    result: list[str] = []
    accepted: set[str] = set()
    for item in raw:
        slug = str(item).strip().lower()
        if slug not in ROLE_OS_FAMILY_VALUES:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Invalid os_family {item!r}; allowed: {', '.join(sorted(ROLE_OS_FAMILY_VALUES))}",
            )
        if slug in accepted:
            continue
        accepted.add(slug)
        result.append(slug)
    return result
async def _allocate_unique_role_name(conn, user_id: str, base: str) -> str:
    """Find a free role name for this owner: base, then base_copy1..base_copy79; 409 if exhausted."""
    stem = _sanitize_ansible_role_name(base) or "role"
    for attempt in range(80):
        candidate = stem if attempt == 0 else f"{stem}_copy{attempt}"[:120]
        taken = await conn.fetchval(
            "select 1 from ansible_roles where owner_id=$1::uuid and name=$2",
            user_id,
            candidate,
        )
        if not taken:
            return candidate
    raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Could not allocate a unique role name")
async def _fork_role_copy(
    conn,
    source_role_id: str,
    user_id: str,
    target_visibility: str,
    target_name: str | None = None,
    *,
    source_type: str | None = None,
    source_ref: str | None = None,
    category_id: str | None = None,
    description: str | None = None,
) -> str:
    """Copy a role (row plus its files) for `user_id` and return the new role id.

    Two modes:
    - implicit fork (``target_name is None``): keeps the source metadata,
      records source_ref='fork', and auto-allocates a unique name;
    - explicit "save as" (``target_name`` given): caller supplies name,
      metadata and description; the name must differ from the source and be
      free for this owner.

    Raises 404 when the source role or chosen category is missing, 400 on a
    bad name, 409 on a name collision.
    """
    row = await conn.fetchrow(
        """
        select
            name,
            source_type,
            source_ref,
            category_id,
            content,
            coalesce(role_tags, array[]::text[]) as role_tags,
            coalesce(os_families, array[]::text[]) as os_families
        from ansible_roles where id=$1::uuid
        """,
        source_role_id,
    )
    if not row:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found")
    orig_name_s = _sanitize_ansible_role_name(str(row["name"])) or ""
    if target_name is not None:
        # Explicit save-as: validate the caller-picked name.
        unique_name = _sanitize_ansible_role_name(target_name)
        if not unique_name:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Role name is required")
        if unique_name == orig_name_s:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Fork name must differ from the original role name",
            )
        taken = await conn.fetchval(
            "select 1 from ansible_roles where owner_id=$1::uuid and name=$2",
            user_id,
            unique_name,
        )
        if taken:
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT, detail="You already have a role with this name"
            )
    else:
        # Implicit fork: derive a unique name (base, base_copy1, ...).
        unique_name = await _allocate_unique_role_name(conn, user_id, str(row["name"]))
    content_obj = row["content"]
    if not isinstance(content_obj, dict):
        content_obj = _parse_json_object(content_obj)
    # Defensive copy so mutations below never touch the source row's object.
    content_obj = dict(content_obj)
    if target_name is None:
        # Implicit fork keeps source metadata; source_ref='fork' records lineage.
        st = str(row["source_type"] or "inline")
        sr = "fork"
        cat = row["category_id"]
    else:
        # Explicit save-as: caller-provided metadata wins.
        content_obj["description"] = str(description if description is not None else "")
        st = str(source_type or "inline").strip()
        sr = str(source_ref if source_ref is not None else "")
        cat = category_id
    if cat:
        cok = await conn.fetchval("select 1 from role_categories where id=$1::uuid", cat)
        if not cok:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role category not found")
    tag_copy = [str(x) for x in (row["role_tags"] or [])]
    os_copy = [str(x) for x in (row["os_families"] or [])]
    new_id = await conn.fetchval(
        """
        insert into ansible_roles (
            owner_id, name, source_type, source_ref, category_id, content, visibility,
            forked_from_id, team_id, role_tags, os_families
        )
        values ($1::uuid, $2, $3, $4, $5::uuid, $6::jsonb, $7, $8::uuid, null, $9::text[], $10::text[])
        returning id::text
        """,
        user_id,
        unique_name,
        st,
        sr,
        cat,
        json.dumps(content_obj),
        target_visibility,
        source_role_id,
        tag_copy,
        os_copy,
    )
    # Copy the file tree verbatim; roles without files get the default scaffold.
    files = await conn.fetch(
        "select path, content from role_files where role_id=$1::uuid order by path",
        source_role_id,
    )
    if files:
        await conn.executemany(
            """
            insert into role_files (role_id, path, content)
            values ($1::uuid, $2, $3)
            """,
            [(new_id, str(fr["path"]), str(fr["content"])) for fr in files],
        )
    else:
        await _upsert_role_files(conn, str(new_id), list(DEFAULT_ROLE_SCAFFOLD_FILES))
    return str(new_id)
async def _user_is_admin(conn, user_id: str) -> bool:
    """True when the user's role column is 'admin' or 'root'."""
    user_role = await conn.fetchval("select role from users where id=$1::uuid", user_id)
    return str(user_role or "") in ("admin", "root")
async def _is_active_team_member(conn, team_id: str | None, user_id: str) -> bool:
    """True when the user has an 'active' membership in the given team (False for no team)."""
    if not team_id:
        return False
    hit = await conn.fetchval(
        """
        select 1 from team_memberships
        where team_id = $1::uuid and user_id = $2::uuid and status = 'active'
        """,
        team_id,
        user_id,
    )
    return hit is not None
async def _resolve_role_for_mutation(conn, role_id: str, user_id: str) -> tuple[str, bool]:
    """Return (effective_role_id, forked). Fork when editing another user's public role.

    Owner and admins edit in place; active team members edit team roles in
    place. Someone else's public role is transparently forked into a personal
    copy (copy-on-write); personal roles of other users are rejected with 403.
    """
    row = await conn.fetchrow(
        """
        select owner_id::text, visibility, team_id::text as team_id
        from ansible_roles where id=$1::uuid
        """,
        role_id,
    )
    if not row:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found")
    # Owner or admin: edit in place, no fork.
    if row["owner_id"] == user_id:
        return role_id, False
    if await _user_is_admin(conn, user_id):
        return role_id, False
    vis = str(row["visibility"] or "personal")
    tid = row["team_id"]
    if vis == "team" and tid:
        if await _is_active_team_member(conn, tid, user_id):
            return role_id, False
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Join the team to edit this team role",
        )
    if vis == "personal":
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Cannot edit another user's personal role")
    if vis == "public":
        # Copy-on-write: the caller gets their own personal fork to mutate.
        new_id = await _fork_role_copy(conn, role_id, user_id, "personal")
        return new_id, True
    raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Role access denied")
async def _clamp_visibility_on_update(conn, role_id: str, user_id: str, requested: str) -> str:
    """Resolve the visibility value actually stored on update.

    Owner may change personal↔public↔team (subject to team_id validation by
    the caller). Public stays public for non-admins; only admins may change a
    public role's visibility. Raises 400 on an invalid requested value and
    404 when the role does not exist.
    """
    req = _normalize_role_visibility(requested)
    row = await conn.fetchrow(
        """
        select owner_id::text, coalesce(visibility, 'personal') as visibility
        from ansible_roles where id=$1::uuid
        """,
        role_id,
    )
    if not row:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found")
    cur = str(row["visibility"])
    is_owner = str(row["owner_id"]) == user_id
    is_admin = await _user_is_admin(conn, user_id)
    if cur == "public":
        # Non-admins cannot hide an already-public role.
        if is_admin:
            return req
        return "public"
    if cur == "personal":
        # Owner or admin may promote; everyone else stays pinned to personal.
        if req == "public" and (is_owner or is_admin):
            return "public"
        if req == "team" and (is_owner or is_admin):
            return "team"
        return "personal"
    if cur == "team":
        if is_admin or is_owner:
            return req
        return cur
    # Defensive fallback for an unexpected stored visibility value.
    return _normalize_role_visibility(requested, default="personal")
async def _assert_can_view_role(conn, role_id: str, user_id: str) -> dict[str, str]:
    """Authorize read access to a role.

    Returns {"owner_id", "visibility", "team_id"} when viewing is allowed:
    admins, public roles, the owner, or an active member of the role's team.
    Raises 404 when the role is missing, 403 otherwise.
    """
    row = await conn.fetchrow(
        """
        select
            owner_id::text as owner_id,
            coalesce(visibility, 'public') as visibility,
            team_id::text as team_id
        from ansible_roles where id=$1::uuid
        """,
        role_id,
    )
    if not row:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found")
    out = {"owner_id": str(row["owner_id"]), "visibility": str(row["visibility"]), "team_id": row["team_id"]}
    # Cheapest checks first; each grants access immediately.
    if await _user_is_admin(conn, user_id):
        return out
    vis = str(row["visibility"])
    if vis == "public":
        return out
    if row["owner_id"] == user_id:
        return out
    if vis == "team" and row["team_id"] and await _is_active_team_member(conn, str(row["team_id"]), user_id):
        return out
    raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Role access denied")
def _validate_role_file_path(path: str) -> str:
    """Normalize a role-relative file path and require an allowed top-level directory; 400 otherwise."""
    normalized = str(path or "").strip().lstrip("/")
    if not normalized or normalized.endswith("/"):
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Role file path is invalid")
    top_dir, _, _ = normalized.partition("/")
    if top_dir not in ALLOWED_ROLE_ROOT_DIRS:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Role file path must start with one of: {', '.join(sorted(ALLOWED_ROLE_ROOT_DIRS))}",
        )
    return normalized
def _zip_member_to_rel_paths(namelist: list[str]) -> tuple[list[tuple[str, str]], str | None]:
"""Map zip entry -> relative path under role; strip one shared top-level folder (Galaxy-style layout)."""
clean: list[str] = []
for n in namelist:
n = n.replace("\\", "/").strip()
if not n or n.endswith("/"):
continue
if "__MACOSX" in n or "/.DS_Store" in n or n.endswith(".DS_Store"):
continue
clean.append(n)
if not clean:
return [], None
parts = [c.split("/") for c in clean]
first0 = parts[0][0]
if len(parts[0]) > 1 and all(len(p) > 1 and p[0] == first0 for p in parts):
pairs = [(clean[i], "/".join(parts[i][1:])) for i in range(len(clean))]
return pairs, first0
return [(c, c) for c in clean], None
def _decode_import_bytes(rel_path: str, raw: bytes) -> tuple[str, str] | None:
    """Turn zip bytes into a stored (path, body) pair; skip invalid paths.

    UTF-8 data is stored as text. Binary data is stored as a ROLEFORGE_B64
    body and forced under files/ with a sanitized basename when needed.
    """
    rel_path = rel_path.replace("\\", "/").strip().lstrip("/")
    if not rel_path or rel_path.endswith("/"):
        return None
    try:
        text = raw.decode("utf-8")
    except UnicodeDecodeError:
        # Binary payload: relocate under files/ when not already there.
        target = rel_path
        if not target.startswith("files/"):
            basename = target.split("/")[-1]
            safe_name = re.sub(r"[^a-zA-Z0-9_.-]+", "_", basename).strip("._-")[:200] or "file"
            target = f"files/{safe_name}"
        try:
            stored = _validate_role_file_path(target)
        except HTTPException:
            return None
        return stored, ROLEFORGE_B64_PREFIX + base64.standard_b64encode(raw).decode("ascii")
    try:
        stored = _validate_role_file_path(rel_path)
    except HTTPException:
        return None
    return stored, text
def _parse_role_zip_bytes(raw: bytes, filename_hint: str) -> tuple[str, list[tuple[str, str]]]:
    """Open a role ZIP and return (suggested role dir name, sorted (path, body) pairs).

    Raises 400 for a corrupt archive or one containing no allowed role files.
    """
    try:
        archive = zipfile.ZipFile(io.BytesIO(raw))
    except zipfile.BadZipFile as exc:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid ZIP file") from exc
    pairs, root_folder = _zip_member_to_rel_paths(archive.namelist())
    stem = Path(filename_hint or "imported.zip").stem
    suggested = _safe_role_archive_dir_name(root_folder or stem)
    collected: dict[str, str] = {}
    for member, rel in pairs:
        try:
            data = archive.read(member)
        except (KeyError, RuntimeError):
            continue
        decoded = _decode_import_bytes(rel, data)
        if decoded:
            collected[decoded[0]] = decoded[1]
    if not collected:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="No allowed role files found in archive (use paths under tasks/, handlers/, templates/, files/, …).",
        )
    return suggested, sorted(collected.items())
def _role_editor_lint_kind(path: str) -> Literal["yaml", "json"]:
lower = str(path or "").strip().lstrip("/").lower()
base = lower.split("/")[-1] if lower else ""
if base.endswith((".json", ".tfstate", ".ndjson")):
return "json"
return "yaml"
def _lint_yaml_role_file(content: str, pseudo_path: str, cfg: YamlLintConfig) -> list[LintRoleFileError]:
    """Run yamllint over an in-memory buffer and adapt problems to API error objects."""
    findings: list[LintRoleFileError] = []
    for problem in yaml_linter.run(content, cfg, pseudo_path):
        level = str(getattr(problem, "level", "") or "error").lower()
        rule_id = getattr(problem, "rule", None)
        description = getattr(problem, "desc", "") or ""
        message = f"{rule_id}: {description}" if rule_id else description
        findings.append(
            LintRoleFileError(
                line=getattr(problem, "line", None),
                column=getattr(problem, "column", None),
                level=level,
                message=message.strip(),
            )
        )
    return findings
async def _upsert_role_files(conn, role_id: str, files: list[tuple[str, str]]) -> None:
    """Bulk insert-or-update role files; no-op for an empty list."""
    if not files:
        return
    rows = [(role_id, file_path, file_body) for file_path, file_body in files]
    await conn.executemany(
        """
        insert into role_files (role_id, path, content)
        values ($1::uuid, $2, $3)
        on conflict (role_id, path)
        do update set content=excluded.content, updated_at=now()
        """,
        rows,
    )
async def _ensure_default_role_categories(conn) -> None:
    """Insert any DEFAULT_ROLE_CATEGORIES still missing (case-insensitive) as global rows."""
    for category_name in DEFAULT_ROLE_CATEGORIES:
        await conn.execute(
            """
            insert into role_categories (owner_id, name)
            select null, $1
            where not exists (
                select 1 from role_categories where lower(name) = lower($1)
            )
            """,
            category_name,
        )
async def _prune_legacy_empty_tests_scaffold_for_user(conn, user_id: str) -> None:
    """Remove old default tests/test.yml (empty) left from earlier RoleForge scaffolds.

    Only deletes rows whose content is blank, so user-authored test files survive.
    """
    await conn.execute(
        """
        delete from role_files rf
        using ansible_roles r
        where rf.role_id = r.id
        and r.owner_id = $1::uuid
        and rf.path = 'tests/test.yml'
        and btrim(coalesce(rf.content, '')) = ''
        """,
        user_id,
    )
async def _backfill_role_files_for_user(conn, user_id: str) -> None:
    """Seed role_files for roles that have none, using legacy content.tasks_yaml when present."""
    roles = await conn.fetch(
        """
        select r.id::text as id, r.content
        from ansible_roles r
        where r.owner_id=$1::uuid
        """,
        user_id,
    )
    for role in roles:
        role_id = str(role["id"])
        existing = await conn.fetchval(
            "select count(*)::int from role_files where role_id=$1::uuid",
            role_id,
        )
        if int(existing or 0) > 0:
            continue
        legacy = _parse_json_object(role["content"])
        tasks_yaml = str(legacy.get("tasks_yaml") or "")
        seed = list(DEFAULT_ROLE_SCAFFOLD_FILES)
        if tasks_yaml.strip():
            seed.append(("tasks/main.yml", tasks_yaml))
        await _upsert_role_files(conn, role_id, seed)
    await _prune_legacy_empty_tests_scaffold_for_user(conn, user_id)
async def _monitor_job(
    job_id: str, runner_url: str, runner_run_id: str, runner_name: str, runtime_mode: str
) -> None:
    """Poll a runner for job progress, stream log lines into job_logs, then finalize.

    Loops until the run reports success/failed, the overall timeout elapses,
    or consecutive polling errors reach the configured limit. On exit the
    ephemeral worker is torn down (unless AUTO_TERMINATE_TEST_RUNNER is off)
    and the final job status is written.
    """
    pool = get_pool()
    settings = get_settings()
    started = time.monotonic()
    poll_errors = 0
    offset = 0  # log-line cursor into the runner's output stream
    final_status = "failed"  # pessimistic default; overwritten on clean completion
    try:
        while True:
            # Hard wall-clock cap for the whole run.
            if time.monotonic() - started > settings.app_runner_timeout_sec:
                final_status = "failed"
                break
            try:
                snapshot = await get_run_status(runner_url, runner_run_id, offset)
                poll_errors = 0  # a successful poll resets the error streak
            except Exception:  # noqa: BLE001
                poll_errors += 1
                if poll_errors >= settings.app_runner_max_poll_errors:
                    final_status = "failed"
                    break
                await asyncio.sleep(settings.app_runner_poll_interval_sec)
                continue
            lines = snapshot.get("lines", [])
            offset = int(snapshot.get("next_offset", offset))
            async with pool.acquire() as conn:
                # Heartbeat keeps stale-job reapers away while we keep polling.
                await conn.execute(
                    "update jobs set runner_last_heartbeat=now() where id=$1::uuid",
                    job_id,
                )
                if lines:
                    await conn.executemany(
                        "insert into job_logs (job_id, log_line) values ($1::uuid, $2)",
                        [(job_id, line) for line in lines],
                    )
            status_name = str(snapshot.get("status", "running"))
            if status_name in {"success", "failed"}:
                final_status = status_name
                break
            await asyncio.sleep(settings.app_runner_poll_interval_sec)
    finally:
        if AUTO_TERMINATE_TEST_RUNNER:
            await terminate_ephemeral_worker(runner_name, runtime_mode)
        async with pool.acquire() as conn:
            # NOTE(review): unlike _monitor_test, this update is unconditional and
            # can overwrite a terminal status written elsewhere — confirm intended.
            await conn.execute(
                "update jobs set status=$1, finished_at=now() where id=$2::uuid",
                final_status,
                job_id,
            )
async def _monitor_test(
    test_id: str, runner_url: str, runner_run_id: str, runner_name: str, runtime_mode: str
) -> None:
    """Poll a runner for a test run, stream log lines into test_logs, then finalize.

    Mirrors _monitor_job, but the worker is always terminated (no
    AUTO_TERMINATE override) and the final status update is guarded so it
    never clobbers an already-terminal status.
    """
    pool = get_pool()
    settings = get_settings()
    started = time.monotonic()
    poll_errors = 0
    offset = 0  # log-line cursor into the runner's output stream
    final_status = "failed"  # pessimistic default; overwritten on clean completion
    try:
        while True:
            # Hard wall-clock cap for the whole run.
            if time.monotonic() - started > settings.app_runner_timeout_sec:
                final_status = "failed"
                break
            try:
                snapshot = await get_run_status(runner_url, runner_run_id, offset)
                poll_errors = 0  # a successful poll resets the error streak
            except Exception:  # noqa: BLE001
                poll_errors += 1
                if poll_errors >= settings.app_runner_max_poll_errors:
                    final_status = "failed"
                    break
                await asyncio.sleep(settings.app_runner_poll_interval_sec)
                continue
            lines = snapshot.get("lines", [])
            offset = int(snapshot.get("next_offset", offset))
            async with pool.acquire() as conn:
                # Heartbeat keeps stale-run reapers away while we keep polling.
                await conn.execute(
                    "update test_runs set runner_last_heartbeat=now() where id=$1::uuid",
                    test_id,
                )
                if lines:
                    await conn.executemany(
                        "insert into test_logs (test_run_id, log_line) values ($1::uuid, $2)",
                        [(test_id, line) for line in lines],
                    )
            status_name = str(snapshot.get("status", "running"))
            if status_name in {"success", "failed"}:
                final_status = status_name
                break
            await asyncio.sleep(settings.app_runner_poll_interval_sec)
    finally:
        # Test runners are always torn down here, regardless of the debug switch.
        await terminate_ephemeral_worker(runner_name, runtime_mode)
        async with pool.acquire() as conn:
            # Guarded update: never overwrite a terminal status written elsewhere.
            await conn.execute(
                """
                update test_runs
                set status=$1, finished_at=now()
                where id=$2::uuid
                and status not in ('success', 'failed')
                """,
                final_status,
                test_id,
            )
@router.post("/roles/lint-file", response_model=LintRoleFileResponse)
async def lint_role_file(
    payload: LintRoleFileRequest,
    user_id: str = Depends(get_current_user_id),
) -> LintRoleFileResponse:
    """Validate editor buffer: YAML via yamllint (DB `yamllint`); JSON via project JSONLint rules (DB `jsonlint`)."""
    _ = user_id  # auth dependency only; the lint result is user-independent
    _validate_role_file_path(payload.path)
    kind = _role_editor_lint_kind(payload.path)
    if kind == "json":
        rules = await get_jsonlint_config_for_lint()
        problems = run_json_lint(payload.content, rules)
    else:
        cfg = await get_yamllint_config_for_lint()
        problems = _lint_yaml_role_file(payload.content, payload.path, cfg)
    has_blocking = any((p.level or "").lower() == "error" for p in problems)
    return LintRoleFileResponse(ok=not has_blocking, kind=kind, errors=problems)
@router.post("/roles/import-archive", response_model=RoleImportResult)
async def import_role_archive(
    file: UploadFile = File(...),
    user_id: str = Depends(get_current_user_id),
) -> RoleImportResult:
    """Parse an Ansible role ZIP (same layout as export); returns files for the create-role UI."""
    _ = user_id  # auth gate only
    raw = await file.read()
    max_bytes = 15 * 1024 * 1024
    if len(raw) > max_bytes:
        raise HTTPException(
            status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
            detail="ZIP is too large (max 15 MB).",
        )
    suggested, tuples = _parse_role_zip_bytes(raw, file.filename or "imported.zip")
    items = [RoleImportItem(path=file_path, content=body) for file_path, body in tuples]
    return RoleImportResult(suggested_name=suggested, items=items)
@router.post("/roles")
async def create_role(payload: RoleCreate, user_id: str = Depends(get_current_user_id)) -> dict[str, str]:
name = _sanitize_ansible_role_name(payload.name)
if not name:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Role name is required")
_ = _normalize_role_visibility(payload.visibility)
visibility = "personal"
team_id_insert: str | None = None
if payload.team_id:
tid = str(payload.team_id).strip()
try:
UUID(tid)
except ValueError as exc:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid team_id") from exc
visibility = "team"
team_id_insert = tid
pool = get_pool()
async with pool.acquire() as conn:
if team_id_insert:
team_ok = await conn.fetchval("select 1 from teams where id=$1::uuid", team_id_insert)
if not team_ok:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Team not found")
if not await _is_active_team_member(conn, team_id_insert, user_id):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Must be an active team member to create a team role",
)
if payload.category_id:
exists = await conn.fetchval(
"select 1 from role_categories where id=$1::uuid",
payload.category_id,
)
if not exists:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role category not found")
tags_norm = _normalize_role_tags(payload.tags)
os_norm = _normalize_os_families(payload.os_families)
role_id = await conn.fetchval(
"""
insert into ansible_roles (
owner_id, name, source_type, source_ref, category_id, content, visibility, team_id,
role_tags, os_families
)
values ($1, $2, $3, $4, $5::uuid, $6::jsonb, $7, $8::uuid, $9::text[], $10::text[])
returning id::text
""",
user_id,
name,
payload.source_type,
payload.source_ref,
payload.category_id,
json.dumps({"description": payload.description, **(payload.content or {})}),
visibility,
team_id_insert,
tags_norm,
os_norm,
)
validated_files = [(_validate_role_file_path(item.path), item.content) for item in payload.files]
await _upsert_role_files(
conn,
str(role_id),
[*DEFAULT_ROLE_SCAFFOLD_FILES, *validated_files],
)
return {"id": role_id, "name": name}
@router.post("/roles/import-yaml")
async def import_role_yaml(
payload: RoleYamlImportRequest, user_id: str = Depends(get_current_user_id)
) -> dict[str, str]:
try:
parsed = yaml.safe_load(payload.tasks_yaml)
except yaml.YAMLError as exc:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid YAML") from exc
if not isinstance(parsed, list):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Role tasks YAML must be a list of Ansible tasks",
)
name = _sanitize_ansible_role_name(payload.name)
if not name:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Role name is required")
_ = _normalize_role_visibility(payload.visibility)
visibility = "personal"
pool = get_pool()
async with pool.acquire() as conn:
if payload.category_id:
exists = await conn.fetchval(
"select 1 from role_categories where id=$1::uuid",
payload.category_id,
)
if not exists:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role category not found")
tags_norm = _normalize_role_tags(payload.tags)
os_norm = _normalize_os_families(payload.os_families)
role_id = await conn.fetchval(
"""
insert into ansible_roles (
owner_id, name, source_type, source_ref, category_id, content, visibility,
role_tags, os_families
)
values ($1::uuid, $2, 'inline', $3, $4::uuid, $5::jsonb, $6, $7::text[], $8::text[])
returning id::text
""",
user_id,
name,
payload.source_ref,
payload.category_id,
json.dumps({"description": payload.description, "tasks_yaml": payload.tasks_yaml}),
visibility,
tags_norm,
os_norm,
)
validated_files = [(_validate_role_file_path(item.path), item.content) for item in payload.files]
await _upsert_role_files(
conn,
str(role_id),
[
*DEFAULT_ROLE_SCAFFOLD_FILES,
("tasks/main.yml", payload.tasks_yaml),
*validated_files,
],
)
return {"id": role_id, "name": name}
@router.get("/roles/{role_id}/export-yaml")
async def export_role_yaml(role_id: str, user_id: str = Depends(get_current_user_id)) -> Response:
pool = get_pool()
async with pool.acquire() as conn:
await _assert_can_view_role(conn, role_id, user_id)
row = await conn.fetchrow(
"""
select content
from ansible_roles
where id=$1::uuid
""",
role_id,
)
if not row:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found")
content = _parse_json_object(row["content"])
tasks_yaml = content.get("tasks_yaml")
if not tasks_yaml:
tasks_yaml = yaml.safe_dump(content, sort_keys=False, allow_unicode=False)
return Response(content=tasks_yaml, media_type="application/x-yaml")
@router.get("/roles/{role_id}/export-archive")
async def export_role_archive(role_id: str, user_id: str = Depends(get_current_user_id)) -> Response:
pool = get_pool()
async with pool.acquire() as conn:
await _assert_can_view_role(conn, role_id, user_id)
row = await conn.fetchrow(
"""
select r.name, r.content
from ansible_roles r
where r.id=$1::uuid
""",
role_id,
)
if not row:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found")
content = _parse_json_object(row["content"])
file_rows = await conn.fetch(
"select path, content from role_files where role_id=$1::uuid order by path",
role_id,
)
if file_rows:
paths_body: dict[str, str] = {}
for fr in file_rows:
paths_body[str(fr["path"])] = str(fr["content"])
files = sorted(paths_body.items(), key=lambda x: x[0])
else:
tasks_yaml = str(content.get("tasks_yaml") or "")
merged: dict[str, str] = {path: body for path, body in DEFAULT_ROLE_SCAFFOLD_FILES}
if tasks_yaml.strip():
merged["tasks/main.yml"] = tasks_yaml
files = sorted(merged.items(), key=lambda x: x[0])
root = _safe_role_archive_dir_name(row["name"])
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_DEFLATED) as zf:
for path, body in files:
arcname = f"{root}/{path}"
raw = _role_file_body_zip_bytes(path, body) if isinstance(body, str) else bytes(body)
zf.writestr(arcname, raw)
zip_bytes = buf.getvalue()
zip_filename = f"{root}.zip"
ascii_fallback = zip_filename.encode("ascii", "replace").decode("ascii")
disp = (
f'attachment; filename="{ascii_fallback}"; '
f"filename*=UTF-8''{quote(zip_filename)}"
)
return Response(
content=zip_bytes,
media_type="application/zip",
headers={"Content-Disposition": disp},
)
@router.get("/roles/{role_id}/files")
async def list_role_files(role_id: str, user_id: str = Depends(get_current_user_id)) -> dict[str, list[dict[str, str]]]:
pool = get_pool()
async with pool.acquire() as conn:
await _assert_can_view_role(conn, role_id, user_id)
rows = await conn.fetch(
"select path, content from role_files where role_id=$1::uuid order by path",
role_id,
)
return {"items": [{"path": row["path"], "content": row["content"]} for row in rows]}
@router.get("/roles/details/{role_id}")
async def get_role_details(role_id: str, user_id: str = Depends(get_current_user_id)) -> dict[str, Any]:
pool = get_pool()
async with pool.acquire() as conn:
await _assert_can_view_role(conn, role_id, user_id)
row = await conn.fetchrow(
"""
select
r.id::text as id,
r.owner_id::text as owner_id,
r.name,
r.source_type,
r.source_ref,
r.category_id::text as category_id,
r.created_at,
coalesce(r.visibility, 'personal') as visibility,
r.forked_from_id::text as forked_from_id,
r.team_id::text as team_id,
t.name as team_name,
c.name as category_name,
r.content,
coalesce(r.role_tags, array[]::text[]) as role_tags,
coalesce(r.os_families, array[]::text[]) as os_families
from ansible_roles r
left join role_categories c on c.id = r.category_id
left join teams t on t.id = r.team_id
where r.id=$1::uuid
""",
role_id,
)
if not row:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found")
is_admin = await _user_is_admin(conn, user_id)
content = _parse_json_object(row["content"])
is_owner = row["owner_id"] == user_id
vis = str(row["visibility"])
editable = bool(is_owner or is_admin)
deletable = (vis == "personal" and (is_owner or is_admin)) or (vis == "public" and is_admin)
return {
"id": row["id"],
"name": row["name"],
"source_type": row["source_type"],
"source_ref": row["source_ref"],
"category_id": row["category_id"],
"category_name": row["category_name"] or "Uncategorized",
"description": str(content.get("description") or ""),
"created_at": row["created_at"].isoformat() if row["created_at"] else "",
"visibility": vis,
"forked_from_id": row["forked_from_id"] or "",
"team_id": row["team_id"] or "",
"team_name": str(row["team_name"] or "") if row["team_id"] else "",
"tags": [str(t) for t in (row["role_tags"] or [])],
"os_families": [str(t) for t in (row["os_families"] or [])],
"is_owner": is_owner,
"is_admin": is_admin,
"editable": editable,
"deletable": deletable,
}
@router.post("/roles/{role_id}/fork")
async def fork_role(
role_id: str, payload: RoleForkRequest, user_id: str = Depends(get_current_user_id)
) -> dict[str, str]:
pool = get_pool()
async with pool.acquire() as conn:
await _assert_can_view_role(conn, role_id, user_id)
new_id = await _fork_role_copy(
conn,
role_id,
user_id,
"personal",
target_name=payload.name,
source_type=payload.source_type,
source_ref=payload.source_ref,
category_id=payload.category_id,
description=payload.description,
)
row = await conn.fetchrow("select name from ansible_roles where id=$1::uuid", new_id)
return {"id": str(new_id), "name": str(row["name"]) if row else payload.name}
@router.put("/roles/details/{role_id}")
async def update_role_details(
role_id: str,
payload: RoleUpdate,
user_id: str = Depends(get_current_user_id),
) -> dict[str, Any]:
name = _sanitize_ansible_role_name(payload.name)
if not name:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Role name is required")
pool = get_pool()
async with pool.acquire() as conn:
effective_id, forked = await _resolve_role_for_mutation(conn, role_id, user_id)
vis_payload = await _clamp_visibility_on_update(conn, effective_id, user_id, payload.visibility)
if payload.category_id:
exists = await conn.fetchval("select 1 from role_categories where id=$1::uuid", payload.category_id)
if not exists:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role category not found")
tags_norm = _normalize_role_tags(payload.tags)
os_norm = _normalize_os_families(payload.os_families)
tid_final: str | None = None
team_display_name = ""
if vis_payload == "team":
raw_new = str(payload.team_id or "").strip()
prev_row = await conn.fetchrow(
"select team_id::text as team_id from ansible_roles where id=$1::uuid",
effective_id,
)
prev_tid = str(prev_row["team_id"] or "").strip() if prev_row else ""
tid_use = raw_new or prev_tid
if not tid_use:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="team_id is required for team visibility",
)
try:
UUID(tid_use)
except ValueError as exc:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid team_id") from exc
team_exists = await conn.fetchval("select 1 from teams where id=$1::uuid", tid_use)
if not team_exists:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Team not found")
is_member = await _is_active_team_member(conn, tid_use, user_id)
if not is_member and not await _user_is_admin(conn, user_id):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Must be an active member of the selected team",
)
tid_final = tid_use
team_display_name = str(
await conn.fetchval("select name from teams where id=$1::uuid", tid_final) or ""
)
await conn.execute(
"""
update ansible_roles
set
name=$1,
source_type=$2,
source_ref=$3,
category_id=$4::uuid,
content=$5::jsonb,
visibility=$6,
role_tags=$7::text[],
os_families=$8::text[],
team_id=$9::uuid
where id=$10::uuid
""",
name,
payload.source_type,
payload.source_ref,
payload.category_id,
json.dumps({"description": payload.description}),
vis_payload,
tags_norm,
os_norm,
tid_final,
effective_id,
)
return {
"status": "updated",
"id": effective_id,
"name": name,
"forked": forked,
"visibility": vis_payload,
"team_id": tid_final or "",
"team_name": team_display_name if vis_payload == "team" else "",
}
@router.put("/roles/{role_id}/files")
async def replace_role_files(
role_id: str,
payload: RoleFilesReplaceRequest,
user_id: str = Depends(get_current_user_id),
) -> dict[str, Any]:
pool = get_pool()
async with pool.acquire() as conn:
effective_id, forked = await _resolve_role_for_mutation(conn, role_id, user_id)
normalized: list[tuple[str, str]] = []
for item in payload.files:
path = _validate_role_file_path(item.path)
normalized.append((path, item.content))
await conn.execute("delete from role_files where role_id=$1::uuid", effective_id)
await _upsert_role_files(conn, effective_id, normalized if normalized else list(DEFAULT_ROLE_SCAFFOLD_FILES))
return {"status": "updated", "role_id": effective_id, "forked": forked}
@router.delete("/roles/{role_id}")
async def delete_role(role_id: str, user_id: str = Depends(get_current_user_id)) -> dict[str, str]:
pool = get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
select
owner_id::text,
coalesce(visibility, 'personal') as visibility,
team_id::text as team_id
from ansible_roles where id=$1::uuid
""",
role_id,
)
if not row:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found")
is_admin = await _user_is_admin(conn, user_id)
vis = str(row["visibility"])
owner_id = str(row["owner_id"])
if vis == "public":
if not is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only an administrator can delete a public role",
)
elif vis == "team" and row["team_id"]:
if not (
is_admin
or owner_id == user_id
or await _is_active_team_member(conn, str(row["team_id"]), user_id)
):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Role delete denied")
elif owner_id != user_id and not is_admin:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Role delete denied")
await conn.execute(
"update playbooks set role_ids = array_remove(role_ids, $1::uuid) where $1::uuid = any(role_ids)",
role_id,
)
await conn.execute("delete from ansible_roles where id=$1::uuid", role_id)
return {"status": "deleted", "id": role_id}
@router.get("/roles")
async def list_roles(
user_id: str = Depends(get_current_user_id),
team_id: str | None = Query(None, description="List Ansible roles for this team (members only)."),
) -> dict[str, list[dict]]:
pool = get_pool()
async with pool.acquire() as conn:
await _ensure_default_role_categories(conn)
await _backfill_role_files_for_user(conn, user_id)
if team_id:
if not await _is_active_team_member(conn, team_id, user_id):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not a member of this team")
rows = await conn.fetch(
"""
select
r.id::text as id,
r.name,
r.source_type,
r.source_ref,
r.created_at,
r.category_id::text as category_id,
c.name as category_name,
r.content,
coalesce(r.visibility, 'personal') as visibility,
r.owner_id::text as owner_id,
r.team_id::text as team_id,
coalesce(fc.cnt, 0)::int as file_count,
coalesce(r.role_tags, array[]::text[]) as role_tags,
coalesce(r.os_families, array[]::text[]) as os_families
from ansible_roles r
left join role_categories c on c.id = r.category_id
left join (
select role_id, count(*)::int as cnt
from role_files
group by role_id
) fc on fc.role_id = r.id
where r.team_id = $1::uuid and coalesce(r.visibility, 'personal') = 'team'
order by coalesce(c.name, 'Uncategorized'), r.name
""",
team_id,
)
else:
rows = await conn.fetch(
"""
select
r.id::text as id,
r.name,
r.source_type,
r.source_ref,
r.created_at,
r.category_id::text as category_id,
c.name as category_name,
r.content,
coalesce(r.visibility, 'personal') as visibility,
r.owner_id::text as owner_id,
r.team_id::text as team_id,
coalesce(fc.cnt, 0)::int as file_count,
coalesce(r.role_tags, array[]::text[]) as role_tags,
coalesce(r.os_families, array[]::text[]) as os_families
from ansible_roles r
left join role_categories c on c.id = r.category_id
left join (
select role_id, count(*)::int as cnt
from role_files
group by role_id
) fc on fc.role_id = r.id
where coalesce(r.visibility, 'personal') = 'public' or r.owner_id = $1::uuid
order by coalesce(c.name, 'Uncategorized'), r.name
""",
user_id,
)
items = [
{
"id": row["id"],
"name": row["name"],
"source_type": row["source_type"],
"source_ref": row["source_ref"],
"created_at": row["created_at"].isoformat() if row["created_at"] else "",
"category_id": row["category_id"],
"category_name": row["category_name"] or "Uncategorized",
"description": str(_parse_json_object(row["content"]).get("description") or ""),
"file_count": int(row["file_count"] or 0),
"visibility": str(row["visibility"]),
"is_owner": row["owner_id"] == user_id,
"tags": [str(t) for t in (row["role_tags"] or [])],
"os_families": [str(t) for t in (row["os_families"] or [])],
}
for row in rows
]
return {"items": items}
@router.get("/playbooks")
async def list_playbooks(user_id: str = Depends(get_current_user_id)) -> dict[str, list[dict]]:
pool = get_pool()
async with pool.acquire() as conn:
rows = await conn.fetch(
"""
select id::text as id, name, description, owner_id::text as owner_id
from playbooks
where owner_id=$1::uuid
order by name
""",
user_id,
)
return {
"items": [
{
"id": row["id"],
"name": row["name"],
"description": row["description"],
"owner_id": row["owner_id"],
}
for row in rows
]
}
@router.post("/playbooks/{playbook_id}/roles")
async def add_role_to_playbook(
playbook_id: str,
payload: AddRoleToPlaybookRequest,
user_id: str = Depends(get_current_user_id),
) -> dict[str, str]:
pool = get_pool()
async with pool.acquire() as conn:
owner_id = await conn.fetchval(
"select owner_id::text from playbooks where id=$1::uuid",
playbook_id,
)
if not owner_id:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Playbook not found")
if owner_id != user_id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Playbook access denied")
role_row = await conn.fetchrow(
"select owner_id::text, coalesce(visibility, 'public') as visibility from ansible_roles where id=$1::uuid",
payload.role_id,
)
if not role_row:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found")
if role_row["owner_id"] != user_id and role_row["visibility"] != "public":
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Role access denied")
await conn.execute(
"""
update playbooks
set role_ids = (
select array_agg(distinct role_id)
from unnest(role_ids || $1::uuid) as role_id
)
where id=$2::uuid
""",
payload.role_id,
playbook_id,
)
return {"status": "added", "playbook_id": playbook_id, "role_id": payload.role_id}
@router.get("/roles/categories")
async def list_role_categories(user_id: str = Depends(get_current_user_id)) -> dict[str, list[dict]]:
pool = get_pool()
async with pool.acquire() as conn:
await _ensure_default_role_categories(conn)
rows = await conn.fetch(
"""
select
c.id::text as id,
c.name,
c.created_at,
count(r.id)::int as role_count
from role_categories c
left join ansible_roles r on r.category_id = c.id
group by c.id
order by c.name
"""
)
return {
"items": [
{
"id": row["id"],
"name": row["name"],
"role_count": row["role_count"],
"created_at": row["created_at"].isoformat() if row["created_at"] else "",
}
for row in rows
]
}
@router.post("/roles/categories")
async def create_role_category(
payload: RoleCategoryCreate,
_: str = Depends(get_current_admin_user_id),
) -> dict[str, str]:
category_name = payload.name.strip()
if not category_name:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Category name is required")
pool = get_pool()
async with pool.acquire() as conn:
existing = await conn.fetchval(
"select id::text from role_categories where lower(name)=lower($1)",
category_name,
)
if existing:
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Category already exists")
category_id = await conn.fetchval(
"""
insert into role_categories (owner_id, name)
values (null, $1)
returning id::text
""",
category_name,
)
return {"id": category_id}
@router.delete("/roles/categories/{category_id}")
async def delete_role_category(category_id: str, _: str = Depends(get_current_admin_user_id)) -> dict[str, str]:
pool = get_pool()
async with pool.acquire() as conn:
exists = await conn.fetchval("select 1 from role_categories where id=$1::uuid", category_id)
if not exists:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Category not found")
await conn.execute("update ansible_roles set category_id=null where category_id=$1::uuid", category_id)
await conn.execute("delete from role_categories where id=$1::uuid", category_id)
return {"status": "deleted", "id": category_id}
@router.patch("/roles/categories/{category_id}")
async def update_role_category(
category_id: str,
payload: RoleCategoryCreate,
_: str = Depends(get_current_admin_user_id),
) -> dict[str, str]:
category_name = payload.name.strip()
if not category_name:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Category name is required")
pool = get_pool()
async with pool.acquire() as conn:
exists = await conn.fetchval("select 1 from role_categories where id=$1::uuid", category_id)
if not exists:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Category not found")
duplicate = await conn.fetchval(
"select id::text from role_categories where lower(name)=lower($1) and id<>$2::uuid",
category_name,
category_id,
)
if duplicate:
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Category already exists")
await conn.execute("update role_categories set name=$1 where id=$2::uuid", category_name, category_id)
return {"status": "updated", "id": category_id}
@router.get("/admin/config")
async def get_admin_config(_: str = Depends(get_current_admin_user_id)) -> dict[str, dict]:
settings = get_settings()
pool = get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow("select value from app_config where key='project'")
saved = _app_config_row_value_dict(row)
defaults = {
"app_name": settings.app_name,
"runner_image": settings.app_runner_image,
"docker_network": settings.app_docker_network,
"runner_timeout_sec": settings.app_runner_timeout_sec,
"runner_poll_interval_sec": settings.app_runner_poll_interval_sec,
"runner_max_poll_errors": settings.app_runner_max_poll_errors,
}
return {"items": {**defaults, **saved}}
@router.put("/admin/config")
async def update_admin_config(payload: dict, _: str = Depends(get_current_admin_user_id)) -> dict[str, str]:
if not isinstance(payload, dict):
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Config payload must be an object")
pool = get_pool()
async with pool.acquire() as conn:
await conn.execute(
"""
insert into app_config (key, value, updated_at)
values ('project', $1::jsonb, now())
on conflict (key)
do update set value=excluded.value, updated_at=now()
""",
json.dumps(payload),
)
return {"status": "updated"}
@router.get("/admin/yaml-lint-config")
async def get_admin_yaml_lint_config(_: str = Depends(get_current_admin_user_id)) -> dict[str, Any]:
pool = get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow("select value from app_config where key = $1", "yamllint")
saved = _app_config_row_value_dict(row)
merged = merge_yamllint_saved(saved)
return {"rules": serialize_rules_for_api(merged), "meta": list(RULE_META)}
@router.put("/admin/yaml-lint-config")
async def put_admin_yaml_lint_config(payload: dict, _: str = Depends(get_current_admin_user_id)) -> dict[str, str]:
try:
normalized = validate_put_payload(payload)
except ValueError as exc:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
pool = get_pool()
async with pool.acquire() as conn:
await conn.execute(
"""
insert into app_config (key, value, updated_at)
values ('yamllint', $1::jsonb, now())
on conflict (key)
do update set value=excluded.value, updated_at=now()
""",
json.dumps(normalized),
)
invalidate_yamllint_config_cache()
await refresh_yamllint_config_cache()
return {"status": "updated"}
@router.get("/admin/json-lint-config")
async def get_admin_json_lint_config(_: str = Depends(get_current_admin_user_id)) -> dict[str, Any]:
pool = get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow("select value from app_config where key = $1", "jsonlint")
saved = _app_config_row_value_dict(row)
merged = merge_jsonlint_saved(saved)
return {"rules": serialize_jsonlint_rules_for_api(merged), "meta": list(JSONLINT_RULE_META)}
@router.put("/admin/json-lint-config")
async def put_admin_json_lint_config(payload: dict, _: str = Depends(get_current_admin_user_id)) -> dict[str, str]:
try:
normalized = validate_jsonlint_put_payload(payload)
except ValueError as exc:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
pool = get_pool()
async with pool.acquire() as conn:
await conn.execute(
"""
insert into app_config (key, value, updated_at)
values ('jsonlint', $1::jsonb, now())
on conflict (key)
do update set value=excluded.value, updated_at=now()
""",
json.dumps(normalized),
)
invalidate_jsonlint_config_cache()
await refresh_jsonlint_config_cache()
return {"status": "updated"}
@router.post("/inventories")
async def create_inventory(
payload: InventoryCreate, user_id: str = Depends(get_current_user_id)
) -> dict[str, str]:
pool = get_pool()
async with pool.acquire() as conn:
inventory_id = await conn.fetchval(
"""
insert into inventories (owner_id, name, inventory_text)
values ($1, $2, $3)
returning id::text
""",
user_id,
payload.name,
payload.inventory_text,
)
return {"id": inventory_id}
@router.post("/playbooks")
async def create_playbook(
payload: PlaybookCreate, user_id: str = Depends(get_current_user_id)
) -> dict[str, str]:
pool = get_pool()
async with pool.acquire() as conn:
inventory_owner = await conn.fetchval(
"select owner_id::text from inventories where id=$1::uuid",
payload.inventory_id,
)
if inventory_owner != user_id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Inventory access denied")
playbook_id = await conn.fetchval(
"""
insert into playbooks (owner_id, name, description, playbook_yaml, inventory_id, role_ids, is_shared)
values ($1, $2, $3, $4, $5::uuid, $6::uuid[], $7)
returning id::text
""",
user_id,
payload.name,
payload.description,
payload.playbook_yaml,
payload.inventory_id,
payload.role_ids,
payload.is_shared,
)
return {"id": playbook_id}
@router.post("/playbooks/import-yaml")
async def import_playbook_yaml(
payload: PlaybookYamlImportRequest, user_id: str = Depends(get_current_user_id)
) -> dict[str, str]:
try:
parsed = yaml.safe_load(payload.playbook_yaml)
except yaml.YAMLError as exc:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid YAML") from exc
if not isinstance(parsed, list):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Playbook YAML must be a list of plays",
)
pool = get_pool()
async with pool.acquire() as conn:
inventory_owner = await conn.fetchval(
"select owner_id::text from inventories where id=$1::uuid",
payload.inventory_id,
)
if inventory_owner != user_id:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Inventory access denied")
playbook_id = await conn.fetchval(
"""
insert into playbooks (owner_id, name, description, playbook_yaml, inventory_id, role_ids, is_shared)
values ($1::uuid, $2, $3, $4, $5::uuid, $6::uuid[], $7)
returning id::text
""",
user_id,
payload.name,
payload.description,
payload.playbook_yaml,
payload.inventory_id,
payload.role_ids,
payload.is_shared,
)
return {"id": playbook_id}
@router.get("/playbooks/{playbook_id}/export-yaml")
async def export_playbook_yaml(playbook_id: str, user_id: str = Depends(get_current_user_id)) -> Response:
pool = get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow(
"""
select owner_id::text as owner_id, is_shared, playbook_yaml
from playbooks
where id=$1::uuid
""",
playbook_id,
)
if not row:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Playbook not found")
if row["owner_id"] != user_id and not row["is_shared"]:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Playbook access denied")
return Response(content=row["playbook_yaml"], media_type="application/x-yaml")
@router.post("/jobs/launch")
async def launch_job(
payload: JobLaunchRequest, user_id: str = Depends(get_current_user_id)
) -> dict[str, str]:
pool = get_pool()
async with pool.acquire() as conn:
playbook = await conn.fetchrow(
"""
select p.id::text as id, p.owner_id::text as owner_id, p.is_shared,
p.playbook_yaml, i.inventory_text
from playbooks p
join inventories i on i.id = p.inventory_id
where p.id=$1::uuid
""",
payload.playbook_id,
)
if not playbook:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Playbook not found")
if playbook["owner_id"] != user_id and not playbook["is_shared"]:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Playbook access denied")
job_id = await conn.fetchval(
"""
insert into jobs (owner_id, playbook_id, status, extra_vars, runtime_mode)
values ($1::uuid, $2::uuid, 'queued', $3::jsonb, $4)
returning id::text
""",
user_id,
payload.playbook_id,
json.dumps(payload.extra_vars),
payload.runtime_mode,
)
queue_name = f"job.{job_id}"
try:
worker_container_id, worker_container_name, runner_url = await launch_ephemeral_worker(
queue_name,
payload.runtime_mode,
)
except RuntimeError as exc:
async with pool.acquire() as conn:
await conn.execute("delete from jobs where id=$1::uuid", job_id)
raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=str(exc)) from exc
runner_run_id = await create_playbook_run(
runner_url,
playbook_yaml=playbook["playbook_yaml"],
inventory_text=playbook["inventory_text"],
extra_vars=payload.extra_vars,
)
async with pool.acquire() as conn:
await conn.execute(
"""
update jobs
set status='running',
started_at=now(),
runner_url=$1,
runner_run_id=$2,
runner_container_name=$3,
runner_last_heartbeat=now()
where id=$4::uuid
""",
runner_url,
runner_run_id,
worker_container_name,
job_id,
)
asyncio.create_task(
_monitor_job(
job_id,
runner_url,
runner_run_id,
worker_container_name,
payload.runtime_mode,
)
)
return {
"job_id": job_id,
"runner_run_id": runner_run_id,
"worker_container_id": worker_container_id,
}
@router.post("/tests/launch")
async def launch_test(
    payload: TestLaunchRequest, user_id: str = Depends(get_current_user_id)
) -> dict[str, str]:
    """Create a test run, launch an ephemeral runner and start a molecule run on it.

    At least one of ``payload.playbook_id`` / ``payload.role_id`` is required; when
    both are given the playbook is what gets executed. Returns the new test id plus
    runner identifiers so the UI can poll logs/status.

    Raises:
        HTTPException: 400 on missing target, 404/403 on access checks,
            503 when the runner cannot be started or the molecule run cannot
            be created (the runner is torn down and the queued row deleted).
    """
    if not payload.playbook_id and not payload.role_id:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Either playbook_id or role_id must be provided",
        )
    pool = get_pool()
    async with pool.acquire() as conn:
        # Access checks: owners always pass; shared playbooks / public roles are open.
        if payload.playbook_id:
            playbook = await conn.fetchrow(
                "select owner_id::text as owner_id, is_shared from playbooks where id=$1::uuid",
                payload.playbook_id,
            )
            if not playbook:
                raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Playbook not found")
            if playbook["owner_id"] != user_id and not playbook["is_shared"]:
                raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Playbook access denied")
        if payload.role_id:
            role = await conn.fetchrow(
                "select owner_id::text as owner_id, coalesce(visibility, 'public') as visibility from ansible_roles where id=$1::uuid",
                payload.role_id,
            )
            if not role:
                raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found")
            if role["owner_id"] != user_id and role["visibility"] != "public":
                raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Role access denied")
        test_id = await conn.fetchval(
            """
            insert into test_runs (owner_id, playbook_id, role_id, status, runtime_mode, hosts_blueprint, extra_vars)
            values ($1::uuid, $2::uuid, $3::uuid, 'queued', $4, $5::jsonb, $6::jsonb)
            returning id::text
            """,
            user_id,
            payload.playbook_id,
            payload.role_id,
            payload.runtime_mode,
            json.dumps([host.model_dump() for host in payload.hosts]),
            json.dumps(payload.extra_vars),
        )
    queue_name = f"test.{test_id}"
    try:
        worker_container_id, worker_container_name, runner_url = await launch_ephemeral_worker(
            queue_name,
            payload.runtime_mode,
        )
    except RuntimeError as exc:
        # Runner never started: remove the queued row so it does not linger forever.
        async with pool.acquire() as conn:
            await conn.execute("delete from test_runs where id=$1::uuid", test_id)
        raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=str(exc)) from exc
    try:
        if payload.playbook_id:
            async with pool.acquire() as conn:
                row = await conn.fetchrow(
                    "select playbook_yaml from playbooks where id=$1::uuid",
                    payload.playbook_id,
                )
            runner_run_id = await create_molecule_playbook_run(
                runner_url,
                playbook_yaml=row["playbook_yaml"],
                hosts=[host.model_dump() for host in payload.hosts],
                extra_vars=payload.extra_vars,
            )
        else:
            async with pool.acquire() as conn:
                row = await conn.fetchrow(
                    "select name, content from ansible_roles where id=$1::uuid",
                    payload.role_id,
                )
                file_tasks = await conn.fetchval(
                    "select content from role_files where role_id=$1::uuid and path='tasks/main.yml'",
                    payload.role_id,
                )
            role_content = _parse_json_object(row["content"])
            runner_run_id = await create_molecule_role_run(
                runner_url,
                role_name=row["name"],
                # Prefer the role-file catalog entry, then inline content, then a ping stub.
                role_tasks_yaml=str(
                    file_tasks
                    or role_content.get("tasks_yaml")
                    or "- name: ping\n ansible.builtin.ping:\n"
                ),
                hosts=[host.model_dump() for host in payload.hosts],
                extra_vars=payload.extra_vars,
            )
    except Exception as exc:  # noqa: BLE001
        # Anything failing after the worker was launched would otherwise leak the
        # runner container and leave the test_runs row stuck in 'queued'.
        await terminate_ephemeral_worker(worker_container_name, payload.runtime_mode)
        async with pool.acquire() as conn:
            await conn.execute("delete from test_runs where id=$1::uuid", test_id)
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail=f"Failed to start molecule run: {exc}",
        ) from exc
    async with pool.acquire() as conn:
        await conn.execute(
            """
            update test_runs
            set status='running',
                started_at=now(),
                runner_url=$1,
                runner_run_id=$2,
                runner_container_name=$3,
                runner_last_heartbeat=now()
            where id=$4::uuid
            """,
            runner_url,
            runner_run_id,
            worker_container_name,
            test_id,
        )
    # Fire-and-forget watchdog that tracks the run and finalizes the row.
    asyncio.create_task(
        _monitor_test(
            test_id,
            runner_url,
            runner_run_id,
            worker_container_name,
            payload.runtime_mode,
        )
    )
    return {
        "test_id": test_id,
        "runner_run_id": runner_run_id,
        "worker_container_id": worker_container_id,
    }
@router.get("/tests/os-options")
async def list_test_os_options(_: str = Depends(get_current_user_id)) -> dict[str, list[dict[str, str]]]:
    """Expose the scanned OS image options to any authenticated user."""
    options = _scan_os_options()
    return {"items": options}
@router.get("/admin/os-images")
async def list_admin_os_images(_: str = Depends(get_current_admin_user_id)) -> dict[str, list[dict[str, str]]]:
    """Admin view of the scanned OS image options (same data, admin-only route)."""
    images = _scan_os_options()
    return {"items": images}
@router.post("/admin/os-images/build-all")
async def build_all_admin_os_images(_: str = Depends(get_current_admin_user_id)) -> dict[str, Any]:
    """Build every discovered OS image sequentially, stopping at the first failure.

    Returns:
        Aggregate ``status`` (``noop``/``failed``/``success``), per-image results
        (without their logs) and the merged build logs.
    """
    options = _scan_os_options()
    if not options:
        return {"status": "noop", "items": [], "logs": ["No dockerfiles found."]}
    logs: list[str] = []
    results: list[dict[str, str]] = []
    for opt in options:
        one = await _build_os_image(
            dockerfile_rel=str(opt["dockerfile"]),
            image_tag=str(opt["image"]),
            platform=str(opt.get("platform") or "").strip(),
            build_platforms=str(opt.get("build_platforms") or "").strip(),
        )
        logs.extend(one.get("logs") or [])
        # Keep per-image entries compact; logs are aggregated separately above.
        results.append({k: v for k, v in one.items() if k != "logs"})
        if one.get("status") != "success":
            return {"status": "failed", "items": results, "logs": logs}
    return {"status": "success", "items": results, "logs": logs}
@router.post("/admin/os-images/build-one")
async def build_one_admin_os_image(
    payload: dict[str, str],
    _: str = Depends(get_current_admin_user_id),
) -> dict[str, Any]:
    """Build a single OS image described by the request payload.

    Missing ``platform`` / ``build_platforms`` are resolved first from the
    scanned OS option matching the dockerfile, then from inferred defaults.
    """
    def _field(key: str) -> str:
        return str(payload.get(key) or "").strip()

    dockerfile_rel = _field("dockerfile")
    image_tag = _field("image")
    platform = _field("platform")
    build_platforms = _field("build_platforms")
    if dockerfile_rel and (not platform or not build_platforms):
        resolved = _resolve_os_option_by_dockerfile(dockerfile_rel)
        if resolved:
            platform = platform or str(resolved.get("platform") or "").strip()
            build_platforms = build_platforms or str(resolved.get("build_platforms") or "").strip()
    if not platform or not build_platforms:
        inf_platform, inf_matrix = _infer_platform_defaults(dockerfile_rel)
        platform = platform or inf_platform
        build_platforms = build_platforms or inf_matrix
    if not dockerfile_rel or not image_tag:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="dockerfile and image are required")
    return await _build_os_image(
        dockerfile_rel=dockerfile_rel,
        image_tag=image_tag,
        platform=platform,
        build_platforms=build_platforms,
    )
@router.post("/admin/os-images/build-one-stream")
async def build_one_admin_os_image_stream(
    payload: dict[str, str],
    _: str = Depends(get_current_admin_user_id),
) -> StreamingResponse:
    """Build one OS image with docker buildx and stream the output as plain text.

    Missing platform hints are resolved from the scanned OS options and, as a
    last resort, inferred from the dockerfile path. Each platform in the matrix
    is built under an arch-suffixed tag, then the canonical tag is built with a
    pinned platform. The stream ends with a machine-readable
    ``__ROLEFORGE_BUILD_STATUS__:<status>:<code>`` line.
    """
    dockerfile_rel = str(payload.get("dockerfile") or "").strip()
    image_tag = str(payload.get("image") or "").strip()
    platform = str(payload.get("platform") or "").strip()
    build_platforms = [p.strip() for p in str(payload.get("build_platforms") or "").split(",") if p.strip()]
    if dockerfile_rel and (not platform or not build_platforms):
        resolved = _resolve_os_option_by_dockerfile(dockerfile_rel)
        if resolved:
            if not platform:
                platform = str(resolved.get("platform") or "").strip()
            if not build_platforms:
                build_platforms = [
                    p.strip() for p in str(resolved.get("build_platforms") or "").split(",") if p.strip()
                ]
    if not platform or not build_platforms:
        inf_platform, inf_matrix = _infer_platform_defaults(dockerfile_rel)
        if not platform:
            platform = inf_platform
        if not build_platforms:
            build_platforms = [p.strip() for p in inf_matrix.split(",") if p.strip()]
    if not dockerfile_rel or not image_tag:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="dockerfile and image are required")
    repo_root = str(DOCKERFILES_ROOT.parent)
    dockerfile_abs = str(DOCKERFILES_ROOT / dockerfile_rel)

    async def _stream() -> Any:
        proc: asyncio.subprocess.Process | None = None

        def stream_buildx_cmd(tag: str, platform_value: str) -> list[str]:
            # --load pushes the result into the local image store of the daemon.
            core = ["buildx", "build", "--builder", BUILDX_BUILDER_NAME, "-f", dockerfile_abs, "-t", tag]
            if platform_value:
                core.extend(["--platform", platform_value])
            core.extend(["--load", repo_root])
            return ["docker", *core]

        async def _spawn_and_stream(cmd: list[str]):
            """Spawn cmd and yield its merged stdout/stderr line by line.

            Binds the process to the outer ``proc`` so the CancelledError
            handler (and the caller's ``proc.wait()``) can reach it.
            """
            nonlocal proc
            proc = await asyncio.create_subprocess_exec(
                *cmd,
                cwd=repo_root,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.STDOUT,
            )
            assert proc.stdout is not None
            while True:
                line = await proc.stdout.readline()
                if not line:
                    break
                yield line.decode(errors="replace")

        try:
            buildx_mode, buildx_note = await _ensure_docker_buildx_mode()
            yield f"[roleforge] {buildx_note}\n"
            if buildx_mode == "none":
                yield "__ROLEFORGE_BUILD_STATUS__:failed:127\n"
                return
            builder_ok, builder_note = await _ensure_buildx_builder()
            yield f"[roleforge] {builder_note}\n"
            if not builder_ok:
                yield "__ROLEFORGE_BUILD_STATUS__:failed:127\n"
                return
            matrix = build_platforms[:] if build_platforms else ([platform] if platform else [])
            for p in matrix:
                # One arch-suffixed tag per matrix platform (e.g. image-amd64).
                arch = p.split("/")[-1]
                step_cmd = stream_buildx_cmd(f"{image_tag}-{arch}", p)
                yield f"$ {' '.join(step_cmd)}\n"
                async for out in _spawn_and_stream(step_cmd):
                    yield out
                code = await proc.wait()
                if code != 0:
                    yield f"__ROLEFORGE_BUILD_STATUS__:failed:{code}\n"
                    return
            # canonical tag (always pin platform to avoid host-arch ambiguity on ARM hosts)
            effective_platform = (platform or "").strip() or (matrix[0] if matrix else "")
            final_cmd = stream_buildx_cmd(image_tag, effective_platform)
            yield f"$ {' '.join(final_cmd)}\n"
            async for out in _spawn_and_stream(final_cmd):
                yield out
            code = await proc.wait()
            status_name = "success" if code == 0 else "failed"
            yield f"__ROLEFORGE_BUILD_STATUS__:{status_name}:{code}\n"
        except asyncio.CancelledError:
            # Client disconnected: make sure the build subprocess dies too.
            if proc is not None and proc.returncode is None:
                proc.terminate()
                try:
                    await asyncio.wait_for(proc.wait(), timeout=3.0)
                except Exception:  # noqa: BLE001
                    proc.kill()
            raise

    return StreamingResponse(_stream(), media_type="text/plain; charset=utf-8")
@router.get("/tests/{test_id}")
async def get_test_status(test_id: str, user_id: str = Depends(get_current_user_id)) -> dict[str, str]:
    """Return status and lifecycle timestamps for a test run owned by the caller."""
    pool = get_pool()
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            """
            select id::text as id, owner_id::text as owner_id, status, created_at, started_at, finished_at
            from test_runs
            where id=$1::uuid
            """,
            test_id,
        )
    if not row:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Test run not found")
    if row["owner_id"] != user_id:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Test run access denied")

    def _iso(column: str) -> str:
        # Timestamps may be NULL for runs that never started/finished.
        value = row[column]
        return value.isoformat() if value else ""

    return {
        "id": row["id"],
        "status": row["status"],
        "created_at": _iso("created_at"),
        "started_at": _iso("started_at"),
        "finished_at": _iso("finished_at"),
    }
@router.post("/tests/{test_id}/stop")
async def stop_test(test_id: str, user_id: str = Depends(get_current_user_id)) -> dict[str, str]:
    """Terminate the ephemeral runner for this test and mark the run as failed (user-initiated stop)."""
    pool = get_pool()
    async with pool.acquire() as conn:
        run = await conn.fetchrow(
            """
            select owner_id::text as owner_id, runner_container_name, runtime_mode, status
            from test_runs
            where id=$1::uuid
            """,
            test_id,
        )
    if not run:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Test run not found")
    if run["owner_id"] != user_id:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Test run access denied")
    if str(run["status"] or "") in ("success", "failed"):
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Test run already finished")
    container = run["runner_container_name"]
    if not container:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="No runner container for this test")
    await terminate_ephemeral_worker(container, str(run["runtime_mode"] or "docker"))
    async with pool.acquire() as conn:
        # Record the stop in the run's log stream, then finalize the row —
        # but never overwrite a terminal status written concurrently.
        await conn.execute(
            """
            insert into test_logs (test_run_id, log_line)
            values ($1::uuid, $2)
            """,
            test_id,
            "[roleforge] Test stopped by user (runner container terminated).",
        )
        await conn.execute(
            """
            update test_runs
            set status='failed', finished_at=now()
            where id=$1::uuid
              and status not in ('success', 'failed')
            """,
            test_id,
        )
    return {"status": "stopped", "test_id": test_id}
@router.get("/tests/{test_id}/logs")
async def get_test_logs(
    test_id: str,
    after_id: int = 0,
    limit: int = 200,
    user_id: str = Depends(get_current_user_id),
) -> dict[str, Any]:
    """Return log lines after `after_id` (test_logs.id), chronological order — for live polling."""
    # Clamp client-supplied paging values to sane bounds.
    safe_limit = min(max(limit, 1), 5000)
    safe_after = after_id if after_id > 0 else 0
    pool = get_pool()
    async with pool.acquire() as conn:
        owner_id = await conn.fetchval("select owner_id::text from test_runs where id=$1::uuid", test_id)
        if not owner_id:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Test run not found")
        if owner_id != user_id:
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Test run access denied")
        rows = await conn.fetch(
            """
            select id, log_line
            from test_logs
            where test_run_id=$1::uuid and id > $2::bigint
            order by id asc
            limit $3
            """,
            test_id,
            safe_after,
            safe_limit,
        )
        max_log_id = await conn.fetchval(
            "select coalesce(max(id), 0)::bigint from test_logs where test_run_id=$1::uuid",
            test_id,
        )
    items = []
    for row in rows:
        items.append({"id": int(row["id"]), "line": row["log_line"]})
    # Cursor for the next poll: last returned id, or the unchanged input cursor.
    next_after = safe_after if not rows else int(rows[-1]["id"])
    return {
        "items": items,
        "next_after_id": next_after,
        "max_log_id": int(max_log_id or 0),
    }
@router.get("/runners/active")
async def active_runners(_: str = Depends(get_current_user_id)) -> dict[str, list[dict[str, str]]]:
    """List currently active ephemeral runner containers."""
    runners = await list_active_runners()
    return {"items": runners}
@router.post("/runners/{runner_name}/stop")
async def stop_runner(
    runner_name: str,
    payload: RunnerStopRequest,
    _: str = Depends(get_current_user_id),
) -> dict[str, str]:
    """Force-stop a named ephemeral runner container in the given runtime mode."""
    mode = payload.runtime_mode
    await terminate_ephemeral_worker(runner_name, mode)
    return {"status": "stopped", "runner_name": runner_name, "runtime_mode": mode}