обновлён /admin/config и API для os_registry. - Molecule/раннер: env из конфигурации, ensure roleforge-os (ensure_roleforge_os.yml), os_registry_pull и доработки executors / runner / create.yml. - /admin/os-images: выбор реестра, buildx (в т.ч. split amd64+arm64 + imagetools), опция --no-cache, стрим логов; domain.py: план команд build, ретраи push. - UI: брендинг (app_name, app_tagline) из app_config через get_ui_branding_context; base.xhtml, role-create / role-view, core.js, pages-main, стили. - Dockerfiles: требование Python ≥3.9 (assert), доработки alt9/astra/debian9/ubuntu20 и др.; новые Dockerfile.arm64 для centos7/centos8. - Конфиг: .env.example, config.py, pyproject.toml.
3238 lines
120 KiB
Python
import asyncio
|
|
import base64
|
|
import io
|
|
import json
|
|
import re
|
|
from uuid import UUID
|
|
import subprocess
|
|
import time
|
|
import zipfile
|
|
from pathlib import Path
|
|
from urllib.parse import quote
|
|
|
|
import yaml
|
|
from collections.abc import AsyncIterator
|
|
from typing import Any, Literal, NamedTuple
|
|
|
|
from fastapi import APIRouter, Body, Depends, File, HTTPException, Query, Response, UploadFile, status
|
|
from fastapi.responses import StreamingResponse
|
|
from yamllint import linter as yaml_linter
|
|
from yamllint.config import YamlLintConfig
|
|
|
|
from app.deps.auth import get_current_admin_user_id, get_current_user_id
|
|
from app.db.pool import get_pool
|
|
from app.schemas.domain import (
|
|
AddRoleToPlaybookRequest,
|
|
InventoryCreate,
|
|
JobLaunchRequest,
|
|
LintRoleFileError,
|
|
LintRoleFileRequest,
|
|
LintRoleFileResponse,
|
|
PlaybookCreate,
|
|
PlaybookYamlImportRequest,
|
|
RoleCategoryCreate,
|
|
RoleCreate,
|
|
RoleFilePayload,
|
|
RoleForkRequest,
|
|
RoleFilesReplaceRequest,
|
|
RoleImportItem,
|
|
RoleImportResult,
|
|
RoleUpdate,
|
|
RoleYamlImportRequest,
|
|
RunnerStopRequest,
|
|
TestLaunchRequest,
|
|
)
|
|
from app.core.config import get_settings
|
|
from app.services.dynamic_worker import launch_ephemeral_worker, list_active_runners, terminate_ephemeral_worker
|
|
from app.services.jsonlint_rules import (
|
|
JSONLINT_RULE_META,
|
|
merge_jsonlint_saved,
|
|
run_json_lint,
|
|
serialize_rules_for_api as serialize_jsonlint_rules_for_api,
|
|
validate_jsonlint_put_payload,
|
|
)
|
|
from app.services.jsonlint_runtime import (
|
|
get_jsonlint_config_for_lint,
|
|
invalidate_jsonlint_config_cache,
|
|
refresh_jsonlint_config_cache,
|
|
)
|
|
from app.services.yamllint_rules import RULE_META, merge_yamllint_saved, serialize_rules_for_api, validate_put_payload
|
|
from app.services.yamllint_runtime import get_yamllint_config_for_lint, invalidate_yamllint_config_cache, refresh_yamllint_config_cache
|
|
from app.services.runner_client import (
|
|
create_molecule_playbook_run,
|
|
create_molecule_role_run,
|
|
create_playbook_run,
|
|
get_run_status,
|
|
)
|
|
from app.services.os_registry_pull import (
|
|
CONFIG_KEY as OS_REGISTRY_CONFIG_KEY,
|
|
encrypt_registry_secret,
|
|
parse_stored_os_registry,
|
|
)
|
|
|
|
router = APIRouter(tags=["domain"])


# app_config keys that the /admin/config UI is allowed to read and update.
_PROJECT_ADMIN_KEYS = frozenset(
    {
        "app_name",
        "app_tagline",
        "runner_image",
        "docker_network",
        "runner_timeout_sec",
        "runner_poll_interval_sec",
        "runner_max_poll_errors",
    }
)


# Temporary debug switch: keep ephemeral test runner alive after run completion.
# Flip back to True when you want normal auto-cleanup again.
AUTO_TERMINATE_TEST_RUNNER = True
# Repository checkout root for the per-OS Dockerfile trees (two levels above this module).
DOCKERFILES_ROOT = Path(__file__).resolve().parents[2] / "dockerfiles"
# Dedicated buildx builder (docker-container driver) used by all OS image builds.
BUILDX_BUILDER_NAME = "roleforge-builder"
# Hub uploads often flake (broken pipe / resets); BuildKit also reports context canceled if the HTTP stream is torn down mid-push.
_OS_BUILDX_PUSH_RETRIES = 3
_OS_BUILDX_PUSH_RETRY_DELAY_SEC = 10
|
|
|
|
|
|
def _buildx_attestation_off_args() -> list[str]:
|
|
"""Skip provenance/SBOM attestations — fewer manifests and less push churn to Docker Hub."""
|
|
return ["--provenance=false", "--sbom=false"]
|
|
|
|
|
|
def _should_retry_os_registry_cmd(cmd: list[str]) -> bool:
|
|
"""Retry flaky registry operations (buildx --push and manifest creation)."""
|
|
if not cmd:
|
|
return False
|
|
if "buildx" in cmd and "build" in cmd and "--push" in cmd:
|
|
return True
|
|
if "buildx" in cmd and "imagetools" in cmd and "create" in cmd:
|
|
return True
|
|
return False
|
|
|
|
|
|
def _manifest_stage_tag(remote_ref: str, slug: str) -> str:
|
|
"""Temporary tag for per-arch pushes before `buildx imagetools create` merges the manifest."""
|
|
r = str(remote_ref or "").strip()
|
|
if not r:
|
|
return f"__invalid:{slug}"
|
|
if ":" not in r:
|
|
return f"{r}__rfstage_{slug}"
|
|
repo_part, tag_part = r.rsplit(":", 1)
|
|
return f"{repo_part}:{tag_part}__rfstage_{slug}"
|
|
|
|
|
|
def _paired_amd64_arm64_paths(dockerfile_abs: str) -> tuple[str | None, str | None]:
|
|
p = Path(dockerfile_abs)
|
|
if p.name != "Dockerfile":
|
|
return None, None
|
|
arm = p.parent / "Dockerfile.arm64"
|
|
if not arm.is_file():
|
|
return None, None
|
|
return str(p.resolve()), str(arm.resolve())
|
|
|
|
|
|
def _should_retry_buildx_push_output(blob: str) -> bool:
|
|
t = blob.lower()
|
|
needles = (
|
|
"broken pipe",
|
|
"connection reset",
|
|
"connection refused",
|
|
"context canceled",
|
|
"context cancelled",
|
|
"timeout",
|
|
"unexpected eof",
|
|
"eof",
|
|
"tls:",
|
|
"bad gateway",
|
|
"gateway timeout",
|
|
" 502 ",
|
|
" 503 ",
|
|
"use of closed network connection",
|
|
"failed to do request",
|
|
"temporary failure",
|
|
)
|
|
return any(n in t for n in needles)
|
|
|
|
|
|
def _run_platform_for_os_image(platform: str, build_platforms: str) -> str:
|
|
"""Architectural platform hint for Molecule (`docker_container.platform`).
|
|
|
|
Single-arch images: pin that arch (often linux/amd64). Multi-arch matrices: return empty so the
|
|
runner picks the host-native slice (Apple Silicon gets linux/arm64, no forced amd64 emulation).
|
|
"""
|
|
p = str(platform or "").strip()
|
|
if p:
|
|
return p
|
|
parts = [x.strip() for x in str(build_platforms or "").split(",") if x.strip()]
|
|
if len(parts) > 1:
|
|
return ""
|
|
return parts[0] if parts else ""
|
|
|
|
|
|
def _scan_os_options() -> list[dict[str, str]]:
    """Discover dockerfiles under dockerfiles/.

    When both ``Dockerfile`` and ``Dockerfile.arm64`` exist in the same OS directory, emit a single
    matrix row that builds/pushes one manifest tag (``roleforge-os:<os>``) with amd64 + arm64 slices
    using the two dockerfiles.

    Each returned dict carries: id, name, dockerfile (repo-relative), optional
    dockerfile_arm64, image, command, systemd, platform, build_platforms, run_platform.
    """
    items: list[dict[str, str]] = []
    root = DOCKERFILES_ROOT
    if not root.exists():
        # No dockerfiles checked out — nothing to offer.
        return items

    # Group every Dockerfile* by its parent directory (one directory per OS).
    all_paths = sorted(root.rglob("Dockerfile*"))
    by_parent: dict[str, list[Path]] = {}
    for path in all_paths:
        rel = path.relative_to(root)
        key = str(rel.parent)
        by_parent.setdefault(key, []).append(path)

    # Directories containing BOTH Dockerfile and Dockerfile.arm64 get one merged manifest row.
    paired_parents: set[str] = set()
    for parent_key, paths in by_parent.items():
        names = {p.name for p in paths}
        if "Dockerfile" in names and "Dockerfile.arm64" in names:
            paired_parents.add(parent_key)

    # Parents for which the merged row has already been emitted.
    merged_parents: set[str] = set()

    for path in all_paths:
        rel = path.relative_to(root)
        parent_key = str(rel.parent)
        os_key = rel.parent.name

        if parent_key in paired_parents:
            # Only the plain Dockerfile drives the merged row; skip the arm64 twin.
            if path.name != "Dockerfile":
                continue
            if parent_key in merged_parents:
                continue
            merged_parents.add(parent_key)
            rel_arm = (root / rel.parent / "Dockerfile.arm64").relative_to(root)
            image_tag = f"roleforge-os:{os_key}"
            build_platforms = "linux/amd64,linux/arm64"
            # Empty platform: multi-arch, let the runner pick the host-native slice.
            platform = ""
            run_platform = _run_platform_for_os_image(platform, build_platforms)
            items.append(
                {
                    "id": f"{os_key}:manifest",
                    "name": f"{os_key} (amd64 + arm64)",
                    "dockerfile": str(rel),
                    "dockerfile_arm64": str(rel_arm),
                    "image": image_tag,
                    "command": "/sbin/init",
                    "systemd": "true",
                    "platform": platform,
                    "build_platforms": build_platforms,
                    "run_platform": run_platform,
                }
            )
            continue

        # Standalone dockerfile: variant is the filename suffix ("Dockerfile.arm64" → "arm64").
        variant = path.name.replace("Dockerfile", "").strip(".")
        suffix = f" ({variant})" if variant else ""
        image_tag = f"roleforge-os:{os_key}{('-' + variant) if variant else ''}"
        if variant == "arm64":
            platform = "linux/arm64"
            build_platforms = "linux/arm64"
        elif os_key in {"astra-linux", "redos", "centos7", "centos8"}:
            # NOTE(review): these OS keys are pinned to amd64 only — presumably no arm64 base images.
            platform = "linux/amd64"
            build_platforms = "linux/amd64"
        else:
            platform = ""
            build_platforms = "linux/amd64,linux/arm64"
        run_platform = _run_platform_for_os_image(platform, build_platforms)
        items.append(
            {
                "id": f"{os_key}:{variant or 'default'}",
                "name": f"{os_key}{suffix}",
                "dockerfile": str(rel),
                "image": image_tag,
                "command": "/sbin/init",
                "systemd": "true",
                "platform": platform,
                "build_platforms": build_platforms,
                "run_platform": run_platform,
            }
        )
    return items
|
|
|
|
|
|
def _resolve_os_option_by_dockerfile(dockerfile_rel: str) -> dict[str, str] | None:
    """Find the scanned OS option whose dockerfile (or arm64 twin) matches the given path.

    Matching is forgiving: exact, or either side being a suffix of the other, so both
    repo-relative and absolute paths resolve to the same option.
    """
    needle = str(dockerfile_rel or "").strip().replace("\\", "/")
    if not needle:
        return None

    def is_match(candidate: str) -> bool:
        return candidate == needle or candidate.endswith(needle) or needle.endswith(candidate)

    for option in _scan_os_options():
        normalized = [
            str(option.get(field) or "").strip().replace("\\", "/")
            for field in ("dockerfile", "dockerfile_arm64")
        ]
        if any(is_match(candidate) for candidate in normalized if candidate):
            return option
    return None
|
|
|
|
|
|
def _infer_platform_defaults(dockerfile_rel: str) -> tuple[str, str]:
    """(platform, build_platforms) defaults for a dockerfile path.

    Prefers the scanned OS matrix; falls back to filename heuristics when the path
    is not part of the matrix.
    """
    option = _resolve_os_option_by_dockerfile(str(dockerfile_rel))
    if option:
        return (
            str(option.get("platform") or "").strip(),
            str(option.get("build_platforms") or "").strip(),
        )
    normalized = str(dockerfile_rel or "").replace("\\", "/").lower()
    if "dockerfile.arm64" in normalized:
        return ("linux/arm64", "linux/arm64")
    return ("", "linux/amd64,linux/arm64")
|
|
|
|
|
|
async def _ensure_docker_buildx_mode() -> tuple[str, str]:
    """Verify `docker buildx` works, attempting a package-manager install when missing.

    Returns ('local', note) on success, ('none', reason) when buildx is unavailable
    and auto-install failed.
    """

    async def probe() -> subprocess.CompletedProcess:
        # `docker buildx version` exits 0 only when the plugin is usable.
        return await asyncio.to_thread(
            subprocess.run,
            ["docker", "buildx", "version"],
            capture_output=True,
            text=True,
        )

    initial = await probe()
    if initial.returncode == 0:
        return "local", "docker buildx is available"

    # The plugin ships under two different apt package names depending on distro.
    for install_sh in (
        "apt-get update && apt-get install -y docker-buildx-plugin",
        "apt-get update && apt-get install -y docker-buildx",
    ):
        installed = await asyncio.to_thread(
            subprocess.run,
            ["sh", "-lc", install_sh],
            capture_output=True,
            text=True,
        )
        if installed.returncode != 0:
            continue
        if (await probe()).returncode == 0:
            return "local", "docker buildx installed via package manager"

    reason = (initial.stderr or initial.stdout or "").strip() or "unknown error"
    return "none", f"docker buildx unavailable and auto-install failed: {reason}"
|
|
|
|
|
|
async def _ensure_buildx_builder() -> tuple[bool, str]:
    """Ensure the named buildx builder exists and is bootstrapped; returns (ok, note)."""

    async def run_buildx(*args: str) -> subprocess.CompletedProcess:
        return await asyncio.to_thread(
            subprocess.run,
            ["docker", "buildx", *args],
            capture_output=True,
            text=True,
        )

    def error_text(proc: subprocess.CompletedProcess) -> str:
        return (proc.stderr or proc.stdout or "").strip() or "unknown error"

    inspected = await run_buildx("inspect", BUILDX_BUILDER_NAME)
    if inspected.returncode != 0:
        # Builder does not exist yet — create it with the docker-container driver.
        created = await run_buildx("create", "--name", BUILDX_BUILDER_NAME, "--driver", "docker-container")
        if created.returncode != 0:
            return False, f"failed to create buildx builder '{BUILDX_BUILDER_NAME}': {error_text(created)}"

    # Bootstrap starts the builder container so the first build does not pay that cost.
    bootstrapped = await run_buildx("inspect", BUILDX_BUILDER_NAME, "--bootstrap")
    if bootstrapped.returncode != 0:
        return False, f"failed to bootstrap buildx builder '{BUILDX_BUILDER_NAME}': {error_text(bootstrapped)}"
    return True, f"buildx builder '{BUILDX_BUILDER_NAME}' is ready"
|
|
|
|
|
|
def _normalize_os_registry_prefix(raw: str) -> str:
|
|
return (raw or "").strip().rstrip("/")
|
|
|
|
|
|
def _normalize_registry_host(raw: str) -> str:
|
|
"""Host[:port] only for docker login / push (strip scheme and path)."""
|
|
s = (raw or "").strip()
|
|
if not s:
|
|
return ""
|
|
s = s.replace("https://", "").replace("http://", "").strip()
|
|
return s.split("/", 1)[0].strip().rstrip("/")
|
|
|
|
|
|
class ResolvedOsPush(NamedTuple):
|
|
kind: str # hub | registry | none
|
|
target: str # push target base (see legacy_full_image)
|
|
docker_login_server: str | None # docker login server arg
|
|
legacy_full_image: bool # True: target is prefix + full roleforge-os:tag image ref
|
|
|
|
|
|
def _resolve_os_push_config(registry_payload: dict[str, Any] | None) -> ResolvedOsPush:
    """UI/registry_* fields override ROLEFORGE_OS_* env defaults.

    Raises HTTP 400 for an unknown provider or missing Harbor/Nexus fields.
    """
    payload = registry_payload or {}
    provider = str(payload.get("registry_provider") or "").strip().lower()

    if provider in ("harbor", "nexus"):
        host = _normalize_registry_host(str(payload.get("registry_host") or ""))
        segments = [
            segment.strip()
            for segment in str(payload.get("repository_path") or "").replace("\\", "/").split("/")
            if segment.strip()
        ]
        repo_path = "/".join(segments)
        if not (host and repo_path):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="registry_host and repository_path are required for Harbor and Nexus.",
            )
        return ResolvedOsPush("registry", f"{host}/{repo_path}", host, False)

    if provider not in ("hub", "", "docker_hub"):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Unknown registry_provider: {provider or '(empty)'}",
        )

    # Docker Hub (or unspecified): explicit repository wins.
    repo_path = str(payload.get("repository_path") or "").strip().rstrip("/")
    if repo_path:
        return ResolvedOsPush("hub", repo_path, None, False)

    # Nothing from the UI — fall back to environment-driven defaults.
    env_kind, env_target = _os_push_kind_and_target()
    if env_kind == "hub":
        return ResolvedOsPush("hub", env_target, None, False)
    if env_kind == "registry":
        return ResolvedOsPush("registry", env_target, env_target, True)
    return ResolvedOsPush("none", "", None, False)
|
|
|
|
|
|
def _os_push_kind_and_target() -> tuple[str, str]:
    """('hub', 'user/repo') | ('registry', 'host[:port]') | ('none', '')."""
    settings = get_settings()
    # Hub repository takes precedence over a private registry prefix.
    hub_repository = (settings.roleforge_os_docker_hub_repository or "").strip().rstrip("/")
    if hub_repository:
        return "hub", hub_repository
    registry_prefix = _normalize_os_registry_prefix(settings.roleforge_os_image_registry or "")
    if registry_prefix:
        return "registry", registry_prefix
    return "none", ""
|
|
|
|
|
|
def _os_image_push_display() -> str:
    """Human-readable push destination (hub targets get the docker.io prefix)."""
    kind, target = _os_push_kind_and_target()
    return f"docker.io/{target}" if kind == "hub" else target
|
|
|
|
|
|
async def _stream_process_stdout_chunks(proc: asyncio.subprocess.Process) -> AsyncIterator[str]:
    """Yield decoded stdout; chunk reads (not readline) so Docker progress bars using \\r do not stall."""
    stream = proc.stdout
    if stream is None:
        return
    # read() returns b"" at EOF, which ends the loop.
    while chunk := await stream.read(65536):
        yield chunk.decode(errors="replace")
|
|
|
|
|
|
async def _reap_subprocess_after_stream_disconnect(proc: asyncio.subprocess.Process | None) -> None:
    """Drain stdout and wait — avoids killing BuildKit on client disconnect (prevents spurious context canceled)."""
    if proc is None:
        return
    try:
        # Already exited — nothing to drain or reap.
        if proc.returncode is not None:
            return
        # Drain remaining stdout so the child never blocks on a full pipe.
        if proc.stdout is not None:
            while await proc.stdout.read(65536):
                pass
        # Give the build up to 6 hours to finish on its own before forcing a kill.
        await asyncio.wait_for(proc.wait(), timeout=6 * 3600)
    except asyncio.TimeoutError:
        try:
            proc.kill()
        except ProcessLookupError:
            # Process exited between the timeout and the kill — nothing to do.
            pass
    except Exception: # noqa: BLE001
        # Best-effort cleanup: any unexpected error still attempts to kill the child.
        try:
            proc.kill()
        except ProcessLookupError:
            pass
|
|
|
|
|
|
async def _docker_login_stdin_password(
    *,
    username: str,
    password: str,
    registry_server: str | None,
) -> tuple[int, bytes]:
    """Run `docker login` feeding the password via stdin; returns (exit_code, combined_output)."""
    argv = ["docker", "login", "--username", username, "--password-stdin"]
    if registry_server:
        argv.append(registry_server)
    proc = await asyncio.create_subprocess_exec(
        *argv,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        # Interleave stderr into the stdout stream for one combined log.
        stderr=asyncio.subprocess.STDOUT,
    )
    combined, _ = await proc.communicate(input=password.encode())
    return int(proc.returncode or 0), combined or b""
|
|
|
|
|
|
def _remote_os_image_ref(registry_prefix: str, image_tag: str) -> str:
    """Canonical tag roleforge-os:ubuntu20 -> localhost:5000/roleforge-os:ubuntu20."""
    prefix = _normalize_os_registry_prefix(registry_prefix)
    local_tag = str(image_tag or "").strip()
    return f"{prefix}/{local_tag}" if prefix else local_tag
|
|
|
|
|
|
def _remote_push_reference(
|
|
kind: str,
|
|
target: str,
|
|
image_tag: str,
|
|
*,
|
|
legacy_full_image: bool = False,
|
|
) -> str:
|
|
"""roleforge-os:ubuntu20 → hub user/repo:ubuntu20 or host/proj/img:ubuntu20."""
|
|
if legacy_full_image:
|
|
return _remote_os_image_ref(target, image_tag)
|
|
_, sep, suffix = str(image_tag).partition(":")
|
|
tag_suffix = (suffix.strip() if sep else str(image_tag).strip()) or "latest"
|
|
return f"{target}:{tag_suffix}"
|
|
|
|
|
|
def _manual_pull_and_tag_shell_commands(
    kind: str,
    push_target: str,
    image_tag: str,
    *,
    legacy_full_image: bool = False,
) -> list[str]:
    """Lines to run locally after a registry push (pull remote digest + alias for Molecule)."""
    remote_ref = _remote_push_reference(kind, push_target, image_tag, legacy_full_image=legacy_full_image)
    local_alias = str(image_tag).strip()
    return [
        f"docker pull {remote_ref}",
        f"docker tag {remote_ref} {local_alias}",
    ]
|
|
|
|
|
|
def _os_build_command_plan(
    *,
    dockerfile_abs: str,
    image_tag: str,
    platform: str,
    build_platforms_csv: str,
    repo_root: str,
    push_kind: str,
    push_target: str,
    legacy_full_image: bool = False,
    no_cache: bool = False,
) -> tuple[list[list[str]], str | None]:
    """Plan the `docker buildx` command sequence for one OS image.

    Returns ``(commands, error)``: commands are executed in order; ``error`` is a
    human-readable reason when no valid plan exists. Four plan shapes:

    * paired Dockerfile/Dockerfile.arm64 + registry target → two per-arch pushes to
      staging tags, then one ``imagetools create`` merging them into the final manifest;
    * multi-arch matrix + registry target → one multi-platform ``--push`` build;
    * single arch + registry target → one single-platform ``--push`` build;
    * single arch without a registry → a local ``--load`` build of ``image_tag``.

    Refactor: the four near-identical buildx invocations previously assembled flag
    by flag are produced by shared local helpers (same flag order preserved).
    """
    # Platform matrix: explicit CSV wins; otherwise fall back to the single `platform`.
    matrix = [p.strip() for p in str(build_platforms_csv or "").split(",") if p.strip()]
    if not matrix and str(platform or "").strip():
        matrix = [str(platform).strip()]
    if not matrix:
        return [], "No platforms to build (set platform / build_platforms)."
    multi = len(matrix) > 1
    target_ok = push_kind in ("hub", "registry") and bool(_normalize_os_registry_prefix(push_target))

    def base_cmd() -> list[str]:
        # Common buildx prefix; --no-cache must directly follow the builder args.
        cmd = ["docker", "buildx", "build", "--builder", BUILDX_BUILDER_NAME]
        if no_cache:
            cmd.append("--no-cache")
        return cmd

    def push_cmd(df: str, tag: str, plats: str) -> list[str]:
        # A --push build of dockerfile `df`, tagged `tag`, for the platform CSV `plats`.
        cmd = base_cmd()
        cmd.extend(["-f", df, "-t", tag, *_buildx_attestation_off_args(), "--platform", plats, "--push", repo_root])
        return cmd

    def load_cmd(tag: str, plat: str) -> list[str]:
        # A local --load build; only valid single-arch (the daemon cannot store multi-arch tags).
        cmd = base_cmd()
        cmd.extend(["-f", dockerfile_abs, "-t", tag])
        cmd.extend(_buildx_attestation_off_args())
        if plat:
            cmd.extend(["--platform", plat])
        cmd.extend(["--load", repo_root])
        return cmd

    def remote_tag() -> str:
        return _remote_push_reference(push_kind, push_target, image_tag, legacy_full_image=legacy_full_image)

    # Paired Dockerfile + Dockerfile.arm64: build each arch from its own dockerfile,
    # push to staging tags, then merge with `imagetools create`.
    amd_path, arm_path = _paired_amd64_arm64_paths(dockerfile_abs)
    if multi and target_ok and amd_path and arm_path and set(matrix) == {"linux/amd64", "linux/arm64"}:
        remote = remote_tag()
        stage_amd = _manifest_stage_tag(remote, "amd64")
        stage_arm = _manifest_stage_tag(remote, "arm64")
        return [
            push_cmd(amd_path, stage_amd, "linux/amd64"),
            push_cmd(arm_path, stage_arm, "linux/arm64"),
            ["docker", "buildx", "imagetools", "create", "-t", remote, stage_amd, stage_arm],
        ], None

    if multi:
        if not target_ok:
            return [], (
                "Multi-arch OS images need ROLEFORGE_OS_DOCKER_HUB_REPOSITORY or ROLEFORGE_OS_IMAGE_REGISTRY "
                "(plain docker load cannot store one multi-arch tag locally)."
            )
        return [push_cmd(dockerfile_abs, remote_tag(), ",".join(matrix))], None

    single_plat = matrix[0]
    if target_ok:
        return [push_cmd(dockerfile_abs, remote_tag(), single_plat)], None
    return [load_cmd(image_tag, single_plat)], None
|
|
|
|
|
|
def _require_registry_credentials(kind: str, username: str | None, password: str | None) -> None:
|
|
if kind not in ("hub", "registry"):
|
|
return
|
|
if not str(username or "").strip() or not str(password or ""):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="docker_login_username and docker_login_password are required for registry pushes.",
|
|
)
|
|
|
|
|
|
def _registry_payload_slice(payload: dict[str, Any]) -> dict[str, Any]:
|
|
return {
|
|
"registry_provider": payload.get("registry_provider"),
|
|
"registry_host": payload.get("registry_host"),
|
|
"repository_path": payload.get("repository_path"),
|
|
}
|
|
|
|
|
|
def _request_bool_flag(payload: dict[str, Any], key: str = "no_cache") -> bool:
|
|
"""Parse UI/API booleans; accepts JSON bool or common string/int truthy forms."""
|
|
v = payload.get(key)
|
|
if isinstance(v, bool):
|
|
return v
|
|
if v is None:
|
|
return False
|
|
if isinstance(v, (int, float)):
|
|
return bool(v)
|
|
s = str(v).strip().lower()
|
|
return s in ("1", "true", "yes", "on")
|
|
|
|
|
|
async def _build_os_image(
    dockerfile_rel: str,
    image_tag: str,
    platform: str,
    build_platforms: str = "",
    *,
    docker_login_username: str | None = None,
    docker_login_password: str | None = None,
    registry_payload: dict[str, Any] | None = None,
    no_cache: bool = False,
) -> dict[str, Any]:
    """Build (and optionally push) one OS image, returning a status dict with collected logs.

    Result keys: dockerfile, image, platform, status ("success" | "failed"),
    exit_code (str), logs (list[str]); on success also manual_pull_commands.
    """
    logs: list[str] = []

    def _failed_payload(code: str) -> dict[str, Any]:
        # Uniform failure envelope carrying whatever logs were gathered so far.
        return {
            "dockerfile": dockerfile_rel,
            "image": image_tag,
            "platform": platform,
            "status": "failed",
            "exit_code": code,
            "logs": logs,
        }

    # Resolve push destination: UI payload overrides env-configured defaults.
    cfg = _resolve_os_push_config(registry_payload)
    push_kind, push_target = cfg.kind, cfg.target
    legacy_layout = cfg.legacy_full_image
    login_server = cfg.docker_login_server
    if push_kind == "hub":
        logs.append(f"[roleforge] Docker Hub repository: docker.io/{push_target}")
    elif push_kind == "registry":
        logs.append(
            f"[roleforge] Registry push target: {push_target}"
            + (" (legacy prefix layout)" if legacy_layout else "")
        )

    # buildx must be available (auto-install attempted) and the builder bootstrapped.
    buildx_mode, buildx_note = await _ensure_docker_buildx_mode()
    logs.append(f"[roleforge] {buildx_note}")
    if buildx_mode == "none":
        # 127: "command not found" convention for missing buildx.
        return {
            "dockerfile": dockerfile_rel,
            "image": image_tag,
            "platform": platform,
            "status": "failed",
            "exit_code": "127",
            "logs": logs,
        }
    builder_ok, builder_note = await _ensure_buildx_builder()
    logs.append(f"[roleforge] {builder_note}")
    if not builder_ok:
        return {
            "dockerfile": dockerfile_rel,
            "image": image_tag,
            "platform": platform,
            "status": "failed",
            "exit_code": "127",
            "logs": logs,
        }
    if no_cache:
        logs.append("[roleforge] Docker BuildKit cache disabled for this run (--no-cache).")

    # Registry pushes require docker login first; password goes over stdin only.
    du = str(docker_login_username or "").strip()
    dp = str(docker_login_password or "")
    if push_kind == "hub":
        if not du or not dp:
            logs.append("[roleforge] Registry credentials are required (docker_login_username / docker_login_password).")
            return _failed_payload("3")
        logs.append(f"$ docker login --username {du!r} --password-stdin (Docker Hub)")
        lg_code, lg_out = await _docker_login_stdin_password(username=du, password=dp, registry_server=None)
        # Keep only the last 40 output lines to bound log size.
        logs.extend(lg_out.decode(errors="replace").splitlines()[-40:])
        if lg_code != 0:
            return _failed_payload(str(lg_code))
    elif push_kind == "registry":
        if not du or not dp:
            logs.append("[roleforge] Registry credentials are required (docker_login_username / docker_login_password).")
            return _failed_payload("3")
        # Fall back to the host portion of the push target when no explicit login server.
        srv = login_server or push_target.split("/")[0]
        logs.append(f"$ docker login --username {du!r} --password-stdin {srv}")
        lg_code, lg_out = await _docker_login_stdin_password(username=du, password=dp, registry_server=srv)
        logs.extend(lg_out.decode(errors="replace").splitlines()[-40:])
        if lg_code != 0:
            return _failed_payload(str(lg_code))

    # Plan the concrete buildx commands (local load, multi-arch push, or split manifest).
    repo_root = str(DOCKERFILES_ROOT.parent)
    dockerfile_abs = str(DOCKERFILES_ROOT / dockerfile_rel)
    cmds, plan_err = _os_build_command_plan(
        dockerfile_abs=dockerfile_abs,
        image_tag=image_tag,
        platform=platform,
        build_platforms_csv=build_platforms,
        repo_root=repo_root,
        push_kind=push_kind,
        push_target=push_target,
        legacy_full_image=legacy_layout,
        no_cache=no_cache,
    )
    if plan_err:
        logs.append(f"[roleforge] {plan_err}")
        return _failed_payload("2")

    # Shell lines the operator can run by hand to mirror the pushed image locally.
    manual_pull_commands: list[str] = []
    if push_kind in ("hub", "registry") and push_target.strip():
        manual_pull_commands = _manual_pull_and_tag_shell_commands(
            push_kind,
            push_target,
            image_tag,
            legacy_full_image=legacy_layout,
        )

    async def run_one(cmd: list[str]) -> int:
        # Registry-facing commands get retries; everything else runs once.
        max_attempts = _OS_BUILDX_PUSH_RETRIES if _should_retry_os_registry_cmd(cmd) else 1
        last = 1
        for attempt in range(max_attempts):
            if attempt:
                logs.append(
                    f"[roleforge] buildx push retry {attempt + 1}/{max_attempts} "
                    f"after {_OS_BUILDX_PUSH_RETRY_DELAY_SEC}s…"
                )
                await asyncio.sleep(_OS_BUILDX_PUSH_RETRY_DELAY_SEC)
            logs.append(f"$ {' '.join(cmd)}")
            proc = await asyncio.to_thread(
                subprocess.run,
                cmd,
                capture_output=True,
                text=True,
                cwd=repo_root,
            )
            out = (proc.stdout or "").splitlines()
            err = (proc.stderr or "").splitlines()
            # Keep only the tails (200 lines each) to bound log size.
            if out:
                logs.extend(out[-200:])
            if err:
                logs.extend(err[-200:])
            last = int(proc.returncode or 0)
            if last == 0:
                return 0
            # Retry only when output looks like a transient network/registry failure.
            blob = f"{proc.stdout or ''}\n{proc.stderr or ''}"[-12000:]
            if attempt < max_attempts - 1 and _should_retry_buildx_push_output(blob):
                continue
            return last
        return last

    for cmd in cmds:
        code = await run_one(cmd)
        if code != 0:
            return {
                "dockerfile": dockerfile_rel,
                "image": image_tag,
                "platform": platform,
                "status": "failed",
                "exit_code": str(code),
                "logs": logs,
            }

    return {
        "dockerfile": dockerfile_rel,
        "image": image_tag,
        "platform": platform,
        "status": "success",
        "exit_code": "0",
        "logs": logs,
        "manual_pull_commands": manual_pull_commands,
    }
|
|
|
|
|
|
def _app_config_row_value_dict(row: Any) -> dict[str, Any]:
|
|
"""Decode `app_config.value` (JSONB) to a dict. Handles asyncpg objects and JSON stored as str."""
|
|
if not row:
|
|
return {}
|
|
try:
|
|
raw = row["value"]
|
|
except (KeyError, TypeError):
|
|
return {}
|
|
if raw is None:
|
|
return {}
|
|
if isinstance(raw, dict):
|
|
return dict(raw)
|
|
if isinstance(raw, str):
|
|
try:
|
|
parsed = json.loads(raw)
|
|
except (json.JSONDecodeError, TypeError, ValueError):
|
|
return {}
|
|
return dict(parsed) if isinstance(parsed, dict) else {}
|
|
return {}
|
|
|
|
|
|
# Binary uploads store body as this prefix + standard base64 (see pages-main.js ROLEFORGE_B64_PREFIX).
|
|
ROLEFORGE_B64_PREFIX = "ROLEFORGE_B64:"
|
|
LEGACY_FILES_UPLOAD_PREFIX = "files/_roleforge_uploads/"
|
|
|
|
|
|
def _role_file_body_zip_bytes(path: str, body: str) -> bytes:
|
|
"""Bytes for role archive: UTF-8 text, or base64-decoded legacy / prefixed binary."""
|
|
s = body if isinstance(body, str) else str(body)
|
|
if s.startswith(ROLEFORGE_B64_PREFIX):
|
|
raw_b64 = "".join(s[len(ROLEFORGE_B64_PREFIX) :].split())
|
|
try:
|
|
return base64.standard_b64decode(raw_b64, validate=False)
|
|
except Exception: # noqa: BLE001
|
|
return s.encode("utf-8")
|
|
if str(path).startswith(LEGACY_FILES_UPLOAD_PREFIX):
|
|
raw_b64 = "".join(s.split())
|
|
try:
|
|
return base64.standard_b64decode(raw_b64, validate=False)
|
|
except Exception: # noqa: BLE001
|
|
return s.encode("utf-8")
|
|
return s.encode("utf-8")
|
|
|
|
|
|
# Seed list for the role-category picker (created when no categories exist yet).
DEFAULT_ROLE_CATEGORIES = (
    "Base OS Hardening",
    "User and SSH Access",
    "Package Management",
    "System Services",
    "Time and NTP",
    "Logging and Audit",
    "Monitoring and Observability",
    "Backup and Restore",
    "Disaster Recovery",
    "Network Baseline",
    "DNS and DHCP",
    "PKI and Certificates",
    "Secrets and Vault",
    "Firewall and Security",
    "Identity and SSO",
    "Container Runtime",
    "Kubernetes Bootstrap",
    "Kubernetes Add-ons",
    "Ingress and Load Balancing",
    "Databases",
    "Message Brokers",
    "Caching and KV Stores",
    "Web Servers",
    "Reverse Proxy",
    "CI/CD and Build Agents",
    "Artifact Registry",
    "Infrastructure Provisioning",
    "Cloud Integrations",
    "Storage and Filesystems",
    "NFS and SMB",
    "Virtualization",
    "High Availability",
    "Performance Tuning",
    "Compliance and Benchmarks",
    "Middleware and App Runtime",
    "Developer Tooling",
    "Automation Utilities",
    "Molecule Tests",
    "Migration and Upgrade",
    "Custom Business Logic",
)

# Top-level directories permitted inside a role's file tree (standard Ansible role layout).
ALLOWED_ROLE_ROOT_DIRS = {
    "tasks",
    "handlers",
    "templates",
    "files",
    "vars",
    "defaults",
    "meta",
    "library",
    "plugins",
    "module_utils",
    "lookup_plugins",
    "filter_plugins",
    "tests",
}

# New roles get these files only — no tests/ tree (Molecule uses its own layout on the runner).
DEFAULT_ROLE_SCAFFOLD_FILES = (
    ("tasks/main.yml", ""),
    ("handlers/main.yml", ""),
    ("defaults/main.yml", ""),
    ("vars/main.yml", ""),
    ("meta/main.yml", ""),
)
|
|
|
|
|
|
def _parse_json_object(value) -> dict:
|
|
if isinstance(value, dict):
|
|
return value
|
|
if isinstance(value, str):
|
|
try:
|
|
parsed = json.loads(value)
|
|
except Exception: # noqa: BLE001
|
|
return {}
|
|
return parsed if isinstance(parsed, dict) else {}
|
|
return {}
|
|
|
|
|
|
def _safe_role_archive_dir_name(name: str) -> str:
|
|
"""Filesystem-safe single segment for zip root (Ansible role folder name)."""
|
|
base = str(name or "").strip().lower() or "role"
|
|
base = re.sub(r"[^a-z0-9_.-]+", "_", base).strip("._-") or "role"
|
|
return base[:120]
|
|
|
|
|
|
def _sanitize_ansible_role_name(raw: str) -> str | None:
|
|
"""
|
|
Ansible-friendly role name: lowercase; spaces and any character outside [a-z0-9_] become '_'.
|
|
Collapses repeated underscores; trims leading/trailing underscores.
|
|
Returns None if nothing usable remains (caller should reject empty input).
|
|
"""
|
|
s = str(raw or "").strip().lower()
|
|
s = re.sub(r"[^a-z0-9_]+", "_", s)
|
|
s = re.sub(r"_+", "_", s).strip("_")
|
|
if not s:
|
|
return None
|
|
return s[:120]
|
|
|
|
|
|
ROLE_VISIBILITY_VALUES = frozenset({"public", "team", "personal"})
|
|
|
|
|
|
def _normalize_role_visibility(raw: str | None, default: str = "public") -> str:
|
|
v = str(raw if raw is not None else default).strip().lower() or default
|
|
if v not in ROLE_VISIBILITY_VALUES:
|
|
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid role visibility")
|
|
return v
|
|
|
|
|
|
# Allowed OS-family slugs a role may declare; membership is enforced by _normalize_os_families.
ROLE_OS_FAMILY_VALUES = frozenset({"rhel", "debian", "alpine", "suse", "arch", "bsd", "windows", "universal"})
|
|
|
|
|
|
def _normalize_role_tags(raw: object | None, *, max_tags: int = 32, max_len_each: int = 48) -> list[str]:
|
|
if raw is None:
|
|
return []
|
|
parts: list[str]
|
|
if isinstance(raw, str):
|
|
parts = [p.strip() for p in re.split(r"[,;]+", raw) if p.strip()]
|
|
elif isinstance(raw, (list, tuple)):
|
|
parts = [str(p).strip() for p in raw if str(p).strip()]
|
|
else:
|
|
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="tags must be a list or comma-separated text")
|
|
out: list[str] = []
|
|
seen: set[str] = set()
|
|
for p in parts:
|
|
if len(p) > max_len_each:
|
|
p = p[:max_len_each].rstrip()
|
|
key = p.casefold()
|
|
if key in seen:
|
|
continue
|
|
seen.add(key)
|
|
out.append(p)
|
|
if len(out) >= max_tags:
|
|
break
|
|
return out
|
|
|
|
|
|
def _normalize_os_families(raw: object | None) -> list[str]:
    """Validate OS-family slugs against ROLE_OS_FAMILY_VALUES; dedupe, keep order.

    Raises 400 when *raw* is not a list/tuple or contains an unknown slug.
    """
    if raw is None:
        return []
    if not isinstance(raw, (list, tuple)):
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="os_families must be a list")
    result: list[str] = []
    known: set[str] = set()
    for entry in raw:
        slug = str(entry).strip().lower()
        if slug not in ROLE_OS_FAMILY_VALUES:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Invalid os_family {entry!r}; allowed: {', '.join(sorted(ROLE_OS_FAMILY_VALUES))}",
            )
        if slug in known:
            continue
        known.add(slug)
        result.append(slug)
    return result
|
|
|
|
|
|
async def _allocate_unique_role_name(conn, user_id: str, base: str) -> str:
    """Return the sanitized *base* or the first free `base_copyN` variant for this owner.

    Gives up with 409 after 80 attempts.
    """
    stem = _sanitize_ansible_role_name(base) or "role"
    for attempt in range(80):
        candidate = stem if attempt == 0 else f"{stem}_copy{attempt}"[:120]
        taken = await conn.fetchval(
            "select 1 from ansible_roles where owner_id=$1::uuid and name=$2",
            user_id,
            candidate,
        )
        if not taken:
            return candidate
    raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Could not allocate a unique role name")
|
|
|
|
|
|
async def _fork_role_copy(
    conn,
    source_role_id: str,
    user_id: str,
    target_visibility: str,
    target_name: str | None = None,
    *,
    source_type: str | None = None,
    source_ref: str | None = None,
    category_id: str | None = None,
    description: str | None = None,
) -> str:
    """Copy a role (metadata, content, tags, OS families, files) into *user_id*'s namespace.

    Two modes:
      * target_name is None  — implicit fork: keep the source's metadata,
        mark source_ref='fork', and auto-generate a free name.
      * target_name given    — explicit fork: caller supplies name/metadata;
        the name must sanitize to something usable, differ from the source's
        sanitized name, and be unused by this owner.

    Returns the new role id as text. Raises HTTPException 404 (missing source
    or category), 400 (bad name), or 409 (name taken).
    """
    row = await conn.fetchrow(
        """
        select
            name,
            source_type,
            source_ref,
            category_id,
            content,
            coalesce(role_tags, array[]::text[]) as role_tags,
            coalesce(os_families, array[]::text[]) as os_families
        from ansible_roles where id=$1::uuid
        """,
        source_role_id,
    )
    if not row:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found")
    orig_name_s = _sanitize_ansible_role_name(str(row["name"])) or ""
    if target_name is not None:
        # Explicit fork: validate the caller-chosen name.
        unique_name = _sanitize_ansible_role_name(target_name)
        if not unique_name:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Role name is required")
        if unique_name == orig_name_s:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Fork name must differ from the original role name",
            )
        taken = await conn.fetchval(
            "select 1 from ansible_roles where owner_id=$1::uuid and name=$2",
            user_id,
            unique_name,
        )
        if taken:
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT, detail="You already have a role with this name"
            )
    else:
        # Implicit fork: derive a free name (base, base_copy1, ...) automatically.
        unique_name = await _allocate_unique_role_name(conn, user_id, str(row["name"]))
    content_obj = row["content"]
    if not isinstance(content_obj, dict):
        content_obj = _parse_json_object(content_obj)
    # Shallow copy so the source row's content object is never mutated.
    content_obj = dict(content_obj)
    if target_name is None:
        st = str(row["source_type"] or "inline")
        sr = "fork"
        cat = row["category_id"]
    else:
        content_obj["description"] = str(description if description is not None else "")
        st = str(source_type or "inline").strip()
        sr = str(source_ref if source_ref is not None else "")
        cat = category_id
    if cat:
        cok = await conn.fetchval("select 1 from role_categories where id=$1::uuid", cat)
        if not cok:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role category not found")
    tag_copy = [str(x) for x in (row["role_tags"] or [])]
    os_copy = [str(x) for x in (row["os_families"] or [])]
    new_id = await conn.fetchval(
        """
        insert into ansible_roles (
            owner_id, name, source_type, source_ref, category_id, content, visibility,
            forked_from_id, team_id, role_tags, os_families
        )
        values ($1::uuid, $2, $3, $4, $5::uuid, $6::jsonb, $7, $8::uuid, null, $9::text[], $10::text[])
        returning id::text
        """,
        user_id,
        unique_name,
        st,
        sr,
        cat,
        json.dumps(content_obj),
        target_visibility,
        source_role_id,
        tag_copy,
        os_copy,
    )
    files = await conn.fetch(
        "select path, content from role_files where role_id=$1::uuid order by path",
        source_role_id,
    )
    if files:
        await conn.executemany(
            """
            insert into role_files (role_id, path, content)
            values ($1::uuid, $2, $3)
            """,
            [(new_id, str(fr["path"]), str(fr["content"])) for fr in files],
        )
    else:
        # Source had no stored files (legacy role): seed the standard scaffold.
        await _upsert_role_files(conn, str(new_id), list(DEFAULT_ROLE_SCAFFOLD_FILES))
    return str(new_id)
|
|
|
|
|
|
async def _user_is_admin(conn, user_id: str) -> bool:
    """True when the user's stored role is 'admin' or 'root'."""
    stored_role = await conn.fetchval("select role from users where id=$1::uuid", user_id)
    return str(stored_role or "") in {"admin", "root"}
|
|
|
|
|
|
async def _is_active_team_member(conn, team_id: str | None, user_id: str) -> bool:
    """True when *user_id* has an 'active' membership in *team_id* (False when no team given)."""
    if not team_id:
        return False
    membership = await conn.fetchval(
        """
        select 1 from team_memberships
        where team_id = $1::uuid and user_id = $2::uuid and status = 'active'
        """,
        team_id,
        user_id,
    )
    return membership is not None
|
|
|
|
|
|
async def _resolve_role_for_mutation(conn, role_id: str, user_id: str) -> tuple[str, bool]:
    """Return (effective_role_id, forked). Fork when editing another user's public role."""
    record = await conn.fetchrow(
        """
        select owner_id::text, visibility, team_id::text as team_id
        from ansible_roles where id=$1::uuid
        """,
        role_id,
    )
    if not record:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found")
    # Owners edit in place; admins may edit anything in place.
    if record["owner_id"] == user_id:
        return role_id, False
    if await _user_is_admin(conn, user_id):
        return role_id, False
    visibility = str(record["visibility"] or "personal")
    team = record["team_id"]
    if visibility == "team" and team:
        if await _is_active_team_member(conn, team, user_id):
            return role_id, False
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Join the team to edit this team role",
        )
    if visibility == "personal":
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Cannot edit another user's personal role")
    if visibility == "public":
        # Editing someone else's public role transparently forks a personal copy.
        forked_id = await _fork_role_copy(conn, role_id, user_id, "personal")
        return forked_id, True
    raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Role access denied")
|
|
|
|
|
|
async def _clamp_visibility_on_update(conn, role_id: str, user_id: str, requested: str) -> str:
    """Owner may change personal↔public↔team (subject to team_id validation). Public stays public for non-admins."""
    wanted = _normalize_role_visibility(requested)
    record = await conn.fetchrow(
        """
        select owner_id::text, coalesce(visibility, 'personal') as visibility
        from ansible_roles where id=$1::uuid
        """,
        role_id,
    )
    if not record:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found")
    current = str(record["visibility"])
    caller_owns = str(record["owner_id"]) == user_id
    caller_is_admin = await _user_is_admin(conn, user_id)
    if current == "public":
        # Only admins may take a role out of the public catalog.
        return wanted if caller_is_admin else "public"
    if current == "personal":
        if wanted in ("public", "team") and (caller_owns or caller_is_admin):
            return wanted
        return "personal"
    if current == "team":
        return wanted if (caller_is_admin or caller_owns) else current
    # Unexpected stored value: re-normalize the request with a personal default.
    return _normalize_role_visibility(requested, default="personal")
|
|
|
|
|
|
async def _assert_can_view_role(conn, role_id: str, user_id: str) -> dict[str, str]:
    """Raise 404/403 unless *user_id* may view the role; return owner/visibility/team info."""
    record = await conn.fetchrow(
        """
        select
            owner_id::text as owner_id,
            coalesce(visibility, 'public') as visibility,
            team_id::text as team_id
        from ansible_roles where id=$1::uuid
        """,
        role_id,
    )
    if not record:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found")
    info = {"owner_id": str(record["owner_id"]), "visibility": str(record["visibility"]), "team_id": record["team_id"]}
    if await _user_is_admin(conn, user_id):
        return info
    visibility = str(record["visibility"])
    if visibility == "public" or record["owner_id"] == user_id:
        return info
    if visibility == "team" and record["team_id"] and await _is_active_team_member(conn, str(record["team_id"]), user_id):
        return info
    raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Role access denied")
|
|
|
|
|
|
def _validate_role_file_path(path: str) -> str:
    """Normalize a role-relative file path and enforce the allowed root directories.

    Rejects empty or directory-only paths, backslash separators, and any
    '.'/'..' or empty segments — these paths later become zip arcnames and
    on-disk paths on the runner, so traversal segments are a zip-slip /
    path-traversal hazard. The first segment must be one of
    ALLOWED_ROLE_ROOT_DIRS. Returns the path with leading slashes stripped.
    """
    normalized = str(path or "").strip().lstrip("/")
    if not normalized or normalized.endswith("/"):
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Role file path is invalid")
    segments = normalized.split("/")
    # Traversal protection: no backslashes, no '.'/'..', no empty segments ('a//b').
    if "\\" in normalized or any(seg in ("", ".", "..") for seg in segments):
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Role file path is invalid")
    root = segments[0]
    if root not in ALLOWED_ROLE_ROOT_DIRS:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Role file path must start with one of: {', '.join(sorted(ALLOWED_ROLE_ROOT_DIRS))}",
        )
    return normalized
|
|
|
|
|
|
def _zip_member_to_rel_paths(namelist: list[str]) -> tuple[list[tuple[str, str]], str | None]:
|
|
"""Map zip entry -> relative path under role; strip one shared top-level folder (Galaxy-style layout)."""
|
|
clean: list[str] = []
|
|
for n in namelist:
|
|
n = n.replace("\\", "/").strip()
|
|
if not n or n.endswith("/"):
|
|
continue
|
|
if "__MACOSX" in n or "/.DS_Store" in n or n.endswith(".DS_Store"):
|
|
continue
|
|
clean.append(n)
|
|
if not clean:
|
|
return [], None
|
|
parts = [c.split("/") for c in clean]
|
|
first0 = parts[0][0]
|
|
if len(parts[0]) > 1 and all(len(p) > 1 and p[0] == first0 for p in parts):
|
|
pairs = [(clean[i], "/".join(parts[i][1:])) for i in range(len(clean))]
|
|
return pairs, first0
|
|
return [(c, c) for c in clean], None
|
|
|
|
|
|
def _decode_import_bytes(rel_path: str, raw: bytes) -> tuple[str, str] | None:
    """Turn zip bytes into stored path + text or ROLEFORGE_B64 body; skip invalid paths."""
    rel_path = rel_path.replace("\\", "/").strip().lstrip("/")
    if not rel_path or rel_path.endswith("/"):
        return None
    try:
        text_body = raw.decode("utf-8")
    except UnicodeDecodeError:
        # Binary payload: force it under files/ and store it base64-encoded.
        target = rel_path
        if not target.startswith("files/"):
            leaf = target.split("/")[-1]
            safe_leaf = re.sub(r"[^a-zA-Z0-9_.-]+", "_", leaf).strip("._-")[:200] or "file"
            target = f"files/{safe_leaf}"
        try:
            stored_path = _validate_role_file_path(target)
        except HTTPException:
            return None
        return stored_path, ROLEFORGE_B64_PREFIX + base64.standard_b64encode(raw).decode("ascii")
    try:
        stored_path = _validate_role_file_path(rel_path)
    except HTTPException:
        return None
    return stored_path, text_body
|
|
|
|
|
|
def _parse_role_zip_bytes(raw: bytes, filename_hint: str) -> tuple[str, list[tuple[str, str]]]:
    """Unpack a role ZIP into (suggested role name, sorted [(path, body)]); 400 on bad or empty archives."""
    try:
        archive = zipfile.ZipFile(io.BytesIO(raw))
    except zipfile.BadZipFile as exc:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid ZIP file") from exc
    pairs, root_folder = _zip_member_to_rel_paths(archive.namelist())
    # Suggested role name: the archive's shared root folder, else the filename stem.
    stem = Path(filename_hint or "imported.zip").stem
    suggested = _safe_role_archive_dir_name(root_folder or stem)
    collected: dict[str, str] = {}
    for member, rel in pairs:
        try:
            payload = archive.read(member)
        except (KeyError, RuntimeError):
            continue  # unreadable/encrypted member: skip rather than abort the import
        decoded = _decode_import_bytes(rel, payload)
        if not decoded:
            continue
        stored_path, body = decoded
        collected[stored_path] = body
    if not collected:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="No allowed role files found in archive (use paths under tasks/, handlers/, templates/, files/, …).",
        )
    return suggested, sorted(collected.items(), key=lambda kv: kv[0])
|
|
|
|
|
|
def _role_editor_lint_kind(path: str) -> Literal["yaml", "json"]:
|
|
lower = str(path or "").strip().lstrip("/").lower()
|
|
base = lower.split("/")[-1] if lower else ""
|
|
if base.endswith((".json", ".tfstate", ".ndjson")):
|
|
return "json"
|
|
return "yaml"
|
|
|
|
|
|
def _lint_yaml_role_file(content: str, pseudo_path: str, cfg: YamlLintConfig) -> list[LintRoleFileError]:
    """Run yamllint over *content* and adapt each reported problem into a LintRoleFileError."""
    findings: list[LintRoleFileError] = []
    for problem in yaml_linter.run(content, cfg, pseudo_path):
        severity = str(getattr(problem, "level", "") or "error").lower()
        rule_id = getattr(problem, "rule", None)
        description = getattr(problem, "desc", "") or ""
        # Prefix the message with the rule id when yamllint reports one.
        message = f"{rule_id}: {description}" if rule_id else f"{description}"
        findings.append(
            LintRoleFileError(
                line=getattr(problem, "line", None),
                column=getattr(problem, "column", None),
                level=severity,
                message=message.strip(),
            )
        )
    return findings
|
|
|
|
|
|
async def _upsert_role_files(conn, role_id: str, files: list[tuple[str, str]]) -> None:
    """Insert-or-update role files keyed by (role_id, path); no-op for an empty list."""
    if not files:
        return
    rows = [(role_id, file_path, body) for file_path, body in files]
    await conn.executemany(
        """
        insert into role_files (role_id, path, content)
        values ($1::uuid, $2, $3)
        on conflict (role_id, path)
        do update set content=excluded.content, updated_at=now()
        """,
        rows,
    )
|
|
|
|
|
|
async def _ensure_default_role_categories(conn) -> None:
    """Seed the built-in role categories, skipping names already present (case-insensitive)."""
    seed_sql = """
        insert into role_categories (owner_id, name)
        select null, $1
        where not exists (
            select 1 from role_categories where lower(name) = lower($1)
        )
        """
    for category_name in DEFAULT_ROLE_CATEGORIES:
        await conn.execute(seed_sql, category_name)
|
|
|
|
|
|
async def _prune_legacy_empty_tests_scaffold_for_user(conn, user_id: str) -> None:
    """Remove old default tests/test.yml (empty) left from earlier RoleForge scaffolds."""
    cleanup_sql = """
        delete from role_files rf
        using ansible_roles r
        where rf.role_id = r.id
          and r.owner_id = $1::uuid
          and rf.path = 'tests/test.yml'
          and btrim(coalesce(rf.content, '')) = ''
        """
    await conn.execute(cleanup_sql, user_id)
|
|
|
|
|
|
async def _backfill_role_files_for_user(conn, user_id: str) -> None:
    """Seed role_files for any of the user's roles that have none, then prune legacy scaffolds."""
    roles = await conn.fetch(
        """
        select r.id::text as id, r.content
        from ansible_roles r
        where r.owner_id=$1::uuid
        """,
        user_id,
    )
    for role in roles:
        rid = str(role["id"])
        existing = await conn.fetchval(
            "select count(*)::int from role_files where role_id=$1::uuid",
            rid,
        )
        if int(existing or 0) > 0:
            continue  # role already has files; nothing to backfill
        blob = _parse_json_object(role["content"])
        legacy_tasks = str(blob.get("tasks_yaml") or "")
        scaffold = list(DEFAULT_ROLE_SCAFFOLD_FILES)
        if legacy_tasks.strip():
            # Later entries win in _upsert_role_files, so legacy tasks override the blank scaffold.
            scaffold.append(("tasks/main.yml", legacy_tasks))
        await _upsert_role_files(conn, rid, scaffold)
    await _prune_legacy_empty_tests_scaffold_for_user(conn, user_id)
|
|
|
|
|
|
async def _monitor_job(
    job_id: str, runner_url: str, runner_run_id: str, runner_name: str, runtime_mode: str
) -> None:
    """Poll the runner for a job: stream log lines into job_logs, refresh the
    heartbeat, and record the terminal status.

    Fails the job on overall timeout or after too many consecutive poll errors.
    On exit, optionally terminates the ephemeral worker and finalizes the jobs
    row — guarded so it never overwrites a status another writer (e.g. a stop
    request) already finalized, matching _monitor_test.
    """
    pool = get_pool()
    settings = get_settings()
    started = time.monotonic()
    poll_errors = 0  # consecutive failed polls; reset by any success
    offset = 0  # cursor into the runner's log stream
    final_status = "failed"  # pessimistic default; replaced on clean completion
    try:
        while True:
            # Hard wall-clock budget for the whole run.
            if time.monotonic() - started > settings.app_runner_timeout_sec:
                final_status = "failed"
                break
            try:
                snapshot = await get_run_status(runner_url, runner_run_id, offset)
                poll_errors = 0
            except Exception:  # noqa: BLE001
                poll_errors += 1
                if poll_errors >= settings.app_runner_max_poll_errors:
                    final_status = "failed"
                    break
                await asyncio.sleep(settings.app_runner_poll_interval_sec)
                continue

            lines = snapshot.get("lines", [])
            offset = int(snapshot.get("next_offset", offset))
            async with pool.acquire() as conn:
                await conn.execute(
                    "update jobs set runner_last_heartbeat=now() where id=$1::uuid",
                    job_id,
                )
                if lines:
                    await conn.executemany(
                        "insert into job_logs (job_id, log_line) values ($1::uuid, $2)",
                        [(job_id, line) for line in lines],
                    )
            status_name = str(snapshot.get("status", "running"))
            if status_name in {"success", "failed"}:
                final_status = status_name
                break
            await asyncio.sleep(settings.app_runner_poll_interval_sec)
    finally:
        if AUTO_TERMINATE_TEST_RUNNER:
            await terminate_ephemeral_worker(runner_name, runtime_mode)
        async with pool.acquire() as conn:
            # Guarded like _monitor_test: do not clobber a status that was
            # already finalized elsewhere (e.g. by a manual stop).
            await conn.execute(
                """
                update jobs
                set status=$1, finished_at=now()
                where id=$2::uuid
                  and status not in ('success', 'failed')
                """,
                final_status,
                job_id,
            )
|
|
|
|
|
|
async def _monitor_test(
    test_id: str, runner_url: str, runner_run_id: str, runner_name: str, runtime_mode: str
) -> None:
    """Poll the runner for a Molecule test run: stream log lines into test_logs,
    refresh the heartbeat, and record the terminal status.

    Fails the run on overall timeout or after too many consecutive poll errors.
    Always terminates the ephemeral worker on exit; the final UPDATE is guarded
    so it never overwrites a status another writer already finalized.
    """
    pool = get_pool()
    settings = get_settings()
    started = time.monotonic()
    poll_errors = 0  # consecutive failed polls; reset by any success
    offset = 0  # cursor into the runner's log stream
    final_status = "failed"  # pessimistic default; replaced on clean completion
    try:
        while True:
            # Hard wall-clock budget for the whole run.
            if time.monotonic() - started > settings.app_runner_timeout_sec:
                final_status = "failed"
                break
            try:
                snapshot = await get_run_status(runner_url, runner_run_id, offset)
                poll_errors = 0
            except Exception:  # noqa: BLE001
                poll_errors += 1
                if poll_errors >= settings.app_runner_max_poll_errors:
                    final_status = "failed"
                    break
                await asyncio.sleep(settings.app_runner_poll_interval_sec)
                continue
            lines = snapshot.get("lines", [])
            offset = int(snapshot.get("next_offset", offset))
            async with pool.acquire() as conn:
                await conn.execute(
                    "update test_runs set runner_last_heartbeat=now() where id=$1::uuid",
                    test_id,
                )
                if lines:
                    await conn.executemany(
                        "insert into test_logs (test_run_id, log_line) values ($1::uuid, $2)",
                        [(test_id, line) for line in lines],
                    )
            status_name = str(snapshot.get("status", "running"))
            if status_name in {"success", "failed"}:
                final_status = status_name
                break
            await asyncio.sleep(settings.app_runner_poll_interval_sec)
    finally:
        # Ephemeral Molecule workers are always torn down, even on error paths.
        await terminate_ephemeral_worker(runner_name, runtime_mode)
        async with pool.acquire() as conn:
            # Only finalize if nothing else (e.g. a stop request) already did.
            await conn.execute(
                """
                update test_runs
                set status=$1, finished_at=now()
                where id=$2::uuid
                  and status not in ('success', 'failed')
                """,
                final_status,
                test_id,
            )
|
|
|
|
|
|
@router.post("/roles/lint-file", response_model=LintRoleFileResponse)
async def lint_role_file(
    payload: LintRoleFileRequest,
    user_id: str = Depends(get_current_user_id),
) -> LintRoleFileResponse:
    """Validate editor buffer: YAML via yamllint (DB `yamllint`); JSON via project JSONLint rules (DB `jsonlint`)."""
    _ = user_id  # auth dependency only; linting itself is user-agnostic
    _validate_role_file_path(payload.path)
    kind = _role_editor_lint_kind(payload.path)
    if kind == "json":
        json_cfg = await get_jsonlint_config_for_lint()
        problems = run_json_lint(payload.content, json_cfg)
        has_blockers = any((p.level or "").lower() == "error" for p in problems)
        return LintRoleFileResponse(ok=not has_blockers, kind="json", errors=problems)
    yaml_cfg = await get_yamllint_config_for_lint()
    problems = _lint_yaml_role_file(payload.content, payload.path, yaml_cfg)
    has_blockers = any((p.level or "").lower() == "error" for p in problems)
    return LintRoleFileResponse(ok=not has_blockers, kind="yaml", errors=problems)
|
|
|
|
|
|
@router.post("/roles/import-archive", response_model=RoleImportResult)
async def import_role_archive(
    file: UploadFile = File(...),
    user_id: str = Depends(get_current_user_id),
) -> RoleImportResult:
    """Parse an Ansible role ZIP (same layout as export); returns files for the create-role UI."""
    _ = user_id  # endpoint requires auth but the parse itself is user-agnostic
    raw = await file.read()
    size_limit = 15 * 1024 * 1024  # 15 MB cap on uploaded archives
    if len(raw) > size_limit:
        raise HTTPException(
            status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
            detail="ZIP is too large (max 15 MB).",
        )
    suggested, parsed_files = _parse_role_zip_bytes(raw, file.filename or "imported.zip")
    return RoleImportResult(
        suggested_name=suggested,
        items=[RoleImportItem(path=file_path, content=body) for file_path, body in parsed_files],
    )
|
|
|
|
|
|
@router.post("/roles")
async def create_role(payload: RoleCreate, user_id: str = Depends(get_current_user_id)) -> dict[str, str]:
    """Create a role owned by the caller.

    Visibility is forced to 'personal' unless a team_id is supplied, in which
    case it becomes 'team' (the payload's visibility value is only validated
    for a clear 400, then discarded). Scaffold files are written before any
    user-supplied files so the latter can override them.
    """
    name = _sanitize_ansible_role_name(payload.name)
    if not name:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Role name is required")
    # Validate the requested visibility, but ignore its value (see docstring).
    _ = _normalize_role_visibility(payload.visibility)
    visibility = "personal"
    team_id_insert: str | None = None
    if payload.team_id:
        tid = str(payload.team_id).strip()
        try:
            UUID(tid)
        except ValueError as exc:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid team_id") from exc
        visibility = "team"
        team_id_insert = tid
    pool = get_pool()
    async with pool.acquire() as conn:
        if team_id_insert:
            team_ok = await conn.fetchval("select 1 from teams where id=$1::uuid", team_id_insert)
            if not team_ok:
                raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Team not found")
            if not await _is_active_team_member(conn, team_id_insert, user_id):
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail="Must be an active team member to create a team role",
                )
        if payload.category_id:
            exists = await conn.fetchval(
                "select 1 from role_categories where id=$1::uuid",
                payload.category_id,
            )
            if not exists:
                raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role category not found")
        tags_norm = _normalize_role_tags(payload.tags)
        os_norm = _normalize_os_families(payload.os_families)
        role_id = await conn.fetchval(
            """
            insert into ansible_roles (
                owner_id, name, source_type, source_ref, category_id, content, visibility, team_id,
                role_tags, os_families
            )
            values ($1, $2, $3, $4, $5::uuid, $6::jsonb, $7, $8::uuid, $9::text[], $10::text[])
            returning id::text
            """,
            user_id,
            name,
            payload.source_type,
            payload.source_ref,
            payload.category_id,
            # Description rides inside the JSON content blob alongside extra content keys.
            json.dumps({"description": payload.description, **(payload.content or {})}),
            visibility,
            team_id_insert,
            tags_norm,
            os_norm,
        )
        validated_files = [(_validate_role_file_path(item.path), item.content) for item in payload.files]
        # Scaffold first, then user files: later entries win in _upsert_role_files.
        await _upsert_role_files(
            conn,
            str(role_id),
            [*DEFAULT_ROLE_SCAFFOLD_FILES, *validated_files],
        )
    return {"id": role_id, "name": name}
|
|
|
|
|
|
@router.post("/roles/import-yaml")
async def import_role_yaml(
    payload: RoleYamlImportRequest, user_id: str = Depends(get_current_user_id)
) -> dict[str, str]:
    """Create a personal role from raw tasks YAML.

    The YAML must parse to a list (an Ansible task list). The raw text is kept
    both in content['tasks_yaml'] and as the role's tasks/main.yml file.
    """
    try:
        parsed = yaml.safe_load(payload.tasks_yaml)
    except yaml.YAMLError as exc:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid YAML") from exc
    if not isinstance(parsed, list):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Role tasks YAML must be a list of Ansible tasks",
        )
    name = _sanitize_ansible_role_name(payload.name)
    if not name:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Role name is required")
    # Validate the requested visibility for a clear 400, but force 'personal'.
    _ = _normalize_role_visibility(payload.visibility)
    visibility = "personal"
    pool = get_pool()
    async with pool.acquire() as conn:
        if payload.category_id:
            exists = await conn.fetchval(
                "select 1 from role_categories where id=$1::uuid",
                payload.category_id,
            )
            if not exists:
                raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role category not found")
        tags_norm = _normalize_role_tags(payload.tags)
        os_norm = _normalize_os_families(payload.os_families)
        role_id = await conn.fetchval(
            """
            insert into ansible_roles (
                owner_id, name, source_type, source_ref, category_id, content, visibility,
                role_tags, os_families
            )
            values ($1::uuid, $2, 'inline', $3, $4::uuid, $5::jsonb, $6, $7::text[], $8::text[])
            returning id::text
            """,
            user_id,
            name,
            payload.source_ref,
            payload.category_id,
            json.dumps({"description": payload.description, "tasks_yaml": payload.tasks_yaml}),
            visibility,
            tags_norm,
            os_norm,
        )
        validated_files = [(_validate_role_file_path(item.path), item.content) for item in payload.files]
        # Order matters: scaffold, then tasks_yaml, then user files (later entries win).
        await _upsert_role_files(
            conn,
            str(role_id),
            [
                *DEFAULT_ROLE_SCAFFOLD_FILES,
                ("tasks/main.yml", payload.tasks_yaml),
                *validated_files,
            ],
        )
    return {"id": role_id, "name": name}
|
|
|
|
|
|
@router.get("/roles/{role_id}/export-yaml")
async def export_role_yaml(role_id: str, user_id: str = Depends(get_current_user_id)) -> Response:
    """Return the role's tasks YAML (or a YAML dump of its content blob) as application/x-yaml."""
    pool = get_pool()
    async with pool.acquire() as conn:
        await _assert_can_view_role(conn, role_id, user_id)
        record = await conn.fetchrow(
            """
            select content
            from ansible_roles
            where id=$1::uuid
            """,
            role_id,
        )
        if not record:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found")
        content = _parse_json_object(record["content"])
        body = content.get("tasks_yaml")
        if not body:
            # Legacy rows without tasks_yaml: dump the whole content object instead.
            body = yaml.safe_dump(content, sort_keys=False, allow_unicode=False)
        return Response(content=body, media_type="application/x-yaml")
|
|
|
|
|
|
@router.get("/roles/{role_id}/export-archive")
async def export_role_archive(role_id: str, user_id: str = Depends(get_current_user_id)) -> Response:
    """Export a role as a ZIP whose root folder is the sanitized role name.

    Uses stored role_files when present; otherwise synthesizes the default
    scaffold, substituting legacy content['tasks_yaml'] when set. File bodies
    pass through _role_file_body_zip_bytes (helper defined elsewhere in this
    module) before being written into the archive.
    """
    pool = get_pool()
    async with pool.acquire() as conn:
        await _assert_can_view_role(conn, role_id, user_id)
        row = await conn.fetchrow(
            """
            select r.name, r.content
            from ansible_roles r
            where r.id=$1::uuid
            """,
            role_id,
        )
        if not row:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found")
        content = _parse_json_object(row["content"])
        file_rows = await conn.fetch(
            "select path, content from role_files where role_id=$1::uuid order by path",
            role_id,
        )
    if file_rows:
        paths_body: dict[str, str] = {}
        for fr in file_rows:
            paths_body[str(fr["path"])] = str(fr["content"])
        files = sorted(paths_body.items(), key=lambda x: x[0])
    else:
        # Legacy role with no stored files: export the default scaffold.
        tasks_yaml = str(content.get("tasks_yaml") or "")
        merged: dict[str, str] = {path: body for path, body in DEFAULT_ROLE_SCAFFOLD_FILES}
        if tasks_yaml.strip():
            merged["tasks/main.yml"] = tasks_yaml
        files = sorted(merged.items(), key=lambda x: x[0])

    root = _safe_role_archive_dir_name(row["name"])
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for path, body in files:
            arcname = f"{root}/{path}"
            raw = _role_file_body_zip_bytes(path, body) if isinstance(body, str) else bytes(body)
            zf.writestr(arcname, raw)

    zip_bytes = buf.getvalue()
    zip_filename = f"{root}.zip"
    # RFC 6266 Content-Disposition: ASCII fallback plus UTF-8 filename* parameter.
    ascii_fallback = zip_filename.encode("ascii", "replace").decode("ascii")
    disp = (
        f'attachment; filename="{ascii_fallback}"; '
        f"filename*=UTF-8''{quote(zip_filename)}"
    )
    return Response(
        content=zip_bytes,
        media_type="application/zip",
        headers={"Content-Disposition": disp},
    )
|
|
|
|
|
|
@router.get("/roles/{role_id}/files")
async def list_role_files(role_id: str, user_id: str = Depends(get_current_user_id)) -> dict[str, list[dict[str, str]]]:
    """List a viewable role's stored files (path + content), ordered by path."""
    pool = get_pool()
    async with pool.acquire() as conn:
        await _assert_can_view_role(conn, role_id, user_id)
        records = await conn.fetch(
            "select path, content from role_files where role_id=$1::uuid order by path",
            role_id,
        )
        return {"items": [{"path": rec["path"], "content": rec["content"]} for rec in records]}
|
|
|
|
|
|
@router.get("/roles/details/{role_id}")
async def get_role_details(role_id: str, user_id: str = Depends(get_current_user_id)) -> dict[str, Any]:
    """Return role metadata plus caller-specific permission flags.

    editable: owner or admin. deletable: personal roles by owner/admin,
    public roles by admin only (per the expression below; team roles are
    not deletable through this flag).
    """
    pool = get_pool()
    async with pool.acquire() as conn:
        await _assert_can_view_role(conn, role_id, user_id)
        row = await conn.fetchrow(
            """
            select
                r.id::text as id,
                r.owner_id::text as owner_id,
                r.name,
                r.source_type,
                r.source_ref,
                r.category_id::text as category_id,
                r.created_at,
                coalesce(r.visibility, 'personal') as visibility,
                r.forked_from_id::text as forked_from_id,
                r.team_id::text as team_id,
                t.name as team_name,
                c.name as category_name,
                r.content,
                coalesce(r.role_tags, array[]::text[]) as role_tags,
                coalesce(r.os_families, array[]::text[]) as os_families
            from ansible_roles r
            left join role_categories c on c.id = r.category_id
            left join teams t on t.id = r.team_id
            where r.id=$1::uuid
            """,
            role_id,
        )
        if not row:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found")
        is_admin = await _user_is_admin(conn, user_id)
        # Description lives inside the JSON content blob.
        content = _parse_json_object(row["content"])
        is_owner = row["owner_id"] == user_id
        vis = str(row["visibility"])
        editable = bool(is_owner or is_admin)
        deletable = (vis == "personal" and (is_owner or is_admin)) or (vis == "public" and is_admin)
        return {
            "id": row["id"],
            "name": row["name"],
            "source_type": row["source_type"],
            "source_ref": row["source_ref"],
            "category_id": row["category_id"],
            "category_name": row["category_name"] or "Uncategorized",
            "description": str(content.get("description") or ""),
            "created_at": row["created_at"].isoformat() if row["created_at"] else "",
            "visibility": vis,
            "forked_from_id": row["forked_from_id"] or "",
            "team_id": row["team_id"] or "",
            "team_name": str(row["team_name"] or "") if row["team_id"] else "",
            "tags": [str(t) for t in (row["role_tags"] or [])],
            "os_families": [str(t) for t in (row["os_families"] or [])],
            "is_owner": is_owner,
            "is_admin": is_admin,
            "editable": editable,
            "deletable": deletable,
        }
|
|
|
|
|
|
@router.post("/roles/{role_id}/fork")
async def fork_role(
    role_id: str, payload: RoleForkRequest, user_id: str = Depends(get_current_user_id)
) -> dict[str, str]:
    """Fork a viewable role into the caller's personal namespace; return the new id and name."""
    pool = get_pool()
    async with pool.acquire() as conn:
        await _assert_can_view_role(conn, role_id, user_id)
        new_id = await _fork_role_copy(
            conn,
            role_id,
            user_id,
            "personal",
            target_name=payload.name,
            source_type=payload.source_type,
            source_ref=payload.source_ref,
            category_id=payload.category_id,
            description=payload.description,
        )
        created = await conn.fetchrow("select name from ansible_roles where id=$1::uuid", new_id)
        return {"id": str(new_id), "name": str(created["name"]) if created else payload.name}
|
|
|
|
|
|
@router.put("/roles/details/{role_id}")
async def update_role_details(
    role_id: str,
    payload: RoleUpdate,
    user_id: str = Depends(get_current_user_id),
) -> dict[str, Any]:
    """Update a role's metadata: name, source, category, visibility, tags, team.

    _resolve_role_for_mutation returns the id actually mutated plus a
    ``forked`` flag (presumably it forks roles the caller cannot edit in
    place -- confirm against the helper). Raises 400 for invalid input,
    403 for denied team access, 404 for missing rows.
    """
    name = _sanitize_ansible_role_name(payload.name)
    if not name:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Role name is required")
    pool = get_pool()
    async with pool.acquire() as conn:
        effective_id, forked = await _resolve_role_for_mutation(conn, role_id, user_id)
        # Clamp the requested visibility to what this user is allowed to set.
        vis_payload = await _clamp_visibility_on_update(conn, effective_id, user_id, payload.visibility)
        if payload.category_id:
            exists = await conn.fetchval("select 1 from role_categories where id=$1::uuid", payload.category_id)
            if not exists:
                raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role category not found")
        tags_norm = _normalize_role_tags(payload.tags)
        os_norm = _normalize_os_families(payload.os_families)

        tid_final: str | None = None
        team_display_name = ""
        if vis_payload == "team":
            # Team visibility requires a team id: prefer the new one from the
            # payload, otherwise keep the previously stored value.
            raw_new = str(payload.team_id or "").strip()
            prev_row = await conn.fetchrow(
                "select team_id::text as team_id from ansible_roles where id=$1::uuid",
                effective_id,
            )
            prev_tid = str(prev_row["team_id"] or "").strip() if prev_row else ""
            tid_use = raw_new or prev_tid
            if not tid_use:
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail="team_id is required for team visibility",
                )
            try:
                # Reject malformed ids before they hit the ::uuid cast in SQL.
                UUID(tid_use)
            except ValueError as exc:
                raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid team_id") from exc
            team_exists = await conn.fetchval("select 1 from teams where id=$1::uuid", tid_use)
            if not team_exists:
                raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Team not found")
            # Admins may assign roles to teams they are not members of.
            is_member = await _is_active_team_member(conn, tid_use, user_id)
            if not is_member and not await _user_is_admin(conn, user_id):
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail="Must be an active member of the selected team",
                )
            tid_final = tid_use
            team_display_name = str(
                await conn.fetchval("select name from teams where id=$1::uuid", tid_final) or ""
            )
        await conn.execute(
            """
            update ansible_roles
            set
                name=$1,
                source_type=$2,
                source_ref=$3,
                category_id=$4::uuid,
                content=$5::jsonb,
                visibility=$6,
                role_tags=$7::text[],
                os_families=$8::text[],
                team_id=$9::uuid
            where id=$10::uuid
            """,
            name,
            payload.source_type,
            payload.source_ref,
            payload.category_id,
            json.dumps({"description": payload.description}),
            vis_payload,
            tags_norm,
            os_norm,
            tid_final,  # NULLs the team link when visibility is not "team"
            effective_id,
        )
        return {
            "status": "updated",
            "id": effective_id,
            "name": name,
            "forked": forked,
            "visibility": vis_payload,
            "team_id": tid_final or "",
            "team_name": team_display_name if vis_payload == "team" else "",
        }
|
|
|
|
|
|
@router.put("/roles/{role_id}/files")
async def replace_role_files(
    role_id: str,
    payload: RoleFilesReplaceRequest,
    user_id: str = Depends(get_current_user_id),
) -> dict[str, Any]:
    """Replace a role's entire file set (delete-then-insert semantics).

    All paths are validated before the destructive delete; an empty file list
    resets the role to the default scaffold instead of leaving it file-less.
    """
    pool = get_pool()
    async with pool.acquire() as conn:
        effective_id, forked = await _resolve_role_for_mutation(conn, role_id, user_id)
        normalized: list[tuple[str, str]] = []
        for item in payload.files:
            # Presumably rejects traversal / absolute paths -- see the helper.
            path = _validate_role_file_path(item.path)
            normalized.append((path, item.content))
        await conn.execute("delete from role_files where role_id=$1::uuid", effective_id)
        await _upsert_role_files(conn, effective_id, normalized if normalized else list(DEFAULT_ROLE_SCAFFOLD_FILES))
        return {"status": "updated", "role_id": effective_id, "forked": forked}
|
|
|
|
|
|
@router.delete("/roles/{role_id}")
async def delete_role(role_id: str, user_id: str = Depends(get_current_user_id)) -> dict[str, str]:
    """Delete a role after a visibility-dependent permission check.

    public   -> admin only
    team     -> admin, owner, or active member of the role's team
    personal -> owner or admin
    The role id is removed from every playbook's role_ids before deletion.
    """
    pool = get_pool()
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            """
            select
                owner_id::text,
                coalesce(visibility, 'personal') as visibility,
                team_id::text as team_id
            from ansible_roles where id=$1::uuid
            """,
            role_id,
        )
        if not row:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found")
        is_admin = await _user_is_admin(conn, user_id)
        vis = str(row["visibility"])
        owner_id = str(row["owner_id"])
        if vis == "public":
            if not is_admin:
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail="Only an administrator can delete a public role",
                )
        elif vis == "team" and row["team_id"]:
            if not (
                is_admin
                or owner_id == user_id
                or await _is_active_team_member(conn, str(row["team_id"]), user_id)
            ):
                raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Role delete denied")
        elif owner_id != user_id and not is_admin:
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Role delete denied")
        # Detach the role from playbooks first so no dangling ids remain.
        await conn.execute(
            "update playbooks set role_ids = array_remove(role_ids, $1::uuid) where $1::uuid = any(role_ids)",
            role_id,
        )
        await conn.execute("delete from ansible_roles where id=$1::uuid", role_id)
        return {"status": "deleted", "id": role_id}
|
|
|
|
|
|
@router.get("/roles")
async def list_roles(
    user_id: str = Depends(get_current_user_id),
    team_id: str | None = Query(None, description="List Ansible roles for this team (members only)."),
) -> dict[str, list[dict]]:
    """List roles visible to the caller.

    Without ``team_id``: public roles plus the caller's own. With ``team_id``:
    the team's 'team'-visibility roles, for active members only (403
    otherwise). Both variants join category names and per-role file counts.
    """
    pool = get_pool()
    async with pool.acquire() as conn:
        await _ensure_default_role_categories(conn)
        # Presumably backfills role_files rows for this user's legacy roles --
        # confirm against the helper.
        await _backfill_role_files_for_user(conn, user_id)
        if team_id:
            if not await _is_active_team_member(conn, team_id, user_id):
                raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not a member of this team")
            rows = await conn.fetch(
                """
                select
                    r.id::text as id,
                    r.name,
                    r.source_type,
                    r.source_ref,
                    r.created_at,
                    r.category_id::text as category_id,
                    c.name as category_name,
                    r.content,
                    coalesce(r.visibility, 'personal') as visibility,
                    r.owner_id::text as owner_id,
                    r.team_id::text as team_id,
                    coalesce(fc.cnt, 0)::int as file_count,
                    coalesce(r.role_tags, array[]::text[]) as role_tags,
                    coalesce(r.os_families, array[]::text[]) as os_families
                from ansible_roles r
                left join role_categories c on c.id = r.category_id
                left join (
                    select role_id, count(*)::int as cnt
                    from role_files
                    group by role_id
                ) fc on fc.role_id = r.id
                where r.team_id = $1::uuid and coalesce(r.visibility, 'personal') = 'team'
                order by coalesce(c.name, 'Uncategorized'), r.name
                """,
                team_id,
            )
        else:
            rows = await conn.fetch(
                """
                select
                    r.id::text as id,
                    r.name,
                    r.source_type,
                    r.source_ref,
                    r.created_at,
                    r.category_id::text as category_id,
                    c.name as category_name,
                    r.content,
                    coalesce(r.visibility, 'personal') as visibility,
                    r.owner_id::text as owner_id,
                    r.team_id::text as team_id,
                    coalesce(fc.cnt, 0)::int as file_count,
                    coalesce(r.role_tags, array[]::text[]) as role_tags,
                    coalesce(r.os_families, array[]::text[]) as os_families
                from ansible_roles r
                left join role_categories c on c.id = r.category_id
                left join (
                    select role_id, count(*)::int as cnt
                    from role_files
                    group by role_id
                ) fc on fc.role_id = r.id
                where coalesce(r.visibility, 'personal') = 'public' or r.owner_id = $1::uuid
                order by coalesce(c.name, 'Uncategorized'), r.name
                """,
                user_id,
            )
        items = [
            {
                "id": row["id"],
                "name": row["name"],
                "source_type": row["source_type"],
                "source_ref": row["source_ref"],
                "created_at": row["created_at"].isoformat() if row["created_at"] else "",
                "category_id": row["category_id"],
                "category_name": row["category_name"] or "Uncategorized",
                "description": str(_parse_json_object(row["content"]).get("description") or ""),
                "file_count": int(row["file_count"] or 0),
                "visibility": str(row["visibility"]),
                "is_owner": row["owner_id"] == user_id,
                "tags": [str(t) for t in (row["role_tags"] or [])],
                "os_families": [str(t) for t in (row["os_families"] or [])],
            }
            for row in rows
        ]
        return {"items": items}
|
|
|
|
|
|
@router.get("/playbooks")
async def list_playbooks(user_id: str = Depends(get_current_user_id)) -> dict[str, list[dict]]:
    """Return every playbook owned by the current user, ordered by name."""
    pool = get_pool()
    async with pool.acquire() as conn:
        records = await conn.fetch(
            """
            select id::text as id, name, description, owner_id::text as owner_id
            from playbooks
            where owner_id=$1::uuid
            order by name
            """,
            user_id,
        )
    items: list[dict] = []
    for rec in records:
        items.append(
            {
                "id": rec["id"],
                "name": rec["name"],
                "description": rec["description"],
                "owner_id": rec["owner_id"],
            }
        )
    return {"items": items}
|
|
|
|
|
|
@router.post("/playbooks/{playbook_id}/roles")
async def add_role_to_playbook(
    playbook_id: str,
    payload: AddRoleToPlaybookRequest,
    user_id: str = Depends(get_current_user_id),
) -> dict[str, str]:
    """Append a role to a playbook's role_ids array (idempotent).

    Only the playbook owner may modify it; the role must be owned by the
    caller or be public.
    """
    pool = get_pool()
    async with pool.acquire() as conn:
        owner_id = await conn.fetchval(
            "select owner_id::text from playbooks where id=$1::uuid",
            playbook_id,
        )
        if not owner_id:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Playbook not found")
        if owner_id != user_id:
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Playbook access denied")
        # NOTE(review): NULL visibility defaults to 'public' here, while
        # delete_role/list_roles default it to 'personal' -- confirm which is
        # intended; 'public' is the more permissive interpretation.
        role_row = await conn.fetchrow(
            "select owner_id::text, coalesce(visibility, 'public') as visibility from ansible_roles where id=$1::uuid",
            payload.role_id,
        )
        if not role_row:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found")
        if role_row["owner_id"] != user_id and role_row["visibility"] != "public":
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Role access denied")
        # array_agg(distinct ...) dedupes, so re-adding the same role is a no-op.
        await conn.execute(
            """
            update playbooks
            set role_ids = (
                select array_agg(distinct role_id)
                from unnest(role_ids || $1::uuid) as role_id
            )
            where id=$2::uuid
            """,
            payload.role_id,
            playbook_id,
        )
        return {"status": "added", "playbook_id": playbook_id, "role_id": payload.role_id}
|
|
|
|
|
|
@router.get("/roles/categories")
async def list_role_categories(user_id: str = Depends(get_current_user_id)) -> dict[str, list[dict]]:
    """List all role categories along with how many roles each one contains."""
    pool = get_pool()
    async with pool.acquire() as conn:
        # Seed the built-in categories before listing.
        await _ensure_default_role_categories(conn)
        records = await conn.fetch(
            """
            select
                c.id::text as id,
                c.name,
                c.created_at,
                count(r.id)::int as role_count
            from role_categories c
            left join ansible_roles r on r.category_id = c.id
            group by c.id
            order by c.name
            """
        )
    items = []
    for rec in records:
        created = rec["created_at"].isoformat() if rec["created_at"] else ""
        items.append(
            {
                "id": rec["id"],
                "name": rec["name"],
                "role_count": rec["role_count"],
                "created_at": created,
            }
        )
    return {"items": items}
|
|
|
|
|
|
@router.post("/roles/categories")
async def create_role_category(
    payload: RoleCategoryCreate,
    _: str = Depends(get_current_admin_user_id),
) -> dict[str, str]:
    """Create a role category (admin only); names are unique case-insensitively."""
    category_name = payload.name.strip()
    if not category_name:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Category name is required")
    pool = get_pool()
    async with pool.acquire() as conn:
        duplicate = await conn.fetchval(
            "select id::text from role_categories where lower(name)=lower($1)",
            category_name,
        )
        if duplicate:
            raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Category already exists")
        # Admin-created categories are global: owner_id stays NULL.
        new_id = await conn.fetchval(
            """
            insert into role_categories (owner_id, name)
            values (null, $1)
            returning id::text
            """,
            category_name,
        )
    return {"id": new_id}
|
|
|
|
|
|
@router.delete("/roles/categories/{category_id}")
async def delete_role_category(category_id: str, _: str = Depends(get_current_admin_user_id)) -> dict[str, str]:
    """Delete a role category (admin only), detaching any roles that used it."""
    pool = get_pool()
    async with pool.acquire() as conn:
        found = await conn.fetchval("select 1 from role_categories where id=$1::uuid", category_id)
        if not found:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Category not found")
        # Null out references first so the delete cannot leave dangling ids.
        await conn.execute("update ansible_roles set category_id=null where category_id=$1::uuid", category_id)
        await conn.execute("delete from role_categories where id=$1::uuid", category_id)
    return {"status": "deleted", "id": category_id}
|
|
|
|
|
|
@router.patch("/roles/categories/{category_id}")
async def update_role_category(
    category_id: str,
    payload: RoleCategoryCreate,
    _: str = Depends(get_current_admin_user_id),
) -> dict[str, str]:
    """Rename a role category (admin only); the new name must stay unique."""
    category_name = payload.name.strip()
    if not category_name:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Category name is required")
    pool = get_pool()
    async with pool.acquire() as conn:
        found = await conn.fetchval("select 1 from role_categories where id=$1::uuid", category_id)
        if not found:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Category not found")
        # Case-insensitive uniqueness check, excluding the row being renamed.
        clash = await conn.fetchval(
            "select id::text from role_categories where lower(name)=lower($1) and id<>$2::uuid",
            category_name,
            category_id,
        )
        if clash:
            raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Category already exists")
        await conn.execute("update role_categories set name=$1 where id=$2::uuid", category_name, category_id)
    return {"status": "updated", "id": category_id}
|
|
|
|
|
|
@router.get("/admin/config")
async def get_admin_config(_: str = Depends(get_current_admin_user_id)) -> dict[str, dict]:
    """Return effective project settings: env defaults overlaid with saved overrides."""
    settings = get_settings()
    pool = get_pool()
    async with pool.acquire() as conn:
        row = await conn.fetchrow("select value from app_config where key='project'")
    stored = _app_config_row_value_dict(row)
    # Only admin-editable keys may override the environment defaults.
    overrides = {k: v for k, v in stored.items() if k in _PROJECT_ADMIN_KEYS}
    effective = {
        "app_name": settings.app_name,
        "app_tagline": settings.app_tagline,
        "runner_image": settings.app_runner_image,
        "docker_network": settings.app_docker_network,
        "runner_timeout_sec": settings.app_runner_timeout_sec,
        "runner_poll_interval_sec": settings.app_runner_poll_interval_sec,
        "runner_max_poll_errors": settings.app_runner_max_poll_errors,
    }
    effective.update(overrides)
    return {"items": effective}
|
|
|
|
|
|
@router.put("/admin/config")
async def update_admin_config(payload: dict, _: str = Depends(get_current_admin_user_id)) -> dict[str, str]:
    """Merge submitted project settings into the stored overrides.

    Only keys in _PROJECT_ADMIN_KEYS are accepted: runner timing keys are
    coerced to int (400 on failure), branding/image keys to str. Keys absent
    from the payload keep their previously stored values.
    """
    if not isinstance(payload, dict):
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Config payload must be an object")
    merged: dict[str, Any] = {}
    for key in _PROJECT_ADMIN_KEYS:
        if key not in payload:
            continue
        val = payload[key]
        if key in ("runner_timeout_sec", "runner_poll_interval_sec", "runner_max_poll_errors"):
            try:
                merged[key] = int(val)
            except (TypeError, ValueError):
                # from None: the int() traceback adds nothing for the client.
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=f"{key} must be an integer",
                ) from None
        elif key in ("app_name", "app_tagline", "runner_image", "docker_network"):
            merged[key] = str(val)
        else:
            merged[key] = val
    pool = get_pool()
    async with pool.acquire() as conn:
        row = await conn.fetchrow("select value from app_config where key='project'")
        prev = _app_config_row_value_dict(row)
        # Drop stale stored keys that are no longer admin-configurable.
        prev_proj = {k: v for k, v in prev.items() if k in _PROJECT_ADMIN_KEYS}
        out = {**prev_proj, **merged}
        await conn.execute(
            """
            insert into app_config (key, value, updated_at)
            values ('project', $1::jsonb, now())
            on conflict (key)
            do update set value=excluded.value, updated_at=now()
            """,
            json.dumps(out),
        )
    return {"status": "updated"}
|
|
|
|
|
|
@router.get("/admin/os-registry-config")
async def get_admin_os_registry_config(_: str = Depends(get_current_admin_user_id)) -> dict[str, Any]:
    """Return the stored OS-image registry config, or Docker Hub defaults if unset."""
    pool = get_pool()
    async with pool.acquire() as conn:
        row = await conn.fetchrow("select value from app_config where key = $1", OS_REGISTRY_CONFIG_KEY)
    stored = _app_config_row_value_dict(row)
    if stored:
        return parse_stored_os_registry(stored)
    # Nothing saved yet: fall back to the Hub repository from settings.
    default_repo = get_settings().roleforge_os_docker_hub_repository.strip() or "inecs/roleforge"
    return {
        "provider": "hub",
        "registry_host": "",
        "repository_path": default_repo,
        "username": "",
        "password_set": False,
    }
|
|
|
|
|
|
@router.put("/admin/os-registry-config")
async def put_admin_os_registry_config(payload: dict, _: str = Depends(get_current_admin_user_id)) -> dict[str, str]:
    """Save the OS-image registry configuration (admin only).

    provider must be hub/harbor/nexus; harbor and nexus additionally require
    registry_host and repository_path. A blank password keeps the previously
    stored encrypted secret; a non-empty one replaces it.
    """
    if not isinstance(payload, dict):
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Payload must be an object")
    provider = str(payload.get("provider") or "hub").strip().lower()
    if provider not in ("hub", "harbor", "nexus"):
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid provider")
    registry_host = str(payload.get("registry_host") or "").strip()
    repository_path = str(payload.get("repository_path") or "").strip()
    username = str(payload.get("username") or "").strip()
    password_in = payload.get("password")
    password_plain = str(password_in).strip() if password_in is not None else ""

    if provider in ("harbor", "nexus"):
        if not registry_host or not repository_path:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="registry_host and repository_path are required for Harbor and Nexus.",
            )
    if provider == "hub" and not repository_path:
        # Fall back to the default Docker Hub repository from settings.
        repository_path = get_settings().roleforge_os_docker_hub_repository.strip() or "inecs/roleforge"

    pool = get_pool()
    async with pool.acquire() as conn:
        row = await conn.fetchrow("select value from app_config where key = $1", OS_REGISTRY_CONFIG_KEY)
        prev = _app_config_row_value_dict(row)
        new_doc: dict[str, Any] = {
            "provider": provider,
            "registry_host": registry_host,
            "repository_path": repository_path,
            "username": username,
        }
        if password_plain:
            # Only the ciphertext is ever persisted.
            new_doc["password_ciphertext"] = encrypt_registry_secret(password_plain)
        elif isinstance(prev.get("password_ciphertext"), str) and prev["password_ciphertext"].strip():
            # No new password supplied: carry the existing secret forward.
            new_doc["password_ciphertext"] = prev["password_ciphertext"]

        await conn.execute(
            """
            insert into app_config (key, value, updated_at)
            values ($1, $2::jsonb, now())
            on conflict (key)
            do update set value=excluded.value, updated_at=now()
            """,
            OS_REGISTRY_CONFIG_KEY,
            json.dumps(new_doc),
        )
    return {"status": "updated"}
|
|
|
|
|
|
@router.get("/admin/yaml-lint-config")
async def get_admin_yaml_lint_config(_: str = Depends(get_current_admin_user_id)) -> dict[str, Any]:
    """Return the effective yamllint rule set plus rule metadata for the admin UI."""
    pool = get_pool()
    async with pool.acquire() as conn:
        row = await conn.fetchrow("select value from app_config where key = $1", "yamllint")
    stored = _app_config_row_value_dict(row)
    # Overlay saved overrides on the built-in defaults.
    effective = merge_yamllint_saved(stored)
    return {"rules": serialize_rules_for_api(effective), "meta": list(RULE_META)}
|
|
|
|
|
|
@router.put("/admin/yaml-lint-config")
async def put_admin_yaml_lint_config(payload: dict, _: str = Depends(get_current_admin_user_id)) -> dict[str, str]:
    """Persist yamllint rule overrides (admin only) and refresh the process cache."""
    try:
        normalized = validate_put_payload(payload)
    except ValueError as exc:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
    pool = get_pool()
    async with pool.acquire() as conn:
        await conn.execute(
            """
            insert into app_config (key, value, updated_at)
            values ('yamllint', $1::jsonb, now())
            on conflict (key)
            do update set value=excluded.value, updated_at=now()
            """,
            json.dumps(normalized),
        )
    # Invalidate first, then eagerly rebuild so the next lint request is warm.
    invalidate_yamllint_config_cache()
    await refresh_yamllint_config_cache()
    return {"status": "updated"}
|
|
|
|
|
|
@router.get("/admin/json-lint-config")
async def get_admin_json_lint_config(_: str = Depends(get_current_admin_user_id)) -> dict[str, Any]:
    """Return the effective JSON-lint rule set plus rule metadata for the admin UI."""
    pool = get_pool()
    async with pool.acquire() as conn:
        row = await conn.fetchrow("select value from app_config where key = $1", "jsonlint")
    stored = _app_config_row_value_dict(row)
    # Overlay saved overrides on the built-in defaults.
    effective = merge_jsonlint_saved(stored)
    return {"rules": serialize_jsonlint_rules_for_api(effective), "meta": list(JSONLINT_RULE_META)}
|
|
|
|
|
|
@router.put("/admin/json-lint-config")
async def put_admin_json_lint_config(payload: dict, _: str = Depends(get_current_admin_user_id)) -> dict[str, str]:
    """Persist JSON-lint rule overrides (admin only) and refresh the process cache."""
    try:
        normalized = validate_jsonlint_put_payload(payload)
    except ValueError as exc:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
    pool = get_pool()
    async with pool.acquire() as conn:
        await conn.execute(
            """
            insert into app_config (key, value, updated_at)
            values ('jsonlint', $1::jsonb, now())
            on conflict (key)
            do update set value=excluded.value, updated_at=now()
            """,
            json.dumps(normalized),
        )
    # Invalidate first, then eagerly rebuild so the next lint request is warm.
    invalidate_jsonlint_config_cache()
    await refresh_jsonlint_config_cache()
    return {"status": "updated"}
|
|
|
|
|
|
@router.post("/inventories")
async def create_inventory(
    payload: InventoryCreate, user_id: str = Depends(get_current_user_id)
) -> dict[str, str]:
    """Create an inventory owned by the current user and return its new id."""
    pool = get_pool()
    async with pool.acquire() as conn:
        inventory_id = await conn.fetchval(
            """
            insert into inventories (owner_id, name, inventory_text)
            values ($1::uuid, $2, $3)
            returning id::text
            """,
            # Explicit ::uuid cast for consistency with the other inserts in
            # this module (e.g. the playbooks insert in import_playbook_yaml).
            user_id,
            payload.name,
            payload.inventory_text,
        )
    return {"id": inventory_id}
|
|
|
|
|
|
@router.post("/playbooks")
async def create_playbook(
    payload: PlaybookCreate, user_id: str = Depends(get_current_user_id)
) -> dict[str, str]:
    """Create a playbook referencing one of the caller's inventories.

    Raises 403 when the inventory does not exist or belongs to someone else
    (a missing inventory is deliberately indistinguishable from a foreign one).
    """
    pool = get_pool()
    async with pool.acquire() as conn:
        inventory_owner = await conn.fetchval(
            "select owner_id::text from inventories where id=$1::uuid",
            payload.inventory_id,
        )
        # None (inventory not found) also fails this equality check -> 403.
        if inventory_owner != user_id:
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Inventory access denied")
        # $1::uuid cast added for consistency with the identical insert in
        # import_playbook_yaml.
        playbook_id = await conn.fetchval(
            """
            insert into playbooks (owner_id, name, description, playbook_yaml, inventory_id, role_ids, is_shared)
            values ($1::uuid, $2, $3, $4, $5::uuid, $6::uuid[], $7)
            returning id::text
            """,
            user_id,
            payload.name,
            payload.description,
            payload.playbook_yaml,
            payload.inventory_id,
            payload.role_ids,
            payload.is_shared,
        )
    return {"id": playbook_id}
|
|
|
|
|
|
@router.post("/playbooks/import-yaml")
async def import_playbook_yaml(
    payload: PlaybookYamlImportRequest, user_id: str = Depends(get_current_user_id)
) -> dict[str, str]:
    """Create a playbook from raw YAML after validating its top-level shape.

    The YAML must parse (safe_load) to a list of plays; individual plays are
    not inspected. The original YAML text is stored verbatim.
    """
    try:
        parsed = yaml.safe_load(payload.playbook_yaml)
    except yaml.YAMLError as exc:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid YAML") from exc
    if not isinstance(parsed, list):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Playbook YAML must be a list of plays",
        )
    pool = get_pool()
    async with pool.acquire() as conn:
        inventory_owner = await conn.fetchval(
            "select owner_id::text from inventories where id=$1::uuid",
            payload.inventory_id,
        )
        # None (missing inventory) also fails this check -> 403.
        if inventory_owner != user_id:
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Inventory access denied")
        playbook_id = await conn.fetchval(
            """
            insert into playbooks (owner_id, name, description, playbook_yaml, inventory_id, role_ids, is_shared)
            values ($1::uuid, $2, $3, $4, $5::uuid, $6::uuid[], $7)
            returning id::text
            """,
            user_id,
            payload.name,
            payload.description,
            payload.playbook_yaml,
            payload.inventory_id,
            payload.role_ids,
            payload.is_shared,
        )
        return {"id": playbook_id}
|
|
|
|
|
|
@router.get("/playbooks/{playbook_id}/export-yaml")
async def export_playbook_yaml(playbook_id: str, user_id: str = Depends(get_current_user_id)) -> Response:
    """Return a playbook's raw YAML; owners and shared playbooks only."""
    pool = get_pool()
    async with pool.acquire() as conn:
        record = await conn.fetchrow(
            """
            select owner_id::text as owner_id, is_shared, playbook_yaml
            from playbooks
            where id=$1::uuid
            """,
            playbook_id,
        )
    if record is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Playbook not found")
    allowed = record["owner_id"] == user_id or record["is_shared"]
    if not allowed:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Playbook access denied")
    return Response(content=record["playbook_yaml"], media_type="application/x-yaml")
|
|
|
|
|
|
@router.post("/jobs/launch")
async def launch_job(
    payload: JobLaunchRequest, user_id: str = Depends(get_current_user_id)
) -> dict[str, str]:
    """Queue a job, start an ephemeral runner and kick off background monitoring.

    Flow: insert a 'queued' jobs row -> launch a worker bound to queue
    ``job.<id>`` -> submit the playbook run to the runner -> mark the job
    'running' and spawn a monitor task.
    """
    pool = get_pool()
    async with pool.acquire() as conn:
        playbook = await conn.fetchrow(
            """
            select p.id::text as id, p.owner_id::text as owner_id, p.is_shared,
                   p.playbook_yaml, i.inventory_text
            from playbooks p
            join inventories i on i.id = p.inventory_id
            where p.id=$1::uuid
            """,
            payload.playbook_id,
        )
        if not playbook:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Playbook not found")
        if playbook["owner_id"] != user_id and not playbook["is_shared"]:
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Playbook access denied")
        job_id = await conn.fetchval(
            """
            insert into jobs (owner_id, playbook_id, status, extra_vars, runtime_mode)
            values ($1::uuid, $2::uuid, 'queued', $3::jsonb, $4)
            returning id::text
            """,
            user_id,
            payload.playbook_id,
            json.dumps(payload.extra_vars),
            payload.runtime_mode,
        )
    queue_name = f"job.{job_id}"
    try:
        worker_container_id, worker_container_name, runner_url = await launch_ephemeral_worker(
            queue_name,
            payload.runtime_mode,
        )
    except RuntimeError as exc:
        # Worker failed to start: roll back the queued job row.
        async with pool.acquire() as conn:
            await conn.execute("delete from jobs where id=$1::uuid", job_id)
        raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=str(exc)) from exc
    # NOTE(review): if create_playbook_run raises here, neither the worker
    # container nor the 'queued' job row is cleaned up in this handler --
    # confirm whether a reaper/monitor covers that case.
    runner_run_id = await create_playbook_run(
        runner_url,
        playbook_yaml=playbook["playbook_yaml"],
        inventory_text=playbook["inventory_text"],
        extra_vars=payload.extra_vars,
    )
    async with pool.acquire() as conn:
        await conn.execute(
            """
            update jobs
            set status='running',
                started_at=now(),
                runner_url=$1,
                runner_run_id=$2,
                runner_container_name=$3,
                runner_last_heartbeat=now()
            where id=$4::uuid
            """,
            runner_url,
            runner_run_id,
            worker_container_name,
            job_id,
        )
    # Fire-and-forget monitor that polls the runner and finalizes the job.
    asyncio.create_task(
        _monitor_job(
            job_id,
            runner_url,
            runner_run_id,
            worker_container_name,
            payload.runtime_mode,
        )
    )
    return {
        "job_id": job_id,
        "runner_run_id": runner_run_id,
        "worker_container_id": worker_container_id,
    }
|
|
|
|
|
|
@router.post("/tests/launch")
async def launch_test(
    payload: TestLaunchRequest, user_id: str = Depends(get_current_user_id)
) -> dict[str, str]:
    """Launch a Molecule test run for a playbook and/or a role.

    At least one of playbook_id / role_id is required; each supplied id is
    access-checked. When both are given the playbook run takes precedence.
    Flow mirrors launch_job: insert a 'queued' test_runs row -> start an
    ephemeral worker -> submit the Molecule run -> mark 'running' and spawn
    a monitor task.
    """
    if not payload.playbook_id and not payload.role_id:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Either playbook_id or role_id must be provided",
        )
    pool = get_pool()
    async with pool.acquire() as conn:
        if payload.playbook_id:
            playbook = await conn.fetchrow(
                "select owner_id::text as owner_id, is_shared from playbooks where id=$1::uuid",
                payload.playbook_id,
            )
            if not playbook:
                raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Playbook not found")
            if playbook["owner_id"] != user_id and not playbook["is_shared"]:
                raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Playbook access denied")
        if payload.role_id:
            # NOTE(review): NULL visibility defaults to 'public' here but to
            # 'personal' in delete_role/list_roles -- confirm the intent.
            role = await conn.fetchrow(
                "select owner_id::text as owner_id, coalesce(visibility, 'public') as visibility from ansible_roles where id=$1::uuid",
                payload.role_id,
            )
            if not role:
                raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Role not found")
            if role["owner_id"] != user_id and role["visibility"] != "public":
                raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Role access denied")

        test_id = await conn.fetchval(
            """
            insert into test_runs (owner_id, playbook_id, role_id, status, runtime_mode, hosts_blueprint, extra_vars)
            values ($1::uuid, $2::uuid, $3::uuid, 'queued', $4, $5::jsonb, $6::jsonb)
            returning id::text
            """,
            user_id,
            payload.playbook_id,
            payload.role_id,
            payload.runtime_mode,
            json.dumps([host.model_dump() for host in payload.hosts]),
            json.dumps(payload.extra_vars),
        )

    queue_name = f"test.{test_id}"
    try:
        worker_container_id, worker_container_name, runner_url = await launch_ephemeral_worker(
            queue_name,
            payload.runtime_mode,
        )
    except RuntimeError as exc:
        # Worker failed to start: roll back the queued test_runs row.
        async with pool.acquire() as conn:
            await conn.execute("delete from test_runs where id=$1::uuid", test_id)
        raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=str(exc)) from exc

    if payload.playbook_id:
        # Playbook test path (takes precedence when both ids are supplied).
        async with pool.acquire() as conn:
            row = await conn.fetchrow(
                "select playbook_yaml from playbooks where id=$1::uuid",
                payload.playbook_id,
            )
        runner_run_id = await create_molecule_playbook_run(
            runner_url,
            playbook_yaml=row["playbook_yaml"],
            hosts=[host.model_dump() for host in payload.hosts],
            extra_vars=payload.extra_vars,
        )
    else:
        # Role test path: prefer the stored tasks/main.yml file, then the
        # legacy content blob, then a trivial ping task.
        async with pool.acquire() as conn:
            row = await conn.fetchrow(
                "select name, content from ansible_roles where id=$1::uuid",
                payload.role_id,
            )
            file_tasks = await conn.fetchval(
                "select content from role_files where role_id=$1::uuid and path='tasks/main.yml'",
                payload.role_id,
            )
        role_content = _parse_json_object(row["content"])
        runner_run_id = await create_molecule_role_run(
            runner_url,
            role_name=row["name"],
            role_tasks_yaml=str(
                file_tasks
                or role_content.get("tasks_yaml")
                or "- name: ping\n ansible.builtin.ping:\n"
            ),
            hosts=[host.model_dump() for host in payload.hosts],
            extra_vars=payload.extra_vars,
        )

    async with pool.acquire() as conn:
        await conn.execute(
            """
            update test_runs
            set status='running',
                started_at=now(),
                runner_url=$1,
                runner_run_id=$2,
                runner_container_name=$3,
                runner_last_heartbeat=now()
            where id=$4::uuid
            """,
            runner_url,
            runner_run_id,
            worker_container_name,
            test_id,
        )
    # Fire-and-forget monitor that polls the runner and finalizes the run.
    asyncio.create_task(
        _monitor_test(
            test_id,
            runner_url,
            runner_run_id,
            worker_container_name,
            payload.runtime_mode,
        )
    )
    return {
        "test_id": test_id,
        "runner_run_id": runner_run_id,
        "worker_container_id": worker_container_id,
    }
|
|
|
|
|
|
@router.get("/tests/os-options")
async def list_test_os_options(_: str = Depends(get_current_user_id)) -> dict[str, list[dict[str, str]]]:
    """Expose the OS images discovered on disk as selectable test targets."""
    options = _scan_os_options()
    return {"items": options}
|
|
|
|
|
|
@router.get("/admin/os-images")
async def list_admin_os_images(_: str = Depends(get_current_admin_user_id)) -> dict[str, Any]:
    """Describe buildable OS images plus registry defaults for the admin UI."""
    settings = get_settings()
    hub_repository = (settings.roleforge_os_docker_hub_repository or "").strip()
    env_registry = _normalize_os_registry_prefix(settings.roleforge_os_image_registry or "")
    # A push target exists if either the Hub repo or a registry prefix is set.
    push_configured = bool(hub_repository or env_registry)
    help_urls = {
        "hub": "https://hub.docker.com/",
        "harbor": "https://goharbor.io/docs/",
        "nexus": "https://help.sonatype.com/repomanager3/formats/docker-registry/docker-registry-repositories",
    }
    return {
        "items": _scan_os_options(),
        "os_image_registry": _os_image_push_display(),
        "push_requires_docker_login": push_configured,
        "registry_defaults": {
            "provider": "hub",
            "repository_path": hub_repository,
            "registry_host": "",
        },
        "registry_help_urls": help_urls,
    }
|
|
|
|
|
|
@router.post("/admin/os-images/build-all")
async def build_all_admin_os_images(
    body: dict[str, Any] = Body(default_factory=dict),
    _: str = Depends(get_current_admin_user_id),
) -> dict[str, Any]:
    """Build (and optionally push) every discovered OS image sequentially.

    Stops at the first image whose build does not succeed and returns
    ``status="failed"`` with the partial results; ``logs`` always covers
    everything attempted up to that point.
    """
    registry_payload = _registry_payload_slice(body)
    push_cfg = _resolve_os_push_config(registry_payload)
    _require_registry_credentials(
        push_cfg.kind,
        body.get("docker_login_username"),
        body.get("docker_login_password"),
    )
    login_user = str(body.get("docker_login_username") or "").strip()
    login_pass = str(body.get("docker_login_password") or "")
    no_cache = _request_bool_flag(body)
    options = _scan_os_options()
    if not options:
        return {"status": "noop", "items": [], "logs": ["No dockerfiles found."]}
    all_logs: list[str] = []
    summaries: list[dict[str, str]] = []
    for option in options:
        outcome = await _build_os_image(
            dockerfile_rel=str(option["dockerfile"]),
            image_tag=str(option["image"]),
            platform=str(option.get("platform") or "").strip(),
            build_platforms=str(option.get("build_platforms") or "").strip(),
            docker_login_username=login_user,
            docker_login_password=login_pass,
            registry_payload=registry_payload,
            no_cache=no_cache,
        )
        all_logs.extend(outcome.get("logs") or [])
        # Keep per-image summaries small: logs are aggregated separately.
        summaries.append({key: value for key, value in outcome.items() if key != "logs"})
        if outcome.get("status") != "success":
            return {"status": "failed", "items": summaries, "logs": all_logs}
    return {"status": "success", "items": summaries, "logs": all_logs}
|
|
|
|
|
|
@router.post("/admin/os-images/build-one")
async def build_one_admin_os_image(
    payload: dict[str, Any],
    _: str = Depends(get_current_admin_user_id),
) -> dict[str, Any]:
    """Build (and optionally push) a single OS image.

    ``platform`` / ``build_platforms`` missing from the payload fall back
    first to the scanned catalogue entry for the dockerfile, then to
    inferred defaults.

    Raises:
        HTTPException: 400 when ``dockerfile`` or ``image`` is missing,
            or when registry credentials fail validation.
    """
    dockerfile_rel = str(payload.get("dockerfile") or "").strip()
    image_tag = str(payload.get("image") or "").strip()
    # Fail fast: validate required fields before doing any catalogue
    # resolution / platform-inference work (the original validated last,
    # wasting a filesystem scan on requests missing `image`).
    if not dockerfile_rel or not image_tag:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="dockerfile and image are required")
    platform = str(payload.get("platform") or "").strip()
    build_platforms = str(payload.get("build_platforms") or "").strip()
    if not platform or not build_platforms:
        # Prefer the scanned catalogue entry for this dockerfile…
        resolved = _resolve_os_option_by_dockerfile(dockerfile_rel)
        if resolved:
            platform = platform or str(resolved.get("platform") or "").strip()
            build_platforms = build_platforms or str(resolved.get("build_platforms") or "").strip()
        # …then fall back to inferred defaults for anything still unset.
        if not platform or not build_platforms:
            inferred_platform, inferred_matrix = _infer_platform_defaults(dockerfile_rel)
            platform = platform or inferred_platform
            build_platforms = build_platforms or inferred_matrix
    registry_payload = _registry_payload_slice(payload)
    push_cfg = _resolve_os_push_config(registry_payload)
    _require_registry_credentials(
        push_cfg.kind,
        payload.get("docker_login_username"),
        payload.get("docker_login_password"),
    )
    return await _build_os_image(
        dockerfile_rel=dockerfile_rel,
        image_tag=image_tag,
        platform=platform,
        build_platforms=build_platforms,
        docker_login_username=str(payload.get("docker_login_username") or "").strip(),
        docker_login_password=str(payload.get("docker_login_password") or ""),
        registry_payload=registry_payload,
        no_cache=_request_bool_flag(payload),
    )
|
|
|
|
|
|
@router.post("/admin/os-images/build-one-stream")
async def build_one_admin_os_image_stream(
    payload: dict[str, Any],
    _: str = Depends(get_current_admin_user_id),
) -> StreamingResponse:
    """Build a single OS image and stream build output as plain text.

    The stream interleaves raw tool output with machine-readable sentinel
    lines the UI parses:

    - ``__ROLEFORGE_BUILD_STATUS__:<success|failed>:<code>`` — terminal status.
    - ``__ROLEFORGE_MANUAL_PULL__:<json>`` — shell commands for manually
      pulling/tagging the pushed image.

    Errors after streaming has begun cannot change the HTTP status, so they
    are reported only via the ``failed`` sentinel.
    """
    dockerfile_rel = str(payload.get("dockerfile") or "").strip()
    image_tag = str(payload.get("image") or "").strip()
    platform = str(payload.get("platform") or "").strip()
    build_platforms = [p.strip() for p in str(payload.get("build_platforms") or "").split(",") if p.strip()]
    # Fill in missing platform fields: catalogue entry first, inferred defaults second.
    if dockerfile_rel and (not platform or not build_platforms):
        resolved = _resolve_os_option_by_dockerfile(dockerfile_rel)
        if resolved:
            if not platform:
                platform = str(resolved.get("platform") or "").strip()
            if not build_platforms:
                build_platforms = [
                    p.strip() for p in str(resolved.get("build_platforms") or "").split(",") if p.strip()
                ]
        if not platform or not build_platforms:
            inf_platform, inf_matrix = _infer_platform_defaults(dockerfile_rel)
            if not platform:
                platform = inf_platform
            if not build_platforms:
                build_platforms = [p.strip() for p in inf_matrix.split(",") if p.strip()]
    if not dockerfile_rel or not image_tag:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="dockerfile and image are required")

    try:
        rp = _registry_payload_slice(payload)
        cfg = _resolve_os_push_config(rp)
    except HTTPException as exc:
        # Registry config errors are still reported as a (short) stream so the
        # client-side log viewer handles them uniformly.

        async def _err_stream() -> AsyncIterator[str]:
            yield f"[roleforge] {exc.detail}\n"
            yield "__ROLEFORGE_BUILD_STATUS__:failed:400\n"

        return StreamingResponse(_err_stream(), media_type="text/plain; charset=utf-8")

    repo_root = str(DOCKERFILES_ROOT.parent)
    dockerfile_abs = str(DOCKERFILES_ROOT / dockerfile_rel)
    docker_user = str(payload.get("docker_login_username") or "").strip()
    docker_pass = str(payload.get("docker_login_password") or "")
    no_cache = _request_bool_flag(payload)

    async def _stream() -> Any:
        # Last spawned subprocess — kept so the CancelledError path can reap it.
        proc: asyncio.subprocess.Process | None = None

        try:
            # Preflight: buildx availability and a usable builder instance.
            buildx_mode, buildx_note = await _ensure_docker_buildx_mode()
            yield f"[roleforge] {buildx_note}\n"
            if buildx_mode == "none":
                yield "__ROLEFORGE_BUILD_STATUS__:failed:127\n"
                return
            builder_ok, builder_note = await _ensure_buildx_builder()
            yield f"[roleforge] {builder_note}\n"
            if not builder_ok:
                yield "__ROLEFORGE_BUILD_STATUS__:failed:127\n"
                return
            if no_cache:
                yield "[roleforge] Docker BuildKit cache disabled for this run (--no-cache).\n"
            push_kind, push_target = cfg.kind, cfg.target
            legacy_layout = cfg.legacy_full_image
            login_server = cfg.docker_login_server
            if push_kind == "hub":
                # Docker Hub push: credentials are mandatory; login via stdin
                # so the password never appears on a command line.
                yield f"[roleforge] Docker Hub repository: docker.io/{push_target}\n"
                if not docker_user or not docker_pass:
                    yield "[roleforge] docker_login_username and docker_login_password are required.\n"
                    yield "__ROLEFORGE_BUILD_STATUS__:failed:3\n"
                    return
                yield f"$ docker login --username {docker_user!r} --password-stdin (Docker Hub)\n"
                lg_code, lg_out = await _docker_login_stdin_password(
                    username=docker_user,
                    password=docker_pass,
                    registry_server=None,
                )
                # Only the tail of the login output is echoed to keep the stream short.
                for lg_line in lg_out.decode(errors="replace").splitlines()[-40:]:
                    yield f"{lg_line}\n"
                if lg_code != 0:
                    yield f"__ROLEFORGE_BUILD_STATUS__:failed:{lg_code}\n"
                    return
            elif push_kind == "registry":
                # Custom registry push (e.g. Harbor/Nexus); same stdin-login flow.
                yield (
                    f"[roleforge] Registry push target: {push_target}"
                    + (" (legacy prefix layout)" if legacy_layout else "")
                    + "\n"
                )
                if not docker_user or not docker_pass:
                    yield "[roleforge] docker_login_username and docker_login_password are required.\n"
                    yield "__ROLEFORGE_BUILD_STATUS__:failed:3\n"
                    return
                # Fall back to the registry host from the push target when no
                # explicit login server is configured.
                srv = login_server or push_target.split("/")[0]
                yield f"$ docker login --username {docker_user!r} --password-stdin {srv}\n"
                lg_code, lg_out = await _docker_login_stdin_password(
                    username=docker_user,
                    password=docker_pass,
                    registry_server=srv,
                )
                for lg_line in lg_out.decode(errors="replace").splitlines()[-40:]:
                    yield f"{lg_line}\n"
                if lg_code != 0:
                    yield f"__ROLEFORGE_BUILD_STATUS__:failed:{lg_code}\n"
                    return
            # Expand the request into a concrete sequence of commands
            # (build / push / imagetools steps) to run in order.
            matrix_csv = ",".join(build_platforms) if build_platforms else ""
            cmds, plan_err = _os_build_command_plan(
                dockerfile_abs=dockerfile_abs,
                image_tag=image_tag,
                platform=platform,
                build_platforms_csv=matrix_csv,
                repo_root=repo_root,
                push_kind=push_kind,
                push_target=push_target,
                legacy_full_image=legacy_layout,
                no_cache=no_cache,
            )
            if plan_err:
                yield f"[roleforge] {plan_err}\n"
                yield "__ROLEFORGE_BUILD_STATUS__:failed:2\n"
                return
            exit_code = 0
            for step_cmd in cmds:
                # Registry-facing steps get retries; local-only steps run once.
                max_attempts = _OS_BUILDX_PUSH_RETRIES if _should_retry_os_registry_cmd(step_cmd) else 1
                for attempt in range(max_attempts):
                    if attempt:
                        yield (
                            f"[roleforge] buildx push retry {attempt + 1}/{max_attempts} "
                            f"after {_OS_BUILDX_PUSH_RETRY_DELAY_SEC}s…\n"
                        )
                        await asyncio.sleep(_OS_BUILDX_PUSH_RETRY_DELAY_SEC)
                    yield f"$ {' '.join(step_cmd)}\n"
                    proc = await asyncio.create_subprocess_exec(
                        *step_cmd,
                        cwd=repo_root,
                        stdout=asyncio.subprocess.PIPE,
                        stderr=asyncio.subprocess.STDOUT,
                    )
                    # Mirror output to the client while keeping a copy so the
                    # retry heuristic can inspect it on failure.
                    buf: list[str] = []
                    async for text_chunk in _stream_process_stdout_chunks(proc):
                        buf.append(text_chunk)
                        yield text_chunk
                    exit_code = await proc.wait()
                    if exit_code == 0:
                        break
                    # Retry only when the output tail looks like a transient
                    # registry/network error; otherwise fail immediately.
                    blob = "".join(buf)[-12000:]
                    if attempt < max_attempts - 1 and _should_retry_buildx_push_output(blob):
                        continue
                    yield f"__ROLEFORGE_BUILD_STATUS__:failed:{exit_code}\n"
                    return
            if push_kind in ("hub", "registry") and push_target.strip():
                # Emit ready-to-copy shell commands for manually pulling and
                # re-tagging the pushed image.
                mp = _manual_pull_and_tag_shell_commands(
                    push_kind,
                    push_target,
                    image_tag,
                    legacy_full_image=legacy_layout,
                )
                yield f"__ROLEFORGE_MANUAL_PULL__:{json.dumps({'commands': mp}, ensure_ascii=False)}\n"
            yield f"__ROLEFORGE_BUILD_STATUS__:success:{exit_code}\n"
        except asyncio.CancelledError:
            # Client disconnected mid-build: reap the subprocess out-of-band,
            # then propagate cancellation to the streaming machinery.
            asyncio.create_task(_reap_subprocess_after_stream_disconnect(proc))
            raise

    return StreamingResponse(_stream(), media_type="text/plain; charset=utf-8")
|
|
|
|
|
|
@router.get("/tests/{test_id}")
async def get_test_status(test_id: str, user_id: str = Depends(get_current_user_id)) -> dict[str, str]:
    """Return status and lifecycle timestamps for a test run owned by the caller."""
    pool = get_pool()
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            """
            select id::text as id, owner_id::text as owner_id, status, created_at, started_at, finished_at
            from test_runs
            where id=$1::uuid
            """,
            test_id,
        )
    if not row:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Test run not found")
    if row["owner_id"] != user_id:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Test run access denied")

    def _iso(value: Any) -> str:
        # Timestamps may be NULL before the run starts/finishes.
        return value.isoformat() if value else ""

    return {
        "id": row["id"],
        "status": row["status"],
        "created_at": _iso(row["created_at"]),
        "started_at": _iso(row["started_at"]),
        "finished_at": _iso(row["finished_at"]),
    }
|
|
|
|
|
|
@router.post("/tests/{test_id}/stop")
async def stop_test(test_id: str, user_id: str = Depends(get_current_user_id)) -> dict[str, str]:
    """Terminate the ephemeral runner for this test and mark the run as failed (user-initiated stop)."""
    pool = get_pool()
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            """
            select owner_id::text as owner_id, runner_container_name, runtime_mode, status
            from test_runs
            where id=$1::uuid
            """,
            test_id,
        )
    if not row:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Test run not found")
    if row["owner_id"] != user_id:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Test run access denied")
    if str(row["status"] or "") in ("success", "failed"):
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Test run already finished")
    container_name = row["runner_container_name"]
    if not container_name:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="No runner container for this test")
    # Kill the worker first, then record the stop in the logs and flip status.
    await terminate_ephemeral_worker(container_name, str(row["runtime_mode"] or "docker"))
    async with pool.acquire() as conn:
        await conn.execute(
            """
            insert into test_logs (test_run_id, log_line)
            values ($1::uuid, $2)
            """,
            test_id,
            "[roleforge] Test stopped by user (runner container terminated).",
        )
        # Guarded update: don't clobber a run that finished concurrently.
        await conn.execute(
            """
            update test_runs
            set status='failed', finished_at=now()
            where id=$1::uuid
              and status not in ('success', 'failed')
            """,
            test_id,
        )
    return {"status": "stopped", "test_id": test_id}
|
|
|
|
|
|
@router.get("/tests/{test_id}/logs")
async def get_test_logs(
    test_id: str,
    after_id: int = 0,
    limit: int = 200,
    user_id: str = Depends(get_current_user_id),
) -> dict[str, Any]:
    """Return log lines after `after_id` (test_logs.id), chronological order — for live polling."""
    # Clamp client-supplied paging parameters to sane bounds.
    clamped_limit = min(max(limit, 1), 5000)
    cursor = after_id if after_id > 0 else 0
    pool = get_pool()
    async with pool.acquire() as conn:
        owner_id = await conn.fetchval("select owner_id::text from test_runs where id=$1::uuid", test_id)
        if not owner_id:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Test run not found")
        if owner_id != user_id:
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Test run access denied")
        rows = await conn.fetch(
            """
            select id, log_line
            from test_logs
            where test_run_id=$1::uuid and id > $2::bigint
            order by id asc
            limit $3
            """,
            test_id,
            cursor,
            clamped_limit,
        )
        # max_log_id lets the poller detect how far behind this page is.
        max_log_id = await conn.fetchval(
            "select coalesce(max(id), 0)::bigint from test_logs where test_run_id=$1::uuid",
            test_id,
        )
    entries = [{"id": int(record["id"]), "line": record["log_line"]} for record in rows]
    return {
        "items": entries,
        "next_after_id": int(rows[-1]["id"]) if rows else cursor,
        "max_log_id": int(max_log_id or 0),
    }
|
|
|
|
|
|
@router.get("/runners/active")
async def active_runners(_: str = Depends(get_current_user_id)) -> dict[str, list[dict[str, str]]]:
    """List ephemeral runner containers that are currently alive."""
    runners = await list_active_runners()
    return {"items": runners}
|
|
|
|
|
|
@router.post("/runners/{runner_name}/stop")
async def stop_runner(
    runner_name: str,
    payload: RunnerStopRequest,
    _: str = Depends(get_current_user_id),
) -> dict[str, str]:
    """Force-terminate a named ephemeral runner in the requested runtime mode."""
    mode = payload.runtime_mode
    await terminate_ephemeral_worker(runner_name, mode)
    return {"status": "stopped", "runner_name": runner_name, "runtime_mode": mode}
|