237 lines
8.4 KiB
Python
237 lines
8.4 KiB
Python
import asyncio
|
|
import logging
|
|
import re
|
|
import secrets
|
|
|
|
import docker
|
|
import httpx
|
|
from kubernetes import client as k8s_client
|
|
from kubernetes import config as k8s_config
|
|
from kubernetes.client.exceptions import ApiException
|
|
|
|
from app.core.config import get_settings
|
|
|
|
# Module-level logger used by every launch/terminate/list helper below.
_logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _runner_env(settings) -> dict[str, str]:
    """Build the environment variables handed to every ephemeral runner.

    Connection and secret settings are passed through unchanged. The two
    token-lifetime settings are converted with ``str()``. The runner/runtime
    settings are appended last, so key order matches the original layout.

    Args:
        settings: The application settings object; only the attributes read
            below are used.

    Returns:
        An ordered mapping of environment-variable name to value.
    """
    connections = {
        "DATABASE_DSN": settings.database_dsn,
        "REDIS_URL": settings.redis_url,
        "CELERY_BROKER_URL": settings.celery_broker_url,
        "CELERY_RESULT_BACKEND": settings.celery_result_backend,
        "APP_SECRET_KEY": settings.app_secret_key,
    }
    lifetimes = {
        "APP_ACCESS_TOKEN_EXPIRE_MIN": settings.app_access_token_expire_min,
        "APP_REFRESH_TOKEN_EXPIRE_DAYS": settings.app_refresh_token_expire_days,
    }
    runtime = {
        "APP_RUNNER_IMAGE": settings.app_runner_image,
        "APP_DOCKER_NETWORK": settings.app_docker_network,
        "APP_K8S_NAMESPACE": settings.app_k8s_namespace,
        "APP_K8S_SERVICE_ACCOUNT": settings.app_k8s_service_account,
    }
    # Environment values must be strings; only the numeric lifetimes need converting.
    stringified_lifetimes = {name: str(value) for name, value in lifetimes.items()}
    return {**connections, **stringified_lifetimes, **runtime}
|
|
|
|
|
|
def _start_docker_runner(queue_name: str) -> tuple[str, str, str]:
    """Launch a detached runner container for ``queue_name``.

    The container runs ``uvicorn app.runner.main:app`` on port 9000, with the
    environment from ``_runner_env`` and ``auto_remove=True``. It joins
    ``settings.app_docker_network`` and optionally gets the host Docker socket
    and/or privileged mode, depending on settings.

    Args:
        queue_name: Queue this runner serves. Dots are replaced with ``-`` to
            form the container name ``roleforge-worker-<queue>``.

    Returns:
        ``(container_id, container_name, base_url)``. ``base_url`` addresses the
        runner by container name on port 9000, which relies on name resolution
        on ``app_docker_network``.

    Raises:
        Whatever docker-py raises from ``from_env()`` / ``containers.run()``.
    """
    settings = get_settings()
    # NOTE(review): the name is deterministic per queue, so a leftover container
    # for the same queue would make run() fail with a name conflict. Also, only
    # "." is rewritten; other characters Docker rejects in names are passed
    # through. Confirm queue names are restricted upstream.
    container_name = f"roleforge-worker-{queue_name.replace('.', '-')}"

    volumes_cfg: dict[str, dict[str, str]] = {}
    if settings.app_runner_mount_docker_socket:
        # Give the runner access to the host Docker daemon.
        volumes_cfg["/var/run/docker.sock"] = {"bind": "/var/run/docker.sock", "mode": "rw"}

    kw: dict = {
        "image": settings.app_runner_image,
        "command": ["uvicorn", "app.runner.main:app", "--host", "0.0.0.0", "--port", "9000"],
        "environment": _runner_env(settings),
        "detach": True,
        "auto_remove": True,
        "network": settings.app_docker_network,
        "name": container_name,
        "volumes": volumes_cfg or None,
    }
    if settings.app_runner_privileged:
        kw["privileged"] = True
    # Drop unset options (e.g. no network, no volumes) instead of passing None.
    kw = {k: v for k, v in kw.items() if v is not None}

    client = docker.from_env()
    try:
        container = client.containers.run(**kw)
    finally:
        # The client holds a pooled HTTP session to the daemon; release it now
        # rather than leaking one per launch until garbage collection.
        client.close()

    _logger.info(
        "Started ephemeral runner container %s on network %s (image=%s)",
        container_name,
        settings.app_docker_network,
        settings.app_runner_image,
    )
    return container.id, container_name, f"http://{container_name}:9000"
|
|
|
|
|
|
def _sanitize_dns_name(value: str, max_length: int = 40) -> str:
    """Turn an arbitrary string into a lowercase slug usable inside a DNS label.

    After lowercasing, every character outside ``[a-z0-9-]`` becomes ``-``.
    Runs of ``-`` are collapsed, and leading/trailing ``-`` are removed. The
    result is cut to at most ``max_length`` characters and stripped again, so
    truncation can never leave a trailing hyphen.

    Args:
        value: Arbitrary input text (e.g. a queue name).
        max_length: Maximum length of the returned slug (default 40). The
            ``"runner"`` fallback is not truncated, so pass a value >= 6 if
            that matters.

    Returns:
        The slug, or ``"runner"`` when nothing usable remains.
    """
    normalized = re.sub(r"[^a-z0-9-]", "-", value.lower())
    normalized = re.sub(r"-+", "-", normalized).strip("-")
    # Re-strip after slicing: the cut may land right after a "-".
    return normalized[:max_length].rstrip("-") or "runner"
|
|
|
|
|
|
def _load_k8s_config() -> None:
    """Configure the global kubernetes client for the current environment.

    In-cluster configuration is attempted first. If that raises anything, the
    local kubeconfig is loaded instead, and any error from that second attempt
    propagates to the caller.
    """
    configured_in_cluster = True
    try:
        k8s_config.load_incluster_config()
    except Exception:  # noqa: BLE001
        configured_in_cluster = False
    if not configured_in_cluster:
        # Not running inside a pod (or in-cluster config unavailable): use kubeconfig.
        k8s_config.load_kube_config()
|
|
|
|
|
|
def _cleanup_k8s_runner_objects(core, namespace: str, name: str) -> None:
    """Best-effort deletion of a runner's service and pod; failures are logged, never raised.

    Used to roll back a half-started runner so a failed launch does not strand
    objects. Objects that are already gone (HTTP 404) are ignored silently.
    """
    deletions = (
        ("service", lambda: core.delete_namespaced_service(name=name, namespace=namespace)),
        (
            "pod",
            lambda: core.delete_namespaced_pod(
                name=name, namespace=namespace, grace_period_seconds=0
            ),
        ),
    )
    for kind, delete in deletions:
        try:
            delete()
        except Exception as exc:  # noqa: BLE001 - rollback must never mask the original error
            if isinstance(exc, ApiException) and exc.status == 404:
                continue
            _logger.warning(
                "Failed to delete k8s runner %s %s/%s during rollback",
                kind,
                namespace,
                name,
                exc_info=True,
            )


def _wait_for_k8s_pod_ready(core, namespace: str, name: str):
    """Poll pod ``namespace/name`` once per second, up to 60 times, until it is Running and Ready.

    Returns:
        The last pod object read. If the pod never became Ready, a warning is
        logged and the last read is still returned.

    Raises:
        RuntimeError: if the pod reaches phase ``Failed`` or ``Succeeded``.
    """
    import time

    for _ in range(60):
        current = core.read_namespaced_pod(name=name, namespace=namespace)
        status = current.status
        if status and status.phase == "Running":
            conditions = status.conditions or []
            if any(c.type == "Ready" and c.status == "True" for c in conditions):
                return current
        if status and status.phase in {"Failed", "Succeeded"}:
            raise RuntimeError(f"k8s runner pod terminated early: {status.phase}")
        time.sleep(1.0)
    _logger.warning(
        "k8s runner pod %s/%s not Ready after 60 polls; returning anyway",
        namespace,
        name,
    )
    return current


def _start_k8s_runner(queue_name: str) -> tuple[str, str, str]:
    """Create a runner pod plus a Service selecting it, then wait for the pod to be Ready.

    The pod and the Service share the generated name
    ``roleforge-runner-<slug>-<6 hex>``. The name is also used as the
    ``runner-name`` label value, which the Service selects on. It is kept
    within 63 characters, the Kubernetes limit for Service names and label
    values. If anything fails after the pod is created, the pod and Service
    are deleted before the error propagates.

    Args:
        queue_name: Queue this runner serves; sanitized into the name slug.

    Returns:
        ``(pod_uid_or_name, runner_name, base_url)``. ``base_url`` is the
        Service's cluster DNS name on port 9000.

    Raises:
        RuntimeError: if the pod terminates while waiting for readiness.
        ApiException: from pod/service creation or pod reads (a 409 on
            Service creation is tolerated).
    """
    settings = get_settings()
    _load_k8s_config()
    core = k8s_client.CoreV1Api()

    # Service names and label values are capped at 63 characters: give the queue
    # slug whatever remains after the fixed prefix, the "-" separator and suffix.
    max_name_len = 63
    prefix = "roleforge-runner-"
    suffix = secrets.token_hex(3)
    slug_budget = max_name_len - len(prefix) - 1 - len(suffix)
    slug = _sanitize_dns_name(queue_name)[:slug_budget].rstrip("-") or "runner"
    base_name = f"{prefix}{slug}-{suffix}"
    namespace = settings.app_k8s_namespace
    labels = {"app": "roleforge-runner", "runner-name": base_name}

    env = [k8s_client.V1EnvVar(name=k, value=v) for k, v in _runner_env(settings).items()]
    container = k8s_client.V1Container(
        name="runner",
        image=settings.app_runner_image,
        command=["uvicorn", "app.runner.main:app", "--host", "0.0.0.0", "--port", "9000"],
        env=env,
        ports=[k8s_client.V1ContainerPort(container_port=9000)],
    )
    pod = k8s_client.V1Pod(
        metadata=k8s_client.V1ObjectMeta(name=base_name, labels=labels),
        spec=k8s_client.V1PodSpec(
            containers=[container],
            restart_policy="Never",
            service_account_name=settings.app_k8s_service_account,
        ),
    )
    service = k8s_client.V1Service(
        metadata=k8s_client.V1ObjectMeta(name=base_name, labels=labels),
        spec=k8s_client.V1ServiceSpec(
            selector={"runner-name": base_name},
            ports=[k8s_client.V1ServicePort(name="http", port=9000, target_port=9000)],
        ),
    )

    core.create_namespaced_pod(namespace=namespace, body=pod)
    try:
        try:
            core.create_namespaced_service(namespace=namespace, body=service)
        except ApiException as exc:
            # 409 Conflict: a Service with this name already exists; reuse it.
            if exc.status != 409:
                raise
        current = _wait_for_k8s_pod_ready(core, namespace, base_name)
    except Exception:
        # Roll back so a failed launch does not leave an untracked pod/Service behind.
        _cleanup_k8s_runner_objects(core, namespace, base_name)
        raise

    pod_uid = current.metadata.uid or base_name
    base_url = f"http://{base_name}.{namespace}.svc.cluster.local:9000"
    return pod_uid, base_name, base_url
|
|
|
|
|
|
async def launch_ephemeral_worker(queue_name: str, runtime_mode: str = "docker") -> tuple[str, str, str]:
    """Start an ephemeral runner for ``queue_name`` and wait until its ``/healthz`` returns 200.

    Args:
        queue_name: Queue the runner serves.
        runtime_mode: ``"k8s"`` launches a pod + Service; any other value
            launches a Docker container.

    Returns:
        ``(runner_id, runner_name, base_url)`` as produced by the backend starter.

    Raises:
        RuntimeError: if ``/healthz`` never answers 200 within 80 polls (each
            with a 3 s request timeout followed by a 0.5 s pause). The runner
            is torn down first; a teardown failure is logged, not raised, so
            it cannot mask this error.
    """
    is_k8s = runtime_mode == "k8s"
    starter = _start_k8s_runner if is_k8s else _start_docker_runner
    # The starters use blocking docker/kubernetes clients; keep them off the event loop.
    container_id, container_name, base_url = await asyncio.to_thread(starter, queue_name)

    healthy = False
    last_problem = "unknown"
    last_exc: Exception | None = None
    async with httpx.AsyncClient(timeout=3.0) as client:
        for _ in range(80):
            try:
                response = await client.get(f"{base_url}/healthz")
            except Exception as exc:  # noqa: BLE001 - runner may still be booting; keep polling
                last_exc = exc
                last_problem = repr(exc)
            else:
                if response.status_code == 200:
                    healthy = True
                    break
                last_exc = None
                last_problem = f"HTTP {response.status_code}"
            await asyncio.sleep(0.5)

    if healthy:
        return container_id, container_name, base_url

    _logger.error(
        "Ephemeral runner never became healthy at %s (queue=%s, last problem: %s)",
        base_url,
        queue_name,
        last_problem,
    )
    terminator = _terminate_k8s_runner if is_k8s else _terminate_docker_runner
    try:
        await asyncio.to_thread(terminator, container_name)
    except Exception:  # noqa: BLE001 - report the health failure below, not the cleanup failure
        _logger.exception("Failed to tear down unhealthy ephemeral runner %s", container_name)

    if is_k8s:
        hint = (
            "Ensure APP_K8S_NAMESPACE and APP_K8S_SERVICE_ACCOUNT are correct, the API can resolve "
            "cluster service DNS names, and APP_RUNNER_IMAGE is runnable."
        )
    else:
        hint = (
            "Ensure the API container mounts /var/run/docker.sock, APP_DOCKER_NETWORK matches docker compose "
            "(e.g. roleforge_default), and APP_RUNNER_IMAGE is runnable."
        )
    raise RuntimeError(f"Ephemeral runner did not respond at {base_url}/healthz. {hint}") from last_exc
|
|
|
|
|
|
def _terminate_docker_runner(container_name: str) -> None:
    """Best-effort stop of the runner container named ``container_name``.

    Containers started by ``_start_docker_runner`` use ``auto_remove=True``, so
    stopping one is expected to remove it as well. Lookup and stop failures are
    logged and swallowed, keeping termination best-effort. Only a failure to
    create the Docker client itself propagates.
    """
    client = docker.from_env()
    try:
        try:
            container = client.containers.get(container_name)
        except Exception:  # noqa: BLE001
            _logger.debug(
                "Could not look up runner container %s; assuming it is already gone",
                container_name,
                exc_info=True,
            )
            return
        try:
            container.stop(timeout=5)
        except Exception:  # noqa: BLE001
            _logger.warning("Failed to stop runner container %s", container_name, exc_info=True)
    finally:
        # Release the client's HTTP session instead of leaking it until GC.
        client.close()
|
|
|
|
|
|
def _terminate_k8s_runner(runner_name: str) -> None:
    """Delete the runner pod and Service named ``runner_name`` in ``app_k8s_namespace``.

    The pod is deleted with a zero grace period. Objects that no longer exist
    (HTTP 404) are ignored. Both deletions are always attempted, so a failure
    on the pod does not leave the Service behind. The first unexpected error
    is re-raised afterwards; a second one is logged.

    Raises:
        ApiException: (or a transport error from the client) when a deletion
            fails for a reason other than "not found".
    """
    settings = get_settings()
    _load_k8s_config()
    core = k8s_client.CoreV1Api()
    namespace = settings.app_k8s_namespace

    deletions = (
        (
            "pod",
            lambda: core.delete_namespaced_pod(
                name=runner_name, namespace=namespace, grace_period_seconds=0
            ),
        ),
        ("service", lambda: core.delete_namespaced_service(name=runner_name, namespace=namespace)),
    )
    first_error: Exception | None = None
    for kind, delete in deletions:
        try:
            delete()
        except Exception as exc:  # noqa: BLE001 - re-raised below once both deletions were tried
            if isinstance(exc, ApiException) and exc.status == 404:
                continue
            if first_error is None:
                first_error = exc
            else:
                _logger.warning(
                    "Also failed to delete k8s runner %s %s/%s",
                    kind,
                    namespace,
                    runner_name,
                    exc_info=True,
                )
    if first_error is not None:
        raise first_error
|
|
|
|
|
|
async def terminate_ephemeral_worker(runner_name: str, runtime_mode: str) -> None:
    """Tear down the runner ``runner_name`` using the backend for ``runtime_mode``.

    ``"k8s"`` selects the Kubernetes teardown; every other value selects
    Docker. The blocking teardown runs in a worker thread.
    """
    if runtime_mode == "k8s":
        await asyncio.to_thread(_terminate_k8s_runner, runner_name)
    else:
        await asyncio.to_thread(_terminate_docker_runner, runner_name)
|
|
|
|
|
|
def _list_docker_runners() -> list[dict[str, str]]:
    """Describe every Docker container (running or not) matched by the ``roleforge-worker-`` name filter.

    Returns:
        One dict per container with keys ``runtime_mode`` (always ``"docker"``),
        ``runner_name``, ``runner_id`` and ``status``.
    """
    client = docker.from_env()
    try:
        # NOTE(review): Docker's name filter is not anchored to the start of the
        # name, so containers merely containing this text may match too — confirm.
        containers = client.containers.list(all=True, filters={"name": "roleforge-worker-"})
        return [
            {
                "runtime_mode": "docker",
                "runner_name": c.name,
                "runner_id": c.id,
                "status": c.status,
            }
            for c in containers
        ]
    finally:
        # Release the client's HTTP session instead of leaking one per listing.
        client.close()
|
|
|
|
|
|
def _list_k8s_runners() -> list[dict[str, str]]:
    """Describe every pod labelled ``app=roleforge-runner`` in ``app_k8s_namespace``.

    Returns:
        One dict per pod with keys ``runtime_mode`` (always ``"k8s"``),
        ``runner_name``, ``runner_id`` (empty string when missing) and
        ``status``. ``status`` is the pod phase, or ``"unknown"`` when the pod
        has no status.
    """
    settings = get_settings()
    _load_k8s_config()
    pod_list = k8s_client.CoreV1Api().list_namespaced_pod(
        namespace=settings.app_k8s_namespace,
        label_selector="app=roleforge-runner",
    )

    def _describe(pod) -> dict[str, str]:
        meta = pod.metadata
        phase = "unknown" if not pod.status else pod.status.phase
        return {
            "runtime_mode": "k8s",
            "runner_name": meta.name or "",
            "runner_id": meta.uid or "",
            "status": phase,
        }

    return [_describe(pod) for pod in pod_list.items]
|
|
|
|
|
|
async def _collect_runners(lister, backend: str) -> list[dict[str, str]]:
    """Run a blocking runner ``lister`` in a worker thread; on any failure log it and return ``[]``."""
    try:
        return await asyncio.to_thread(lister)
    except Exception:  # noqa: BLE001 - one unreachable backend must not hide the other
        _logger.warning("Could not list %s runners", backend, exc_info=True)
        return []


async def list_active_runners() -> list[dict[str, str]]:
    """Return runner descriptions from Docker followed by those from Kubernetes.

    Each backend is queried best-effort and independently. If one is
    unreachable (e.g. no Docker daemon in a Kubernetes-only deployment), the
    failure is logged and that backend contributes no entries, while the other
    backend's runners are still returned.
    """
    docker_items = await _collect_runners(_list_docker_runners, "docker")
    k8s_items = await _collect_runners(_list_k8s_runners, "k8s")
    return docker_items + k8s_items
|