"""Start, stop and list ephemeral RoleForge runner workers.

A runner is either a Docker container (``runtime_mode="docker"``, the
default) or a Kubernetes pod fronted by a same-named Service
(``runtime_mode="k8s"``). In both cases it runs ``uvicorn
app.runner.main:app`` on port 9000 and is treated as healthy once
``GET /healthz`` returns 200.
"""

import asyncio
import logging
import re
import secrets
import time

import docker
import httpx
from kubernetes import client as k8s_client
from kubernetes import config as k8s_config
from kubernetes.client.exceptions import ApiException

from app.core.config import get_settings

_logger = logging.getLogger(__name__)

_RUNNER_PORT = 9000
# Command shared by both runtimes; copied with list() at each use so no
# caller can mutate the shared value.
_RUNNER_COMMAND = ("uvicorn", "app.runner.main:app", "--host", "0.0.0.0", "--port", str(_RUNNER_PORT))

# Kubernetes label values and Service names (DNS-1035 labels) are limited to
# 63 characters; the generated runner name is used for both.
_K8S_NAME_MAX_LEN = 63
_K8S_NAME_PREFIX = "roleforge-runner-"
_K8S_SUFFIX_BYTES = 3  # secrets.token_hex(3) -> 6 hex characters

# Polling budgets for pod readiness and runner health checks.
_K8S_READY_ATTEMPTS = 60
_K8S_READY_INTERVAL_S = 1.0
_HEALTH_ATTEMPTS = 80
_HEALTH_INTERVAL_S = 0.5
_HEALTH_TIMEOUT_S = 3.0


def _runner_env(settings) -> dict[str, str]:
    """Build the environment variables forwarded from *settings* to a runner.

    NOTE(review): this includes credentials (DATABASE_DSN, APP_SECRET_KEY).
    In k8s mode they are written as plain ``env`` values into the pod spec,
    so they are readable by anyone allowed to read pods in the namespace —
    consider a Kubernetes Secret referenced via ``valueFrom`` instead.
    """
    return {
        "DATABASE_DSN": settings.database_dsn,
        "REDIS_URL": settings.redis_url,
        "CELERY_BROKER_URL": settings.celery_broker_url,
        "CELERY_RESULT_BACKEND": settings.celery_result_backend,
        "APP_SECRET_KEY": settings.app_secret_key,
        "APP_ACCESS_TOKEN_EXPIRE_MIN": str(settings.app_access_token_expire_min),
        "APP_REFRESH_TOKEN_EXPIRE_DAYS": str(settings.app_refresh_token_expire_days),
        "APP_RUNNER_IMAGE": settings.app_runner_image,
        "APP_DOCKER_NETWORK": settings.app_docker_network,
        "APP_K8S_NAMESPACE": settings.app_k8s_namespace,
        "APP_K8S_SERVICE_ACCOUNT": settings.app_k8s_service_account,
    }


def _start_docker_runner(queue_name: str) -> tuple[str, str, str]:
    """Start a detached, auto-removed runner container for *queue_name*.

    Returns:
        ``(container_id, container_name, base_url)``. ``base_url`` uses the
        container name as its hostname, so it presumably only resolves from
        a peer attached to ``settings.app_docker_network``.
    """
    settings = get_settings()
    container_name = f"roleforge-worker-{queue_name.replace('.', '-')}"

    volumes_cfg: dict[str, dict[str, str]] = {}
    if settings.app_runner_mount_docker_socket:
        # NOTE(review): mounting the Docker socket gives the runner
        # host-level control of Docker; keep this opt-in.
        volumes_cfg["/var/run/docker.sock"] = {"bind": "/var/run/docker.sock", "mode": "rw"}

    kw: dict = {
        "image": settings.app_runner_image,
        "command": list(_RUNNER_COMMAND),
        "environment": _runner_env(settings),
        "detach": True,
        "auto_remove": True,
        "network": settings.app_docker_network,
        "name": container_name,
        "volumes": volumes_cfg or None,
    }
    if settings.app_runner_privileged:
        kw["privileged"] = True
    # Drop unset options so the Docker SDK applies its own defaults.
    kw = {k: v for k, v in kw.items() if v is not None}

    client = docker.from_env()
    try:
        container = client.containers.run(**kw)
    finally:
        # Release the SDK's HTTP session; container.id below is read from
        # already-fetched attributes.
        client.close()

    _logger.info(
        "Started ephemeral runner container %s on network %s (image=%s)",
        container_name,
        settings.app_docker_network,
        settings.app_runner_image,
    )
    return container.id, container_name, f"http://{container_name}:{_RUNNER_PORT}"


def _sanitize_dns_name(value: str, max_length: int = 40) -> str:
    """Reduce *value* to lowercase ``[a-z0-9-]`` of at most *max_length* chars.

    Runs of invalid characters collapse to a single ``-``, and leading or
    trailing hyphens are stripped (also after truncation). Returns
    ``"runner"`` if nothing usable remains.
    """
    normalized = re.sub(r"[^a-z0-9-]", "-", value.lower())
    normalized = re.sub(r"-+", "-", normalized).strip("-")
    # Strip again after truncating so the result never ends with "-".
    return normalized[:max_length].rstrip("-") or "runner"


def _load_k8s_config() -> None:
    """Load in-cluster Kubernetes config, falling back to the local kubeconfig."""
    try:
        k8s_config.load_incluster_config()
    except k8s_config.ConfigException:
        # Not running inside a cluster (no service-account environment).
        k8s_config.load_kube_config()


def _delete_k8s_runner(core, runner_name: str, namespace: str) -> None:
    """Delete the pod and Service named *runner_name* in *namespace*.

    404 responses are ignored. Both deletions are always attempted; the first
    other ``ApiException`` is re-raised once both have been tried.
    """
    first_error = None
    for delete, extra in (
        (core.delete_namespaced_pod, {"grace_period_seconds": 0}),
        (core.delete_namespaced_service, {}),
    ):
        try:
            delete(name=runner_name, namespace=namespace, **extra)
        except ApiException as exc:
            if exc.status != 404 and first_error is None:
                first_error = exc
    if first_error is not None:
        raise first_error


def _wait_for_k8s_pod_ready(core, name: str, namespace: str):
    """Poll pod *name* until its ``Ready`` condition is ``True``.

    Returns the last pod object read — also when the polling budget runs out
    without the pod becoming ready (the caller's health check decides then).

    Raises:
        RuntimeError: if the pod reaches phase ``Failed`` or ``Succeeded``.
    """
    current = None
    for _ in range(_K8S_READY_ATTEMPTS):
        current = core.read_namespaced_pod(name=name, namespace=namespace)
        status = current.status
        if status and status.phase == "Running":
            if any(c.type == "Ready" and c.status == "True" for c in status.conditions or []):
                return current
        if status and status.phase in {"Failed", "Succeeded"}:
            raise RuntimeError(f"k8s runner pod terminated early: {status.phase}")
        time.sleep(_K8S_READY_INTERVAL_S)
    _logger.warning("k8s runner pod %s not Ready after %d checks", name, _K8S_READY_ATTEMPTS)
    return current


def _start_k8s_runner(queue_name: str) -> tuple[str, str, str]:
    """Create a runner pod plus ClusterIP Service for *queue_name* and wait for it.

    Returns:
        ``(pod_uid, runner_name, base_url)``. ``runner_name`` names both the
        pod and the Service; ``base_url`` is the Service's cluster-local DNS.

    Raises:
        RuntimeError: if the pod terminates before becoming ready.
        ApiException: on Kubernetes API errors. If the error happens after
            the pod was created, the pod and Service are deleted first.
    """
    settings = get_settings()
    _load_k8s_config()
    core = k8s_client.CoreV1Api()

    suffix = secrets.token_hex(_K8S_SUFFIX_BYTES)
    # Budget the sanitized queue name so prefix + name + "-" + suffix fits the
    # 63-character limit on label values and Service names.
    name_budget = _K8S_NAME_MAX_LEN - len(_K8S_NAME_PREFIX) - 1 - len(suffix)
    base_name = f"{_K8S_NAME_PREFIX}{_sanitize_dns_name(queue_name, max_length=name_budget)}-{suffix}"
    namespace = settings.app_k8s_namespace
    labels = {"app": "roleforge-runner", "runner-name": base_name}

    env = [k8s_client.V1EnvVar(name=k, value=v) for k, v in _runner_env(settings).items()]
    container = k8s_client.V1Container(
        name="runner",
        image=settings.app_runner_image,
        command=list(_RUNNER_COMMAND),
        env=env,
        ports=[k8s_client.V1ContainerPort(container_port=_RUNNER_PORT)],
    )
    pod = k8s_client.V1Pod(
        metadata=k8s_client.V1ObjectMeta(name=base_name, labels=labels),
        spec=k8s_client.V1PodSpec(
            containers=[container],
            restart_policy="Never",
            service_account_name=settings.app_k8s_service_account,
        ),
    )
    service = k8s_client.V1Service(
        metadata=k8s_client.V1ObjectMeta(name=base_name, labels=labels),
        spec=k8s_client.V1ServiceSpec(
            selector={"runner-name": base_name},
            ports=[k8s_client.V1ServicePort(name="http", port=_RUNNER_PORT, target_port=_RUNNER_PORT)],
        ),
    )

    core.create_namespaced_pod(namespace=namespace, body=pod)
    try:
        try:
            core.create_namespaced_service(namespace=namespace, body=service)
        except ApiException as exc:
            # 409 Conflict: a Service with this name already exists; reuse it.
            if exc.status != 409:
                raise
        current = _wait_for_k8s_pod_ready(core, base_name, namespace)
    except Exception:
        # Startup failed after the pod was created: delete pod and Service
        # so a failed launch leaves nothing behind, then re-raise the cause.
        try:
            _delete_k8s_runner(core, base_name, namespace)
        except Exception:  # noqa: BLE001
            _logger.exception("Failed to clean up k8s runner %s after startup error", base_name)
        raise

    pod_uid = current.metadata.uid or base_name
    base_url = f"http://{base_name}.{namespace}.svc.cluster.local:{_RUNNER_PORT}"
    return pod_uid, base_name, base_url


async def _wait_for_runner_health(base_url: str) -> bool:
    """Poll ``{base_url}/healthz`` until it returns 200; return whether it did."""
    async with httpx.AsyncClient(timeout=_HEALTH_TIMEOUT_S) as client:
        for _ in range(_HEALTH_ATTEMPTS):
            try:
                response = await client.get(f"{base_url}/healthz")
            except Exception:  # noqa: BLE001 - runner may not be listening yet
                _logger.debug("Health probe to %s failed; retrying", base_url, exc_info=True)
            else:
                if response.status_code == 200:
                    return True
            await asyncio.sleep(_HEALTH_INTERVAL_S)
    return False


async def launch_ephemeral_worker(queue_name: str, runtime_mode: str = "docker") -> tuple[str, str, str]:
    """Start a runner for *queue_name* and wait until it reports healthy.

    Args:
        queue_name: Queue the runner serves; used to derive its name.
        runtime_mode: ``"k8s"`` for Kubernetes; any other value uses Docker.

    Returns:
        ``(runner_id, runner_name, base_url)`` from the runtime-specific starter.

    Raises:
        RuntimeError: if the runner never answers ``/healthz`` with 200. The
            runner is terminated (best effort) before this is raised.
    """
    starter = _start_k8s_runner if runtime_mode == "k8s" else _start_docker_runner
    container_id, container_name, base_url = await asyncio.to_thread(starter, queue_name)

    if await _wait_for_runner_health(base_url):
        return container_id, container_name, base_url

    _logger.error("Ephemeral runner never became healthy at %s (queue=%s)", base_url, queue_name)
    try:
        await terminate_ephemeral_worker(container_name, runtime_mode)
    except Exception:  # noqa: BLE001 - don't mask the health failure below
        _logger.exception("Failed to terminate unhealthy runner %s", container_name)

    if runtime_mode == "k8s":
        hint = (
            "Ensure APP_K8S_NAMESPACE and APP_K8S_SERVICE_ACCOUNT are correct, the API can resolve "
            "cluster-local Service DNS (*.svc.cluster.local), and APP_RUNNER_IMAGE is runnable."
        )
    else:
        hint = (
            "Ensure the API container mounts /var/run/docker.sock, APP_DOCKER_NETWORK matches docker compose "
            "(e.g. roleforge_default), and APP_RUNNER_IMAGE is runnable."
        )
    raise RuntimeError(f"Ephemeral runner did not respond at {base_url}/healthz. {hint}")


def _terminate_docker_runner(container_name: str) -> None:
    """Stop runner container *container_name* if it exists (best effort).

    Containers are started with ``auto_remove=True``, so Docker removes them
    once stopped. Lookup/stop failures are logged, never raised.
    """
    client = docker.from_env()
    try:
        try:
            container = client.containers.get(container_name)
            container.stop(timeout=5)
        except docker.errors.NotFound:
            # Already gone (e.g. auto-removed after exiting).
            return
        except Exception:  # noqa: BLE001
            _logger.warning("Could not stop runner container %s", container_name, exc_info=True)
    finally:
        client.close()


def _terminate_k8s_runner(runner_name: str) -> None:
    """Delete the runner pod and Service named *runner_name* (404s ignored)."""
    settings = get_settings()
    _load_k8s_config()
    core = k8s_client.CoreV1Api()
    _delete_k8s_runner(core, runner_name, settings.app_k8s_namespace)


async def terminate_ephemeral_worker(runner_name: str, runtime_mode: str) -> None:
    """Terminate *runner_name* in the given runtime (``"k8s"`` or Docker otherwise)."""
    terminator = _terminate_k8s_runner if runtime_mode == "k8s" else _terminate_docker_runner
    await asyncio.to_thread(terminator, runner_name)


def _list_docker_runners() -> list[dict[str, str]]:
    """List runner containers (including stopped ones) matched by the name filter."""
    client = docker.from_env()
    try:
        containers = client.containers.list(all=True, filters={"name": "roleforge-worker-"})
        return [
            {
                "runtime_mode": "docker",
                "runner_name": c.name,
                "runner_id": c.id,
                "status": c.status,
            }
            for c in containers
        ]
    finally:
        client.close()


def _list_k8s_runners() -> list[dict[str, str]]:
    """List runner pods labelled ``app=roleforge-runner`` in the configured namespace."""
    settings = get_settings()
    _load_k8s_config()
    core = k8s_client.CoreV1Api()
    pods = core.list_namespaced_pod(
        namespace=settings.app_k8s_namespace,
        label_selector="app=roleforge-runner",
    )
    return [
        {
            "runtime_mode": "k8s",
            "runner_name": pod.metadata.name or "",
            "runner_id": pod.metadata.uid or "",
            "status": pod.status.phase if pod.status else "unknown",
        }
        for pod in pods.items
    ]


async def _list_k8s_runners_best_effort() -> list[dict[str, str]]:
    """Return k8s runners, or ``[]`` if Kubernetes is unavailable/unconfigured."""
    try:
        return await asyncio.to_thread(_list_k8s_runners)
    except Exception:  # noqa: BLE001 - k8s is optional in docker-only setups
        _logger.debug("Kubernetes runners unavailable; skipping", exc_info=True)
        return []


async def list_active_runners() -> list[dict[str, str]]:
    """List Docker runners followed by Kubernetes runners.

    Docker errors propagate; Kubernetes errors are logged at debug level and
    treated as "no k8s runners". Both listings run concurrently.
    """
    docker_items, k8s_items = await asyncio.gather(
        asyncio.to_thread(_list_docker_runners),
        _list_k8s_runners_best_effort(),
    )
    return docker_items + k8s_items