Files
RoleForge/app/routers/ws.py
Sergey Antropoff 1d2301fb09 first commit
2026-04-30 08:59:31 +03:00

139 lines
4.9 KiB
Python

import asyncio
import logging
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from redis.asyncio import Redis
from websockets.asyncio.client import connect as ws_connect
from websockets.exceptions import ConnectionClosedError
from app.core.config import get_settings
from app.db.pool import get_pool
# Router that the WebSocket endpoints in this module register on (tagged "realtime").
router = APIRouter(tags=["realtime"])
# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)
# Default websockets library open_timeout is 10s — ephemeral runners often need longer to listen.
# Value is in seconds and is passed as `open_timeout` when connecting to the runner.
RUNNER_WS_OPEN_TIMEOUT_SEC = 60.0
def _runner_http_url_to_ws_base(url: str) -> str:
u = (url or "").strip()
if u.startswith("https://"):
return "wss://" + u[8:]
if u.startswith("http://"):
return "ws://" + u[7:]
return u
async def _send_ws_text_safe(websocket: WebSocket, text: str) -> None:
    """Send *text* to the client on a best-effort basis.

    Any ``Exception`` raised by the send is suppressed so that callers can use
    this from error handlers without risking a second failure. The failure is
    recorded at debug level instead of being dropped silently.

    Args:
        websocket: Client connection to write to.
        text: Message to deliver.
    """
    try:
        await websocket.send_text(text)
    except Exception as exc:  # noqa: BLE001 — best-effort notice before closing
        # Still never raises; only leaves a trace for troubleshooting.
        _logger.debug("Could not deliver notice to WebSocket client: %s", exc)
async def pipe_runner_log_stream(websocket: WebSocket, runner_url: str, runner_run_id: str) -> None:
    """Accept the client WebSocket and relay the runner's `/ws/runs/{id}` stream to it.

    The runner URL is converted to a WebSocket URL, the client connection is
    accepted, and every message received from the runner is forwarded to the
    client as text (binary frames are decoded with undecodable bytes replaced).

    Failures are handled here rather than propagated:

    * client disconnect — ends quietly;
    * connect timeout — logged as a warning and a notice is sent to the client;
    * ``OSError`` while reaching the runner — logged and a notice is sent;
    * abrupt upstream close (``ConnectionClosedError``) — logged at info level;
    * anything else — logged with traceback and a notice is sent.

    Task cancellation is always re-raised.

    Args:
        websocket: Client connection; accepted by this function.
        runner_url: HTTP(S) base URL of the runner.
        runner_run_id: Run identifier appended to the ``/ws/runs/`` path.
    """
    base = _runner_http_url_to_ws_base(runner_url).rstrip("/")
    uri = f"{base}/ws/runs/{runner_run_id}"
    await websocket.accept()
    try:
        async with ws_connect(
            uri,
            open_timeout=RUNNER_WS_OPEN_TIMEOUT_SEC,
            ping_interval=20,
            ping_timeout=120,
            close_timeout=10,
        ) as upstream:
            async for message in upstream:
                if isinstance(message, str):
                    await websocket.send_text(message)
                else:
                    # Binary frame: forward as text, replacing undecodable bytes.
                    await websocket.send_text(message.decode(errors="replace"))
    except WebSocketDisconnect:
        # Client went away; nothing left to relay to.
        pass
    except asyncio.CancelledError:
        # Never swallow cancellation, whatever its base class is on this Python.
        raise
    except (TimeoutError, asyncio.TimeoutError):
        # Before Python 3.11 asyncio.TimeoutError is a separate class from the
        # builtin TimeoutError, so both are listed; this handler must stay
        # ahead of OSError because the builtin TimeoutError subclasses it.
        _logger.warning("Runner WebSocket connect timed out (%ss): %s", RUNNER_WS_OPEN_TIMEOUT_SEC, uri)
        await _send_ws_text_safe(
            websocket,
            "[roleforge] Runner log stream timed out — logs may still arrive via HTTP polling.",
        )
    except OSError as exc:
        _logger.warning("Runner WebSocket unreachable %s: %s", uri, exc)
        await _send_ws_text_safe(
            websocket,
            f"[roleforge] Cannot reach runner log stream ({exc}). Try HTTP log polling.",
        )
    except ConnectionClosedError as exc:
        _logger.info("Runner WebSocket closed abruptly (%s): %s", uri, exc)
    except Exception as exc:  # noqa: BLE001
        _logger.exception("Runner WebSocket proxy failed for %s", uri)
        await _send_ws_text_safe(websocket, f"[roleforge] Log stream error: {exc!s}")
@router.websocket("/ws/jobs/{job_id}")
async def ws_job_logs(websocket: WebSocket, job_id: str) -> None:
    """Stream the logs of a job to a WebSocket client.

    The job's ``runner_url`` and ``runner_run_id`` are read from the ``jobs``
    table. When both are set, the runner's own log stream is proxied via
    ``pipe_runner_log_stream``. Otherwise the connection is accepted and
    messages published on the Redis channel ``job:{job_id}:logs`` are
    forwarded until the client disconnects.

    Args:
        websocket: Client connection.
        job_id: Job identifier from the URL path; cast to ``uuid`` in the query.
    """
    pool = get_pool()
    # Hold the pooled connection only for the lookup, not for the whole stream.
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            "select runner_url, runner_run_id from jobs where id=$1::uuid",
            job_id,
        )
    if row and row["runner_url"] and row["runner_run_id"]:
        await pipe_runner_log_stream(websocket, str(row["runner_url"]), str(row["runner_run_id"]))
        return

    await websocket.accept()
    settings = get_settings()
    redis = Redis.from_url(settings.redis_url, decode_responses=True)
    pubsub = redis.pubsub()
    channel = f"job:{job_id}:logs"
    try:
        # Subscribing inside the try guarantees the client below is released
        # even when the subscribe itself fails.
        await pubsub.subscribe(channel)
        # NOTE(review): a vanished client is only noticed when a send fails, so
        # on a silent channel this loop may keep running — confirm whether a
        # concurrent receive is wanted.
        while True:
            message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0)
            if message and message.get("data"):
                await websocket.send_text(message["data"])
            # NOTE(review): get_message already waits up to 1s; this extra pause
            # looks like pacing only — confirm before removing.
            await asyncio.sleep(0.05)
    except WebSocketDisconnect:
        pass
    finally:
        # Nested so that a failure in one cleanup step cannot skip the next.
        try:
            await pubsub.unsubscribe(channel)
        finally:
            try:
                await pubsub.close()
            finally:
                await redis.close()
@router.websocket("/ws/tests/{test_id}")
async def ws_test_logs(websocket: WebSocket, test_id: str) -> None:
    """Stream the logs of a test run to a WebSocket client.

    The run's ``runner_url`` and ``runner_run_id`` are read from the
    ``test_runs`` table. When both are set, the runner's own log stream is
    proxied via ``pipe_runner_log_stream``. Otherwise the connection is
    accepted and messages published on the Redis channel
    ``test:{test_id}:logs`` are forwarded until the client disconnects.

    Args:
        websocket: Client connection.
        test_id: Test-run identifier from the URL path; cast to ``uuid`` in the query.
    """
    pool = get_pool()
    # Hold the pooled connection only for the lookup, not for the whole stream.
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            "select runner_url, runner_run_id from test_runs where id=$1::uuid",
            test_id,
        )
    if row and row["runner_url"] and row["runner_run_id"]:
        await pipe_runner_log_stream(websocket, str(row["runner_url"]), str(row["runner_run_id"]))
        return

    await websocket.accept()
    settings = get_settings()
    redis = Redis.from_url(settings.redis_url, decode_responses=True)
    pubsub = redis.pubsub()
    channel = f"test:{test_id}:logs"
    try:
        # Subscribing inside the try guarantees the client below is released
        # even when the subscribe itself fails.
        await pubsub.subscribe(channel)
        # NOTE(review): a vanished client is only noticed when a send fails, so
        # on a silent channel this loop may keep running — confirm whether a
        # concurrent receive is wanted.
        while True:
            message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0)
            if message and message.get("data"):
                await websocket.send_text(message["data"])
            # NOTE(review): get_message already waits up to 1s; this extra pause
            # looks like pacing only — confirm before removing.
            await asyncio.sleep(0.05)
    except WebSocketDisconnect:
        pass
    finally:
        # Nested so that a failure in one cleanup step cannot skip the next.
        try:
            await pubsub.unsubscribe(channel)
        finally:
            try:
                await pubsub.close()
            finally:
                await redis.close()