logboard/app/api/v1/endpoints/websocket.py
2025-08-20 17:04:13 +03:00

309 lines
11 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
LogBoard+ - WebSocket API
Автор: Сергей Антропов
Сайт: https://devops.org.ru
"""
import asyncio
from typing import Optional
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Query, Depends
from fastapi.responses import JSONResponse
from app.core.auth import verify_token, get_current_user
from app.core.docker import docker_client, DEFAULT_TAIL
from datetime import datetime
# Router on which the endpoints of this module register themselves
# through the @router.get / @router.websocket decorators below.
router = APIRouter()
@router.get("/status")
def api_websocket_status(current_user: str = Depends(get_current_user)):
    """Report whether WebSocket log streaming can currently be used.

    Pings Docker and counts the containers returned by
    ``docker_client.containers.list()``.

    Returns:
        A dict with the keys ``status`` (``"available"``,
        ``"no_containers"`` or ``"error"``), ``message``,
        ``containers_count`` and ``timestamp`` (ISO formatted local time).
        Failures are reported through the ``"error"`` status instead of
        being raised.
    """
    try:
        # ping() raises when the Docker daemon cannot be reached
        docker_client.ping()
        count = len(docker_client.containers.list())
        if count:
            state = "available"
            note = "WebSocket соединения доступны"
        else:
            state = "no_containers"
            note = "Нет доступных контейнеров для подключения"
    except Exception as exc:
        state = "error"
        note = "Ошибка проверки WebSocket статуса: " + str(exc)
        count = 0

    return {
        "status": state,
        "message": note,
        "containers_count": count,
        "timestamp": datetime.now().isoformat(),
    }
@router.websocket("/logs/{container_id}")
async def ws_logs(ws: WebSocket, container_id: str, tail: int = DEFAULT_TAIL, token: Optional[str] = None,
                  service: Optional[str] = None, project: Optional[str] = None):
    """Send a one-shot snapshot of a container's most recent log lines.

    The connection is accepted first so that failures can be reported to the
    client as text frames starting with ``ERROR``. Once the snapshot (or an
    error) has been sent the socket is closed; logs are not followed.

    Args:
        ws: The WebSocket connection.
        container_id: Full container ID or a prefix of it. The first
            container (running or stopped) whose ID starts with this value
            is used.
        tail: Number of trailing log lines requested from Docker.
        token: Auth token, validated with ``verify_token``.
        service: Not used by this handler.
        project: Not used by this handler.
    """
    await ws.accept()

    # Authentication: report the problem to the client, then hang up
    if not token:
        await ws.send_text("ERROR: token required")
        await ws.close()
        return
    username = verify_token(token)
    if not username:
        await ws.send_text("ERROR: invalid token")
        await ws.close()
        return

    loop = asyncio.get_running_loop()

    def find_container():
        """Return the first container whose ID starts with container_id, or None."""
        for candidate in docker_client.containers.list(all=True):
            if candidate.id.startswith(container_id):
                return candidate
        return None

    # Name used in diagnostics. Falls back to the requested ID so that the
    # handlers below never have to dereference an unresolved container.
    label = container_id
    try:
        try:
            # Docker SDK calls are blocking; keep them off the event loop
            container = await loop.run_in_executor(None, find_container)
        except Exception as e:
            await ws.send_text(f"ERROR: cannot list containers - {e}")
            return
        if container is None:
            await ws.send_text("ERROR: container not found")
            return
        label = container.name

        await ws.send_text(f"Connected to container: {label}")

        # Fetch only the last lines, without follow
        try:
            print(f"Getting logs for container {label} (ID: {container.id[:12]})")
            raw = await loop.run_in_executor(None, lambda: container.logs(tail=tail))
            logs = raw.decode(errors="ignore")
            if logs:
                await ws.send_text(logs)
            else:
                await ws.send_text("No logs available")
        except WebSocketDisconnect:
            raise
        except Exception as e:
            print(f"Error getting logs for {label}: {e}")
            await ws.send_text(f"ERROR getting logs: {e}")

        print(f"WebSocket connection established for {label}")
    except WebSocketDisconnect:
        print(f"WebSocket client disconnected for container {label}")
    except Exception as e:
        print(f"WebSocket error for {label}: {e}")
        try:
            await ws.send_text(f"ERROR: {e}")
        except Exception:
            # Best effort: the socket may already be unusable
            pass
    finally:
        print(f"Closing WebSocket connection for container {label}")
        try:
            await ws.close()
        except Exception:
            # Closing an already closed/disconnected socket raises; ignore
            pass
@router.websocket("/fan/{service_name}")
async def ws_fan(ws: WebSocket, service_name: str, tail: int = DEFAULT_TAIL, token: Optional[str] = None,
                 project: Optional[str] = None):
    """WebSocket: fan-in by compose service (aggregate all replicas).

    Every container labelled with the given compose service is followed and
    each chunk is forwarded as a text frame prefixed with ``[<short id>]``.
    The set of containers is reconciled every two seconds, so replicas that
    appear or disappear are picked up while the socket stays open.

    Args:
        ws: The WebSocket connection.
        service_name: Value of the ``com.docker.compose.service`` label.
        tail: Lines of history sent when a container is first attached;
            values <= 0 request the full history.
        token: Auth token, validated with ``verify_token``.
        project: Optional ``com.docker.compose.project`` label filter.
    """
    await ws.accept()

    # Authentication: report the problem to the client, then hang up
    if not token:
        await ws.send_text("ERROR: token required")
        await ws.close()
        return
    username = verify_token(token)
    if not username:
        await ws.send_text("ERROR: invalid token")
        await ws.close()
        return

    loop = asyncio.get_running_loop()
    # Active streaming tasks keyed by full container id
    active = {}
    # Returned by next() when a Docker log stream is exhausted
    end_of_stream = object()

    def list_by_service(service_name: str, project_name: Optional[str] = None):
        """Return the containers of a compose service, oldest first."""
        found = []
        for c in docker_client.containers.list(all=True):
            lbl = c.labels or {}
            if lbl.get("com.docker.compose.service") != service_name:
                continue
            if project_name is not None and lbl.get("com.docker.compose.project") != project_name:
                continue
            found.append(c)
        # Sort by Created ascending so the first lines look ordered-ish
        try:
            found.sort(key=lambda x: x.attrs.get("Created", ""))
        except Exception:
            pass
        return found

    def open_stream(cont, tail_arg):
        """Blocking: attach to the container's log stream."""
        return cont.logs(stream=True, follow=True, tail=tail_arg)

    async def stream_container(cont):
        """Forward one container's log stream until the socket goes away."""
        short = cont.id[:8]
        # The first attach honours the requested tail. Later re-attaches use
        # tail=0 so the history is not replayed each time the stream ends
        # (e.g. once per second for a stopped container).
        next_tail = tail if tail > 0 else "all"
        while True:
            stream = None
            try:
                # The Docker SDK is blocking: attach and read in worker
                # threads so a quiet container cannot freeze the event loop.
                # NOTE: each followed container holds a default-executor
                # thread while it waits for output.
                stream = await loop.run_in_executor(None, open_stream, cont, next_tail)
                next_tail = 0
                while True:
                    chunk = await loop.run_in_executor(None, next, stream, end_of_stream)
                    if chunk is end_of_stream or chunk is None:
                        break
                    try:
                        await ws.send_text(f"[{short}] " + chunk.decode(errors="ignore"))
                    except RuntimeError:
                        # The socket can no longer be written to
                        return
            except asyncio.CancelledError:
                # Explicit so cancellation is never treated as a retryable
                # error (CancelledError derives from Exception on 3.7)
                raise
            except WebSocketDisconnect:
                return
            except Exception:
                # Docker error (container removed, daemon restart, ...):
                # retry after the pause below
                pass
            finally:
                # Closing also unblocks a worker thread still waiting in next()
                if stream is not None:
                    try:
                        stream.close()
                    except Exception:
                        pass
            # Stream ended (container stopped) or failed: wait, then re-attach.
            # If the container is gone the watcher cancels this task.
            await asyncio.sleep(1.0)

    async def watcher():
        """Periodically reconcile the streaming tasks with the containers found."""
        try:
            while True:
                try:
                    found = await loop.run_in_executor(None, list_by_service, service_name, project)
                except asyncio.CancelledError:
                    raise
                except Exception:
                    # Docker unreachable right now: keep the current streams
                    found = None
                if found is not None:
                    desired = {c.id: c for c in found}
                    # Start missing
                    for cid, cont in desired.items():
                        if cid not in active:
                            active[cid] = asyncio.create_task(stream_container(cont))
                    # Cancel removed
                    for cid in list(active):
                        if cid not in desired:
                            active.pop(cid).cancel()
                await asyncio.sleep(2.0)
        except asyncio.CancelledError:
            pass

    watch_task = asyncio.create_task(watcher())
    try:
        # Reading is the only way to notice that the client went away;
        # anything the client sends is ignored.
        while True:
            message = await ws.receive()
            if message.get("type") == "websocket.disconnect":
                break
    except (WebSocketDisconnect, RuntimeError):
        pass
    finally:
        watch_task.cancel()
        for t in active.values():
            t.cancel()
        # Wait for the tasks to finish unwinding so their streams are closed
        await asyncio.gather(watch_task, *active.values(), return_exceptions=True)
        try:
            await ws.close()
        except Exception:
            # Already closed by the peer
            pass
@router.websocket("/fan_group")
async def ws_fan_group(ws: WebSocket, services: str, tail: int = DEFAULT_TAIL, token: Optional[str] = None,
                       project: Optional[str] = None):
    """WebSocket: fan-in for multiple compose services, optional project filter.

    Every container whose ``com.docker.compose.service`` label is one of the
    requested services is followed and each chunk is forwarded as a text
    frame prefixed with ``[<short id> <service>]``. The set of containers is
    reconciled every two seconds while the socket stays open.

    Args:
        ws: The WebSocket connection.
        services: Comma-separated compose service names; blanks are ignored.
        tail: Lines of history sent when a container is first attached;
            values <= 0 request the full history.
        token: Auth token, validated with ``verify_token``.
        project: Optional ``com.docker.compose.project`` label filter.
    """
    await ws.accept()

    # Authentication: report the problem to the client, then hang up
    if not token:
        await ws.send_text("ERROR: token required")
        await ws.close()
        return
    username = verify_token(token)
    if not username:
        await ws.send_text("ERROR: invalid token")
        await ws.close()
        return

    svc_set = {s.strip() for s in services.split(",") if s.strip()}
    if not svc_set:
        await ws.send_text("ERROR: no services provided")
        await ws.close()
        return

    loop = asyncio.get_running_loop()
    # Active streaming tasks keyed by full container id
    active = {}
    # Returned by next() when a Docker log stream is exhausted
    end_of_stream = object()

    def list_by_services(names, project_name: Optional[str] = None):
        """Return the containers of the given compose services, oldest first."""
        res = []
        for c in docker_client.containers.list(all=True):
            lbl = c.labels or {}
            if lbl.get("com.docker.compose.service") not in names:
                continue
            if project_name is not None and lbl.get("com.docker.compose.project") != project_name:
                continue
            res.append(c)
        try:
            res.sort(key=lambda x: x.attrs.get("Created", ""))
        except Exception:
            pass
        return res

    def open_stream(cont, tail_arg):
        """Blocking: attach to the container's log stream."""
        return cont.logs(stream=True, follow=True, tail=tail_arg)

    async def stream_container(cont):
        """Forward one container's log stream until the socket goes away."""
        short = cont.id[:8]
        svc = (cont.labels or {}).get("com.docker.compose.service", "")
        # The first attach honours the requested tail. Later re-attaches use
        # tail=0 so the history is not replayed each time the stream ends
        # (e.g. once per second for a stopped container).
        next_tail = tail if tail > 0 else "all"
        while True:
            stream = None
            try:
                # The Docker SDK is blocking: attach and read in worker
                # threads so a quiet container cannot freeze the event loop.
                # NOTE: each followed container holds a default-executor
                # thread while it waits for output.
                stream = await loop.run_in_executor(None, open_stream, cont, next_tail)
                next_tail = 0
                while True:
                    chunk = await loop.run_in_executor(None, next, stream, end_of_stream)
                    if chunk is end_of_stream or chunk is None:
                        break
                    line = chunk.decode(errors="ignore")
                    try:
                        await ws.send_text(f"[{short} {svc}] " + line)
                    except RuntimeError:
                        # The socket can no longer be written to
                        return
            except asyncio.CancelledError:
                # Explicit so cancellation is never treated as a retryable
                # error (CancelledError derives from Exception on 3.7)
                raise
            except WebSocketDisconnect:
                return
            except Exception:
                # Docker error (container removed, daemon restart, ...):
                # retry after the pause below
                pass
            finally:
                # Closing also unblocks a worker thread still waiting in next()
                if stream is not None:
                    try:
                        stream.close()
                    except Exception:
                        pass
            # Stream ended (container stopped) or failed: wait, then re-attach.
            # If the container is gone the watcher cancels this task.
            await asyncio.sleep(1.0)

    async def watcher():
        """Periodically reconcile the streaming tasks with the containers found."""
        try:
            while True:
                try:
                    found = await loop.run_in_executor(None, list_by_services, svc_set, project)
                except asyncio.CancelledError:
                    raise
                except Exception:
                    # Docker unreachable right now: keep the current streams
                    found = None
                if found is not None:
                    desired = {c.id: c for c in found}
                    for cid, cont in desired.items():
                        if cid not in active:
                            active[cid] = asyncio.create_task(stream_container(cont))
                    for cid in list(active):
                        if cid not in desired:
                            active.pop(cid).cancel()
                await asyncio.sleep(2.0)
        except asyncio.CancelledError:
            pass

    watch_task = asyncio.create_task(watcher())
    try:
        # Reading is the only way to notice that the client went away;
        # anything the client sends is ignored.
        while True:
            message = await ws.receive()
            if message.get("type") == "websocket.disconnect":
                break
    except (WebSocketDisconnect, RuntimeError):
        pass
    finally:
        watch_task.cancel()
        for t in active.values():
            t.cancel()
        # Wait for the tasks to finish unwinding so their streams are closed
        await asyncio.gather(watch_task, *active.values(), return_exceptions=True)
        try:
            await ws.close()
        except Exception:
            # Already closed by the peer
            pass