#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
LogBoard+ - WebSocket API

Author: Sergey Antropov
Site: https://devops.org.ru
"""

import asyncio
from datetime import datetime
from typing import Optional

from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Query, Depends
from fastapi.responses import JSONResponse

from core.auth import verify_token, get_current_user
from core.docker import docker_client, DEFAULT_TAIL, get_remote_containers
from core.logger import websocket_logger

router = APIRouter()


@router.get("/status")
def api_websocket_status(current_user: str = Depends(get_current_user)):
    """Report whether WebSocket log connections can currently be served.

    Pings the Docker daemon and counts the containers returned by
    ``docker_client.containers.list()``.

    Returns:
        A dict with ``status`` (``"available"``, ``"no_containers"`` or
        ``"error"``), a human-readable ``message``, ``containers_count`` and
        an ISO-formatted ``timestamp``. Failures are reported in the payload
        rather than raised.
    """
    try:
        # Make sure the Docker daemon is reachable before listing containers.
        docker_client.ping()
        containers = docker_client.containers.list()
    except Exception as e:
        websocket_logger.error(f"WebSocket status check failed: {e}")
        return {
            "status": "error",
            "message": f"Ошибка проверки WebSocket статуса: {e}",
            "containers_count": 0,
            "timestamp": datetime.now().isoformat(),
        }

    if containers:
        return {
            "status": "available",
            "message": "WebSocket соединения доступны",
            "containers_count": len(containers),
            "timestamp": datetime.now().isoformat(),
        }
    return {
        "status": "no_containers",
        "message": "Нет доступных контейнеров для подключения",
        "containers_count": 0,
        "timestamp": datetime.now().isoformat(),
    }


def _find_local_container(container_id: str):
    """Return the first local container whose id starts with *container_id*, or None."""
    return next(
        (c for c in docker_client.containers.list(all=True) if c.id.startswith(container_id)),
        None,
    )


async def _send_remote_logs(ws: WebSocket, container_id: str, tail: int) -> None:
    """Send the last *tail* log lines of a ``remote-<host>-<name>`` container once."""
    parts = container_id.split('-', 2)
    if len(parts) != 3:
        await ws.send_text("ERROR: invalid remote container ID format")
        return

    hostname, container_name = parts[1], parts[2]
    await ws.send_text(f"Connected to remote container: {container_name} on {hostname}")

    try:
        # Imported here, as in the original code, rather than at module import time.
        from app.api.v1.endpoints.logs import get_remote_logs

        # get_remote_logs is a synchronous call; run it in a worker thread so
        # the event loop keeps serving other connections meanwhile.
        loop = asyncio.get_running_loop()
        logs = await loop.run_in_executor(None, lambda: get_remote_logs(container_id, tail=tail))
    except Exception as e:
        await ws.send_text(f"ERROR: cannot get remote logs - {e}")
        return

    if logs:
        await ws.send_text('\n'.join(logs))
    else:
        await ws.send_text("No logs available for remote container")


async def _send_local_logs(ws: WebSocket, container_id: str, tail: int) -> None:
    """Send the last *tail* log lines of a local container once (no follow)."""
    loop = asyncio.get_running_loop()

    try:
        # The Docker SDK is blocking; keep it off the event loop.
        container = await loop.run_in_executor(None, _find_local_container, container_id)
    except Exception as e:
        await ws.send_text(f"ERROR: cannot list containers - {e}")
        return

    if container is None:
        await ws.send_text("ERROR: container not found")
        return

    await ws.send_text(f"Connected to container: {container.name}")

    try:
        websocket_logger.info(f"Getting logs for container {container.name} (ID: {container.id[:12]})")
        raw = await loop.run_in_executor(None, lambda: container.logs(tail=tail))
        logs = raw.decode(errors="ignore")
    except Exception as e:
        websocket_logger.error(f"Error getting logs for {container.name}: {e}")
        await ws.send_text(f"ERROR getting logs: {e}")
    else:
        await ws.send_text(logs if logs else "No logs available")

    # Simple one-shot connection: the logs are sent once and the socket is closed.
    websocket_logger.info(f"WebSocket connection established for {container.name}")


@router.websocket("/logs/{container_id}")
async def ws_logs(ws: WebSocket, container_id: str, tail: int = DEFAULT_TAIL, token: Optional[str] = None, service: Optional[str] = None, project: Optional[str] = None):
    """WebSocket that sends the recent logs of one container (local or remote).

    The connection is accepted first so that authentication errors can be
    reported as text messages; the socket is always closed before returning.

    Args:
        ws: The WebSocket connection.
        container_id: A local container id (prefix match) or an id of the
            form ``remote-<host>-<name>`` for a remote container.
        tail: Number of trailing log lines to send.
        token: Auth token; required and checked with ``verify_token``.
        service: Accepted for interface compatibility; not used here.
        project: Accepted for interface compatibility; not used here.
    """
    await ws.accept()

    # Authenticate before touching Docker.
    if not token:
        await ws.send_text("ERROR: token required")
        await ws.close()
        return

    username = verify_token(token)
    if not username:
        await ws.send_text("ERROR: invalid token")
        await ws.close()
        return

    try:
        if container_id.startswith('remote-'):
            await _send_remote_logs(ws, container_id, tail)
        else:
            await _send_local_logs(ws, container_id, tail)
    except WebSocketDisconnect:
        websocket_logger.info("WebSocket client disconnected")
    except Exception as e:
        websocket_logger.error(f"WebSocket error: {e}")
        try:
            await ws.send_text(f"ERROR: {e}")
        except Exception:
            # Best effort: the socket may already be unusable.
            pass
    finally:
        try:
            websocket_logger.info("Closing WebSocket connection")
            await ws.close()
        except Exception:
            # Closing an already-closed socket raises; nothing more to do.
            # `Exception` (not a bare except) so task cancellation still propagates.
            pass
# Sentinel returned by next() in the worker thread when a log stream is exhausted.
_LOG_STREAM_END = object()


async def _fan_authenticate(ws: WebSocket, token: Optional[str]) -> bool:
    """Validate *token*; on failure send an error, close the socket and return False."""
    if not token:
        await ws.send_text("ERROR: token required")
        await ws.close()
        return False
    if not verify_token(token):
        await ws.send_text("ERROR: invalid token")
        await ws.close()
        return False
    return True


def _list_compose_containers(service_names, project_name: Optional[str] = None):
    """Return containers whose compose service label is in *service_names*.

    When *project_name* is given, the compose project label must match too.
    The result is sorted by creation time where that is available.
    """
    found = []
    for c in docker_client.containers.list(all=True):
        lbl = c.labels or {}
        if lbl.get("com.docker.compose.service") not in service_names:
            continue
        if project_name is not None and lbl.get("com.docker.compose.project") != project_name:
            continue
        found.append(c)
    try:
        # Sort by Created ascending so the first lines look roughly ordered.
        found.sort(key=lambda x: x.attrs.get("Created", ""))
    except Exception:
        # Ordering is cosmetic only; keep the unsorted list on any failure.
        pass
    return found


async def _follow_container(ws: WebSocket, cont, tail: int, prefix: str) -> None:
    """Follow one container's log stream and forward each chunk to *ws*.

    All blocking Docker SDK calls run on a dedicated single worker thread, so
    waiting for the next log chunk never blocks the event loop and does not
    occupy a slot of the shared default executor.

    When the stream ends (for example the container stopped) it is reopened
    after a short pause. The reopened stream asks only for output newer than
    the moment the previous stream ended, instead of replaying history.
    """
    # Local imports keep this block self-contained.
    import time
    from concurrent.futures import ThreadPoolExecutor

    loop = asyncio.get_running_loop()
    executor = ThreadPoolExecutor(max_workers=1)
    # First request: honour the requested tail (non-positive means everything).
    log_kwargs = {"tail": tail if tail > 0 else "all"}
    try:
        while True:
            stream = None
            try:
                stream = await loop.run_in_executor(
                    executor,
                    lambda kw=dict(log_kwargs): cont.logs(stream=True, follow=True, **kw),
                )
                chunks = iter(stream)
                while True:
                    # A default is passed to next() so StopIteration never has
                    # to cross the thread/future boundary.
                    chunk = await loop.run_in_executor(executor, next, chunks, _LOG_STREAM_END)
                    if chunk is _LOG_STREAM_END or chunk is None:
                        break
                    try:
                        await ws.send_text(prefix + chunk.decode(errors="ignore"))
                    except (RuntimeError, WebSocketDisconnect):
                        # The socket is closed; stop streaming for good.
                        return
            except WebSocketDisconnect:
                return
            except Exception:
                # Docker error (container gone, daemon hiccup): retry below.
                pass
            finally:
                if stream is not None:
                    try:
                        # Also unblocks the worker thread if it is still
                        # waiting on the socket (e.g. when we are cancelled).
                        stream.close()
                    except Exception:
                        pass
            # On reconnect resume from "now". Whole-second resolution may
            # repeat a few lines from the boundary second.
            # NOTE(review): assumes this host's clock matches the Docker daemon's.
            log_kwargs = {"tail": "all", "since": int(time.time())}
            # Container stopped: wait and retry. If it is gone for good, the
            # watcher cancels this task.
            await asyncio.sleep(1.0)
    finally:
        executor.shutdown(wait=False)


async def _run_fan_in(ws: WebSocket, list_containers, tail: int, prefix_for) -> None:
    """Stream logs of a changing set of containers until the client disconnects.

    Args:
        ws: Accepted and authenticated WebSocket.
        list_containers: Blocking callable returning the containers to follow.
        tail: Number of initial lines to request per container.
        prefix_for: Callable mapping a container to its line prefix.
    """
    loop = asyncio.get_running_loop()
    # Active streaming tasks keyed by container id.
    active = {}

    async def watcher():
        # Periodically reconcile the running tasks with the desired containers.
        while True:
            try:
                found = await loop.run_in_executor(None, list_containers)
            except Exception as e:
                # Keep watching; a transient Docker failure must not end it.
                websocket_logger.error(f"WebSocket fan-in: cannot list containers - {e}")
                found = None
            if found is not None:
                desired = {c.id: c for c in found}
                # Start streams for new containers.
                for cid, cont in desired.items():
                    if cid not in active:
                        active[cid] = asyncio.create_task(
                            _follow_container(ws, cont, tail, prefix_for(cont))
                        )
                # Cancel streams of containers that disappeared.
                for cid in [cid for cid in active if cid not in desired]:
                    active.pop(cid).cancel()
            await asyncio.sleep(2.0)

    watch_task = asyncio.create_task(watcher())
    try:
        # Reading from the socket is what surfaces a client disconnect; any
        # messages the client sends are ignored.
        while True:
            message = await ws.receive()
            if message.get("type") == "websocket.disconnect":
                break
    except (WebSocketDisconnect, RuntimeError):
        pass
    finally:
        watch_task.cancel()
        for t in active.values():
            t.cancel()
        # Wait for cancellation to finish so streams and threads are released.
        await asyncio.gather(watch_task, *active.values(), return_exceptions=True)
        try:
            await ws.close()
        except Exception:
            # Already closed by the peer; nothing more to do.
            pass


@router.websocket("/fan/{service_name}")
async def ws_fan(ws: WebSocket, service_name: str, tail: int = DEFAULT_TAIL, token: Optional[str] = None, project: Optional[str] = None):
    """WebSocket: fan-in by compose service (aggregate all replicas), prefixing with short container id.

    Args:
        ws: The WebSocket connection.
        service_name: Value of the ``com.docker.compose.service`` label.
        tail: Initial number of lines per container (non-positive means all).
        token: Auth token; required and checked with ``verify_token``.
        project: Optional ``com.docker.compose.project`` filter.
    """
    await ws.accept()

    if not await _fan_authenticate(ws, token):
        return

    await _run_fan_in(
        ws,
        lambda: _list_compose_containers({service_name}, project),
        tail,
        lambda cont: f"[{cont.id[:8]}] ",
    )


@router.websocket("/fan_group")
async def ws_fan_group(ws: WebSocket, services: str, tail: int = DEFAULT_TAIL, token: Optional[str] = None, project: Optional[str] = None):
    """WebSocket: fan-in for multiple compose services (comma-separated), optional project filter.

    Each line is prefixed with the short container id and its service name.

    Args:
        ws: The WebSocket connection.
        services: Comma-separated compose service names.
        tail: Initial number of lines per container (non-positive means all).
        token: Auth token; required and checked with ``verify_token``.
        project: Optional ``com.docker.compose.project`` filter.
    """
    await ws.accept()

    if not await _fan_authenticate(ws, token):
        return

    svc_set = {s.strip() for s in services.split(",") if s.strip()}
    if not svc_set:
        await ws.send_text("ERROR: no services provided")
        await ws.close()
        return

    def prefix_for(cont):
        svc = (cont.labels or {}).get("com.docker.compose.service", "")
        return f"[{cont.id[:8]} {svc}] "

    await _run_fan_in(
        ws,
        lambda: _list_compose_containers(svc_set, project),
        tail,
        prefix_for,
    )