139 lines
4.9 KiB
Python
139 lines
4.9 KiB
Python
import asyncio
|
|
import logging
|
|
|
|
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
|
|
from redis.asyncio import Redis
|
|
from websockets.asyncio.client import connect as ws_connect
|
|
from websockets.exceptions import ConnectionClosedError
|
|
|
|
from app.core.config import get_settings
|
|
from app.db.pool import get_pool
|
|
|
|
# Router for the WebSocket endpoints declared in this module; routes are tagged "realtime".
router = APIRouter(tags=["realtime"])
|
|
|
|
# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)
|
|
|
|
# Seconds to wait for the runner WebSocket connection to open.
# The websockets library's default open_timeout is 10s — ephemeral runners often need
# longer before they start listening.
|
|
RUNNER_WS_OPEN_TIMEOUT_SEC = 60.0
|
|
|
|
|
|
def _runner_http_url_to_ws_base(url: str) -> str:
|
|
u = (url or "").strip()
|
|
if u.startswith("https://"):
|
|
return "wss://" + u[8:]
|
|
if u.startswith("http://"):
|
|
return "ws://" + u[7:]
|
|
return u
|
|
|
|
|
|
async def _send_ws_text_safe(websocket: WebSocket, text: str) -> None:
    """Send ``text`` to the client on a best-effort basis.

    Used to deliver a final notice when the socket may already be unusable,
    so a failed send must never propagate to the caller. The failure is
    recorded at debug level instead of being dropped silently.

    Args:
        websocket: Client connection to write to.
        text: Message to send as a text frame.
    """
    try:
        await websocket.send_text(text)
    except Exception:  # noqa: BLE001 — best-effort notice before closing
        _logger.debug("Could not deliver notice to WebSocket client", exc_info=True)
|
|
|
|
|
|
async def pipe_runner_log_stream(websocket: WebSocket, runner_url: str, runner_run_id: str) -> None:
    """Accept the client WebSocket and relay the runner's `/ws/runs/{id}` stream to it.

    The runner URL is converted to a WebSocket base URL, the client connection
    is accepted, and every upstream message is forwarded as a text frame
    (binary frames are decoded with undecodable bytes replaced). Failures to
    reach or keep the upstream connection are logged and, where possible,
    reported to the client as a ``[roleforge]`` notice instead of raising.

    Args:
        websocket: Client connection; accepted by this function.
        runner_url: HTTP(S) or WS(S) base URL of the runner.
        runner_run_id: Run identifier appended to the upstream path.

    Raises:
        asyncio.CancelledError: Re-raised so task cancellation is not swallowed.
    """
    base = _runner_http_url_to_ws_base(runner_url).rstrip("/")
    uri = f"{base}/ws/runs/{runner_run_id}"
    await websocket.accept()
    # NOTE(review): a client disconnect is only noticed here when a send fails;
    # while the upstream is silent the proxy keeps the runner connection open.
    try:
        async with ws_connect(
            uri,
            open_timeout=RUNNER_WS_OPEN_TIMEOUT_SEC,
            ping_interval=20,
            ping_timeout=120,
            close_timeout=10,
        ) as upstream:
            async for message in upstream:
                if isinstance(message, str):
                    await websocket.send_text(message)
                else:
                    await websocket.send_text(message.decode(errors="replace"))
    except WebSocketDisconnect:
        # Client went away; nothing left to forward to.
        pass
    except asyncio.CancelledError:
        raise
    except (TimeoutError, asyncio.TimeoutError):
        # Before Python 3.11 asyncio.TimeoutError is a separate class from the
        # builtin TimeoutError, so both are listed. This clause must stay ahead
        # of OSError because the builtin TimeoutError is an OSError subclass.
        _logger.warning("Runner WebSocket connect timed out (%ss): %s", RUNNER_WS_OPEN_TIMEOUT_SEC, uri)
        await _send_ws_text_safe(
            websocket,
            "[roleforge] Runner log stream timed out — logs may still arrive via HTTP polling.",
        )
    except OSError as exc:
        _logger.warning("Runner WebSocket unreachable %s: %s", uri, exc)
        await _send_ws_text_safe(
            websocket,
            f"[roleforge] Cannot reach runner log stream ({exc}). Try HTTP log polling.",
        )
    except ConnectionClosedError as exc:
        _logger.info("Runner WebSocket closed abruptly (%s): %s", uri, exc)
    except Exception as exc:  # noqa: BLE001
        _logger.exception("Runner WebSocket proxy failed for %s", uri)
        await _send_ws_text_safe(websocket, f"[roleforge] Log stream error: {exc!s}")
|
|
|
|
|
|
@router.websocket("/ws/jobs/{job_id}")
async def ws_job_logs(websocket: WebSocket, job_id: str) -> None:
    """Stream the logs of a job to a WebSocket client.

    When the ``jobs`` row has both ``runner_url`` and ``runner_run_id`` set,
    the runner's own log stream is proxied. Otherwise messages published on
    the Redis channel ``job:{job_id}:logs`` are forwarded until the client
    disconnects.

    Args:
        websocket: Incoming client connection.
        job_id: Job identifier from the path; must parse as a UUID.
    """
    import uuid  # local import: only needed to validate the path parameter

    try:
        uuid.UUID(job_id)
    except ValueError:
        # The lookup casts the id to uuid, so a malformed id would surface as a
        # database error. Reject it before accepting (1008 = policy violation).
        await websocket.close(code=1008)
        return

    pool = get_pool()
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            "select runner_url, runner_run_id from jobs where id=$1::uuid",
            job_id,
        )
    # The pooled connection is released here, before any long-lived streaming.
    if row and row["runner_url"] and row["runner_run_id"]:
        await pipe_runner_log_stream(websocket, str(row["runner_url"]), str(row["runner_run_id"]))
        return

    await websocket.accept()

    async def _client_gone() -> None:
        """Consume inbound frames and return once the client has disconnected."""
        try:
            while True:
                event = await websocket.receive()
                if event.get("type") == "websocket.disconnect":
                    return
        except Exception:  # noqa: BLE001 — any receive failure means the socket is unusable
            return

    settings = get_settings()
    redis = Redis.from_url(settings.redis_url, decode_responses=True)
    pubsub = redis.pubsub()
    channel = f"job:{job_id}:logs"
    # Without this watcher an idle channel would never reveal that the client
    # left, and the loop below would keep polling Redis indefinitely.
    disconnect_watch = asyncio.create_task(_client_gone())
    try:
        # Subscribing inside the try guarantees the Redis client is released
        # even when the subscribe call itself fails.
        await pubsub.subscribe(channel)
        while True:
            message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0)
            if disconnect_watch.done():
                break
            if message and message.get("data"):
                await websocket.send_text(message["data"])
            await asyncio.sleep(0.05)
    except WebSocketDisconnect:
        pass
    finally:
        disconnect_watch.cancel()
        # Nested so that a failure in one cleanup step does not skip the rest.
        try:
            await pubsub.unsubscribe(channel)
        finally:
            try:
                await pubsub.close()
            finally:
                await redis.close()
|
|
|
|
|
|
@router.websocket("/ws/tests/{test_id}")
async def ws_test_logs(websocket: WebSocket, test_id: str) -> None:
    """Stream the logs of a test run to a WebSocket client.

    When the ``test_runs`` row has both ``runner_url`` and ``runner_run_id``
    set, the runner's own log stream is proxied. Otherwise messages published
    on the Redis channel ``test:{test_id}:logs`` are forwarded until the client
    disconnects.

    Args:
        websocket: Incoming client connection.
        test_id: Test-run identifier from the path; must parse as a UUID.
    """
    import uuid  # local import: only needed to validate the path parameter

    try:
        uuid.UUID(test_id)
    except ValueError:
        # The lookup casts the id to uuid, so a malformed id would surface as a
        # database error. Reject it before accepting (1008 = policy violation).
        await websocket.close(code=1008)
        return

    pool = get_pool()
    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            "select runner_url, runner_run_id from test_runs where id=$1::uuid",
            test_id,
        )
    # The pooled connection is released here, before any long-lived streaming.
    if row and row["runner_url"] and row["runner_run_id"]:
        await pipe_runner_log_stream(websocket, str(row["runner_url"]), str(row["runner_run_id"]))
        return

    await websocket.accept()

    async def _client_gone() -> None:
        """Consume inbound frames and return once the client has disconnected."""
        try:
            while True:
                event = await websocket.receive()
                if event.get("type") == "websocket.disconnect":
                    return
        except Exception:  # noqa: BLE001 — any receive failure means the socket is unusable
            return

    settings = get_settings()
    redis = Redis.from_url(settings.redis_url, decode_responses=True)
    pubsub = redis.pubsub()
    channel = f"test:{test_id}:logs"
    # Without this watcher an idle channel would never reveal that the client
    # left, and the loop below would keep polling Redis indefinitely.
    disconnect_watch = asyncio.create_task(_client_gone())
    try:
        # Subscribing inside the try guarantees the Redis client is released
        # even when the subscribe call itself fails.
        await pubsub.subscribe(channel)
        while True:
            message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0)
            if disconnect_watch.done():
                break
            if message and message.get("data"):
                await websocket.send_text(message["data"])
            await asyncio.sleep(0.05)
    except WebSocketDisconnect:
        pass
    finally:
        disconnect_watch.cancel()
        # Nested so that a failure in one cleanup step does not skip the rest.
        try:
            await pubsub.unsubscribe(channel)
        finally:
            try:
                await pubsub.close()
            finally:
                await redis.close()
|