"""WebSocket endpoints that stream job and test-run logs to connected clients.

Logs come from the RoleForge runner's own `/ws/runs/{id}` socket when the DB row
records a runner, and otherwise from a Redis pub/sub channel.
"""

import asyncio
import logging
import uuid
from collections.abc import AsyncIterable, Awaitable
from typing import Union

from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from redis.asyncio import Redis
from redis.asyncio.client import PubSub
from websockets.asyncio.client import connect as ws_connect
from websockets.exceptions import ConnectionClosedError

from app.core.config import get_settings
from app.db.pool import get_pool

router = APIRouter(tags=["realtime"])
_logger = logging.getLogger(__name__)

# Default websockets library open_timeout is 10s — ephemeral runners often need longer to listen.
RUNNER_WS_OPEN_TIMEOUT_SEC = 60.0

# Pause between Redis polls only when nothing was delivered; get_message() already blocks
# for up to its own timeout, so this must not be paid per delivered message.
_REDIS_IDLE_SLEEP_SEC = 0.05

# WebSocket close code 1008 ("policy violation"), used to reject malformed path ids.
_WS_POLICY_VIOLATION = 1008

_JOB_RUNNER_QUERY = "select runner_url, runner_run_id from jobs where id=$1::uuid"
_TEST_RUNNER_QUERY = "select runner_url, runner_run_id from test_runs where id=$1::uuid"


def _runner_http_url_to_ws_base(url: str) -> str:
    """Map an http(s):// runner URL to ws(s)://; other values are returned stripped."""
    u = (url or "").strip()
    if u.startswith("https://"):
        return "wss://" + u[8:]
    if u.startswith("http://"):
        return "ws://" + u[7:]
    return u


async def _send_ws_text_safe(websocket: WebSocket, text: str) -> None:
    """Send *text* to the client, ignoring failures (the socket may already be gone)."""
    try:
        await websocket.send_text(text)
    except Exception:  # noqa: BLE001 — best-effort notice before closing
        _logger.debug("Could not deliver notice to client WebSocket", exc_info=True)


async def _wait_for_client_disconnect(websocket: WebSocket) -> None:
    """Return once the client disconnects; any inbound frames are read and discarded."""
    while True:
        message = await websocket.receive()
        if message["type"] == "websocket.disconnect":
            return


async def _run_until_first_finishes(*coros: Awaitable[None]) -> None:
    """Run *coros* concurrently; as soon as one finishes, cancel and await the others.

    A non-``WebSocketDisconnect`` exception from a finished coroutine is re-raised so the
    caller's error handling still sees it; a client disconnect ends the race quietly.
    """
    tasks = [asyncio.ensure_future(coro) for coro in coros]
    try:
        done, _pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    finally:
        for task in tasks:
            task.cancel()
        # Await cancelled tasks so their cleanup runs and no exception goes unretrieved.
        await asyncio.gather(*tasks, return_exceptions=True)
    for task in done:
        if task.cancelled():
            continue
        exc = task.exception()
        if exc is not None and not isinstance(exc, WebSocketDisconnect):
            raise exc


async def _forward_upstream(
    websocket: WebSocket, upstream: AsyncIterable[Union[str, bytes]]
) -> None:
    """Relay every upstream frame to the client as text (bytes decoded with replacement)."""
    async for message in upstream:
        text = message if isinstance(message, str) else message.decode(errors="replace")
        await websocket.send_text(text)


async def pipe_runner_log_stream(websocket: WebSocket, runner_url: str, runner_run_id: str) -> None:
    """Accept client WS and proxy lines from RoleForge runner `/ws/runs/{id}` without crashing on timeouts.

    Accepts the client socket, then forwards upstream frames until the runner closes the
    stream or the client disconnects, whichever happens first. Connection problems are
    logged and, where useful, reported to the client as a one-line ``[roleforge]`` notice.
    """
    base = _runner_http_url_to_ws_base(runner_url).rstrip("/")
    uri = f"{base}/ws/runs/{runner_run_id}"
    await websocket.accept()
    try:
        async with ws_connect(
            uri,
            open_timeout=RUNNER_WS_OPEN_TIMEOUT_SEC,
            ping_interval=20,
            ping_timeout=120,
            close_timeout=10,
        ) as upstream:
            # Watching for a client disconnect lets us close the upstream socket promptly
            # even while the runner is silent (otherwise we'd only notice on the next send).
            await _run_until_first_finishes(
                _forward_upstream(websocket, upstream),
                _wait_for_client_disconnect(websocket),
            )
    except WebSocketDisconnect:
        pass
    except asyncio.CancelledError:
        raise
    except (TimeoutError, asyncio.TimeoutError):
        # Before Python 3.11 asyncio.TimeoutError is not the builtin TimeoutError.
        _logger.warning("Runner WebSocket connect timed out (%ss): %s", RUNNER_WS_OPEN_TIMEOUT_SEC, uri)
        await _send_ws_text_safe(
            websocket,
            "[roleforge] Runner log stream timed out — logs may still arrive via HTTP polling.",
        )
    except OSError as exc:
        _logger.warning("Runner WebSocket unreachable %s: %s", uri, exc)
        await _send_ws_text_safe(
            websocket,
            f"[roleforge] Cannot reach runner log stream ({exc}). Try HTTP log polling.",
        )
    except ConnectionClosedError as exc:
        _logger.info("Runner WebSocket closed abruptly (%s): %s", uri, exc)
    except Exception as exc:  # noqa: BLE001
        _logger.exception("Runner WebSocket proxy failed for %s", uri)
        await _send_ws_text_safe(websocket, f"[roleforge] Log stream error: {exc!s}")


async def _pump_pubsub(websocket: WebSocket, pubsub: PubSub) -> None:
    """Forward non-empty pub/sub payloads to the client until cancelled."""
    while True:
        message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0)
        data = message.get("data") if message else None
        if data:
            await websocket.send_text(data)
        else:
            await asyncio.sleep(_REDIS_IDLE_SLEEP_SEC)


async def _relay_redis_channel(websocket: WebSocket, channel: str) -> None:
    """Relay Redis pub/sub *channel* to an accepted client until it disconnects."""
    settings = get_settings()
    redis = Redis.from_url(settings.redis_url, decode_responses=True)
    pubsub = redis.pubsub()
    try:
        # subscribe() sits inside the try so a failure here still closes the clients below.
        await pubsub.subscribe(channel)
        await _run_until_first_finishes(
            _pump_pubsub(websocket, pubsub),
            _wait_for_client_disconnect(websocket),
        )
    finally:
        try:
            await pubsub.unsubscribe(channel)
        except Exception:  # noqa: BLE001 — must not skip closing the clients
            _logger.debug("Redis unsubscribe failed for %s", channel, exc_info=True)
        await pubsub.close()
        await redis.close()


async def _serve_log_socket(websocket: WebSocket, record_id: str, query: str, channel: str) -> None:
    """Serve one log socket: runner proxy when *query*'s row names a runner, else Redis.

    *query* must select ``runner_url`` and ``runner_run_id`` for a ``$1::uuid`` id.
    """
    try:
        canonical_id = str(uuid.UUID(record_id))
    except ValueError:
        # Reject up front; passing a malformed id to `$1::uuid` raises inside the handler.
        await websocket.close(code=_WS_POLICY_VIOLATION)
        return
    pool = get_pool()
    async with pool.acquire() as conn:
        row = await conn.fetchrow(query, canonical_id)
    # The pooled connection is released before streaming so a long-lived socket doesn't pin it.
    if row and row["runner_url"] and row["runner_run_id"]:
        await pipe_runner_log_stream(websocket, str(row["runner_url"]), str(row["runner_run_id"]))
        return
    await websocket.accept()
    await _relay_redis_channel(websocket, channel)


@router.websocket("/ws/jobs/{job_id}")
async def ws_job_logs(websocket: WebSocket, job_id: str) -> None:
    """Stream a job's logs from its runner if recorded, else from Redis `job:{id}:logs`."""
    await _serve_log_socket(websocket, job_id, _JOB_RUNNER_QUERY, f"job:{job_id}:logs")


@router.websocket("/ws/tests/{test_id}")
async def ws_test_logs(websocket: WebSocket, test_id: str) -> None:
    """Stream a test run's logs from its runner if recorded, else from Redis `test:{id}:logs`."""
    await _serve_log_socket(websocket, test_id, _TEST_RUNNER_QUERY, f"test:{test_id}:logs")