Files
RoleForge/app/services/dynamic_worker.py
Sergey Antropoff 1d2301fb09 first commit
2026-04-30 08:59:31 +03:00

237 lines
8.4 KiB
Python

import asyncio
import logging
import re
import secrets
import docker
import httpx
from kubernetes import client as k8s_client
from kubernetes import config as k8s_config
from kubernetes.client.exceptions import ApiException
from app.core.config import get_settings
# Module logger for runner lifecycle messages (container start, failed health checks).
_logger = logging.getLogger(__name__)
def _runner_env(settings) -> dict[str, str]:
return {
"DATABASE_DSN": settings.database_dsn,
"REDIS_URL": settings.redis_url,
"CELERY_BROKER_URL": settings.celery_broker_url,
"CELERY_RESULT_BACKEND": settings.celery_result_backend,
"APP_SECRET_KEY": settings.app_secret_key,
"APP_ACCESS_TOKEN_EXPIRE_MIN": str(settings.app_access_token_expire_min),
"APP_REFRESH_TOKEN_EXPIRE_DAYS": str(settings.app_refresh_token_expire_days),
"APP_RUNNER_IMAGE": settings.app_runner_image,
"APP_DOCKER_NETWORK": settings.app_docker_network,
"APP_K8S_NAMESPACE": settings.app_k8s_namespace,
"APP_K8S_SERVICE_ACCOUNT": settings.app_k8s_service_account,
}
def _start_docker_runner(queue_name: str) -> tuple[str, str, str]:
    """Start a detached runner container on the configured Docker network.

    The container runs ``uvicorn app.runner.main:app`` on port 9000 with the
    environment from ``_runner_env``. It is named
    ``roleforge-worker-<queue_name with "." replaced by "-">`` so peers on the
    same network can reach it by that name, and it is created with
    ``auto_remove=True``. The host Docker socket is bind-mounted, and privileged
    mode is enabled, only when the corresponding settings are truthy. Options
    whose value is ``None`` are omitted from the ``run`` call.

    The Docker client is always closed before returning (or raising), so
    repeated launches do not accumulate open HTTP sessions.

    Args:
        queue_name: Queue the runner serves; only used to derive the name.

    Returns:
        ``(container_id, container_name, base_url)`` where ``base_url`` is
        ``http://<container_name>:9000``.

    Raises:
        docker.errors.DockerException: if the daemon is unreachable or the
            container cannot be started (for instance, a container with the
            same fixed name still exists).
    """
    settings = get_settings()
    container_name = f"roleforge-worker-{queue_name.replace('.', '-')}"
    volumes_cfg: dict[str, dict[str, str]] = {}
    if settings.app_runner_mount_docker_socket:
        # Opt-in only: exposes the host Docker daemon to the runner.
        volumes_cfg["/var/run/docker.sock"] = {"bind": "/var/run/docker.sock", "mode": "rw"}
    kw: dict = {
        "image": settings.app_runner_image,
        "command": ["uvicorn", "app.runner.main:app", "--host", "0.0.0.0", "--port", "9000"],
        "environment": _runner_env(settings),
        "detach": True,
        "auto_remove": True,
        "network": settings.app_docker_network,
        "name": container_name,
        "volumes": volumes_cfg or None,
    }
    if settings.app_runner_privileged:
        kw["privileged"] = True
    # Drop unset options (no volumes, or a None network) so Docker uses its defaults.
    kw = {k: v for k, v in kw.items() if v is not None}
    client = docker.from_env()
    try:
        container = client.containers.run(**kw)
    finally:
        # DockerClient holds an HTTP session; release it instead of waiting for GC.
        client.close()
    _logger.info(
        "Started ephemeral runner container %s on network %s (image=%s)",
        container_name,
        settings.app_docker_network,
        settings.app_runner_image,
    )
    return container.id, container_name, f"http://{container_name}:9000"
def _sanitize_dns_name(value: str) -> str:
normalized = re.sub(r"[^a-z0-9-]", "-", value.lower())
normalized = re.sub(r"-+", "-", normalized).strip("-")
return normalized[:40] or "runner"
def _load_k8s_config() -> None:
    """Configure the Kubernetes client, preferring in-cluster credentials.

    Tries the in-cluster service-account configuration first and falls back to
    the local kubeconfig when the client reports that in-cluster configuration
    is unavailable (``ConfigException``). Other errors from the in-cluster
    loader are no longer masked by the fallback.

    Raises:
        Whatever ``load_kube_config`` raises when no kubeconfig can be loaded
        either.
    """
    try:
        k8s_config.load_incluster_config()
    except k8s_config.ConfigException:
        # Not running inside a pod (or no service-account mount): use kubeconfig.
        k8s_config.load_kube_config()
def _wait_for_k8s_pod_ready(core, name: str, namespace: str, attempts: int = 60, interval: float = 1.0):
    """Poll pod *name* until it is ``Running`` with a ``Ready=True`` condition.

    Checks up to *attempts* times (must be >= 1), sleeping *interval* seconds
    between checks. Returns the last pod object read. If the pod never becomes
    ready, a warning is logged and the last read is still returned; the
    caller's HTTP health check is the final gate.

    Raises:
        RuntimeError: if the pod reaches phase ``Failed`` or ``Succeeded``.
        ApiException: propagated from ``read_namespaced_pod``.
    """
    import time

    current = None
    for _ in range(attempts):
        current = core.read_namespaced_pod(name=name, namespace=namespace)
        status = current.status
        phase = status.phase if status else None
        if phase == "Running" and any(
            c.type == "Ready" and c.status == "True" for c in (status.conditions or [])
        ):
            return current
        if phase in {"Failed", "Succeeded"}:
            raise RuntimeError(f"k8s runner pod terminated early: {phase}")
        time.sleep(interval)
    _logger.warning(
        "k8s runner pod %s/%s not Ready after %d checks; relying on HTTP health check",
        namespace,
        name,
        attempts,
    )
    return current


def _discard_failed_k8s_runner(name: str) -> None:
    """Best-effort deletion of a half-started runner; logs instead of raising."""
    try:
        _terminate_k8s_runner(name)
    except Exception:  # noqa: BLE001 - must not mask the original start failure
        _logger.exception("Failed to clean up k8s runner %s after a failed start", name)


def _start_k8s_runner(queue_name: str) -> tuple[str, str, str]:
    """Start a runner Pod plus a Service for it and wait for the Pod to be ready.

    Both objects are named ``roleforge-runner-<queue slug>-<6 hex chars>``. The
    Service selects the Pod via its ``runner-name`` label and exposes port 9000.
    Because that name is also used as a label value and as a Service name (both
    limited to 63 characters), the queue slug is shortened to fit.

    If anything fails after the Pod has been created (Service creation, the
    readiness wait, or the Pod terminating early), the Pod and Service are
    deleted best-effort before the error is re-raised, so a failed start does
    not leave orphaned objects that no caller knows the name of.

    Returns:
        ``(pod_uid, runner_name, base_url)`` with ``base_url`` pointing at the
        Service's ``<name>.<namespace>.svc.cluster.local:9000`` address.
        ``pod_uid`` falls back to the runner name when the API reports no UID.

    Raises:
        RuntimeError: if the Pod terminates before becoming ready.
        ApiException: for API errors other than a 409 on Service creation.
    """
    settings = get_settings()
    _load_k8s_config()
    core = k8s_client.CoreV1Api()
    namespace = settings.app_k8s_namespace

    prefix = "roleforge-runner-"
    suffix = secrets.token_hex(3)
    # Keep "<prefix><slug>-<suffix>" within the 63-char DNS label / label-value limit.
    slug_budget = 63 - len(prefix) - 1 - len(suffix)
    slug = _sanitize_dns_name(queue_name)[:slug_budget].rstrip("-")
    base_name = f"{prefix}{slug}-{suffix}"
    labels = {"app": "roleforge-runner", "runner-name": base_name}

    env = [k8s_client.V1EnvVar(name=k, value=v) for k, v in _runner_env(settings).items()]
    container = k8s_client.V1Container(
        name="runner",
        image=settings.app_runner_image,
        command=["uvicorn", "app.runner.main:app", "--host", "0.0.0.0", "--port", "9000"],
        env=env,
        ports=[k8s_client.V1ContainerPort(container_port=9000)],
    )
    pod = k8s_client.V1Pod(
        metadata=k8s_client.V1ObjectMeta(name=base_name, labels=labels),
        spec=k8s_client.V1PodSpec(
            containers=[container],
            restart_policy="Never",
            service_account_name=settings.app_k8s_service_account,
        ),
    )
    service = k8s_client.V1Service(
        metadata=k8s_client.V1ObjectMeta(name=base_name, labels=labels),
        spec=k8s_client.V1ServiceSpec(
            selector={"runner-name": base_name},
            ports=[k8s_client.V1ServicePort(name="http", port=9000, target_port=9000)],
        ),
    )

    core.create_namespaced_pod(namespace=namespace, body=pod)
    try:
        try:
            core.create_namespaced_service(namespace=namespace, body=service)
        except ApiException as exc:
            # 409: a Service with this name already exists; reuse it.
            if exc.status != 409:
                raise
        current = _wait_for_k8s_pod_ready(core, base_name, namespace)
    except Exception:
        _discard_failed_k8s_runner(base_name)
        raise

    pod_uid = current.metadata.uid or base_name
    base_url = f"http://{base_name}.{namespace}.svc.cluster.local:9000"
    return pod_uid, base_name, base_url
async def _poll_runner_health(base_url: str, attempts: int = 80, interval: float = 0.5) -> tuple[bool, str]:
    """Probe ``{base_url}/healthz`` until it answers HTTP 200 or attempts run out.

    Returns ``(True, "")`` once healthy, otherwise ``(False, last_failure)``
    where ``last_failure`` describes the final failed probe (for logging).
    """
    last_failure = "no probe attempted"
    async with httpx.AsyncClient(timeout=3.0) as client:
        for _ in range(attempts):
            try:
                response = await client.get(f"{base_url}/healthz")
            except httpx.HTTPError as exc:
                # Connection refused / name not resolvable / timeout are expected while booting.
                last_failure = f"{type(exc).__name__}: {exc}"
            else:
                if response.status_code == 200:
                    return True, ""
                last_failure = f"HTTP {response.status_code}"
            await asyncio.sleep(interval)
    return False, last_failure


async def _discard_unhealthy_runner(runner_name: str, runtime_mode: str) -> None:
    """Best-effort teardown of a runner that did not pass its start-up check."""
    try:
        await terminate_ephemeral_worker(runner_name, runtime_mode)
    except Exception:  # noqa: BLE001 - must not mask the error being reported
        _logger.exception("Failed to tear down ephemeral runner %s (mode=%s)", runner_name, runtime_mode)


async def launch_ephemeral_worker(queue_name: str, runtime_mode: str = "docker") -> tuple[str, str, str]:
    """Start a runner for *queue_name* and wait until its ``/healthz`` answers 200.

    ``runtime_mode == "k8s"`` starts a Kubernetes Pod/Service; any other value
    starts a Docker container. The blocking start runs in a worker thread. The
    health endpoint is then probed up to 80 times, 0.5 s apart, with a 3 s
    per-request timeout.

    Returns:
        ``(runner_id, runner_name, base_url)`` exactly as produced by the starter.

    Raises:
        RuntimeError: if the runner never becomes healthy. The runner is torn
            down (best-effort) first, and the message carries hints for the
            selected runtime mode.
        Any exception from the starter itself propagates unchanged. If the probe
        loop is cancelled or fails unexpectedly, the runner is torn down before
        that exception propagates.
    """
    starter = _start_k8s_runner if runtime_mode == "k8s" else _start_docker_runner
    container_id, container_name, base_url = await asyncio.to_thread(starter, queue_name)
    try:
        healthy, last_failure = await _poll_runner_health(base_url)
    except BaseException:
        # Cancellation or an unexpected probe error: don't leak the runner we just started.
        await _discard_unhealthy_runner(container_name, runtime_mode)
        raise
    if healthy:
        return container_id, container_name, base_url

    _logger.error(
        "Ephemeral runner never became healthy at %s (queue=%s, last failure: %s)",
        base_url,
        queue_name,
        last_failure,
    )
    await _discard_unhealthy_runner(container_name, runtime_mode)
    if runtime_mode == "k8s":
        hint = (
            "Ensure the API can resolve in-cluster Service DNS (*.svc.cluster.local), "
            "APP_K8S_NAMESPACE and APP_K8S_SERVICE_ACCOUNT are correct, and APP_RUNNER_IMAGE is runnable."
        )
    else:
        hint = (
            "Ensure the API container mounts /var/run/docker.sock, APP_DOCKER_NETWORK matches docker compose "
            "(e.g. roleforge_default), and APP_RUNNER_IMAGE is runnable."
        )
    raise RuntimeError(f"Ephemeral runner did not respond at {base_url}/healthz. {hint}")
def _terminate_docker_runner(container_name: str) -> None:
    """Stop the runner container called *container_name*, best-effort.

    A container that cannot be found is treated as already gone. Any other
    failure while looking it up or stopping it is logged as a warning instead
    of being raised (previously it was silently discarded), so this stays safe
    to call on error paths. Runner containers are started with
    ``auto_remove=True``, so Docker removes them once they stop. Errors from
    creating the Docker client itself still propagate. The client is closed
    before returning.
    """
    client = docker.from_env()
    try:
        try:
            container = client.containers.get(container_name)
        except docker.errors.NotFound:
            return
        except Exception:  # noqa: BLE001 - best-effort cleanup, but leave a trace
            _logger.warning("Could not look up runner container %s", container_name, exc_info=True)
            return
        try:
            container.stop(timeout=5)
        except docker.errors.NotFound:
            # Disappeared between lookup and stop; nothing left to do.
            pass
        except Exception:  # noqa: BLE001 - best-effort cleanup, but leave a trace
            _logger.warning("Failed to stop runner container %s", container_name, exc_info=True)
    finally:
        client.close()
def _terminate_k8s_runner(runner_name: str) -> None:
    """Delete the runner Pod and Service called *runner_name* in the configured namespace.

    The Pod is deleted with ``grace_period_seconds=0``. Objects that are already
    gone (HTTP 404) are ignored. Both deletions are always attempted, so a
    failed Pod deletion no longer leaves the Service behind. Afterwards, the
    first non-404 ``ApiException`` (if any) is re-raised; a second one is logged.

    Raises:
        ApiException: the first non-404 API error from either deletion.
    """
    settings = get_settings()
    _load_k8s_config()
    core = k8s_client.CoreV1Api()
    namespace = settings.app_k8s_namespace
    deletions = (
        ("pod", lambda: core.delete_namespaced_pod(name=runner_name, namespace=namespace, grace_period_seconds=0)),
        ("service", lambda: core.delete_namespaced_service(name=runner_name, namespace=namespace)),
    )
    first_error = None
    for kind, delete in deletions:
        try:
            delete()
        except ApiException as exc:
            if exc.status == 404:
                continue
            if first_error is None:
                first_error = exc
            else:
                _logger.warning("Failed to delete k8s runner %s %s", kind, runner_name, exc_info=True)
    if first_error is not None:
        raise first_error
async def terminate_ephemeral_worker(runner_name: str, runtime_mode: str) -> None:
    """Tear down a runner without blocking the event loop.

    ``runtime_mode == "k8s"`` deletes the runner's Kubernetes Pod and Service;
    any other value is treated as Docker and stops the container. The blocking
    client call runs in a worker thread.
    """
    if runtime_mode == "k8s":
        await asyncio.to_thread(_terminate_k8s_runner, runner_name)
        return
    await asyncio.to_thread(_terminate_docker_runner, runner_name)
def _list_docker_runners() -> list[dict[str, str]]:
    """Describe runner containers, running or stopped (``all=True``).

    Containers are selected with Docker's ``name`` filter set to
    ``roleforge-worker-``. Each entry carries ``runtime_mode`` (``"docker"``),
    the container name and ID, and its Docker status. The Docker client is
    closed before returning.
    """
    client = docker.from_env()
    try:
        containers = client.containers.list(all=True, filters={"name": "roleforge-worker-"})
        return [
            {
                "runtime_mode": "docker",
                "runner_name": c.name,
                "runner_id": c.id,
                "status": c.status,
            }
            for c in containers
        ]
    finally:
        client.close()
def _list_k8s_runners() -> list[dict[str, str]]:
    """Describe every pod labelled ``app=roleforge-runner`` in the configured namespace.

    Each entry carries ``runtime_mode`` (``"k8s"``), the pod name and UID (empty
    string when unset) and the pod phase, or ``"unknown"`` when the pod object
    has no status.
    """
    cfg = get_settings()
    _load_k8s_config()
    api = k8s_client.CoreV1Api()
    listing = api.list_namespaced_pod(
        namespace=cfg.app_k8s_namespace,
        label_selector="app=roleforge-runner",
    )

    def _describe(pod) -> dict[str, str]:
        phase = "unknown"
        if pod.status:
            phase = pod.status.phase
        return {
            "runtime_mode": "k8s",
            "runner_name": pod.metadata.name or "",
            "runner_id": pod.metadata.uid or "",
            "status": phase,
        }

    return [_describe(pod) for pod in listing.items]
async def list_active_runners() -> list[dict[str, str]]:
    """Return Docker runners followed by Kubernetes runners.

    Both listings run in worker threads. Errors from the Docker listing
    propagate to the caller. The Kubernetes listing is best-effort: if it
    fails (presumably because no cluster config is available in Docker-only
    deployments), no k8s entries are returned. The failure is logged at DEBUG
    with its traceback instead of being silently discarded, so a misconfigured
    cluster can still be diagnosed.
    """
    docker_items = await asyncio.to_thread(_list_docker_runners)
    try:
        k8s_items = await asyncio.to_thread(_list_k8s_runners)
    except Exception:  # noqa: BLE001 - k8s is optional; don't fail the whole listing
        _logger.debug("Kubernetes runner listing unavailable; returning Docker runners only", exc_info=True)
        k8s_items = []
    return docker_items + k8s_items