- Исправлена ошибка 'obj is not defined' в WebSocket onopen - Упрощена логика обработки WebSocket сообщений (убрано дублирование) - Исправлен вызов addSectionToggleHandlers в buildTabs - WebSocket теперь работает в реальном времени с follow=True - Убрана избыточная логика проверки дублирования строк Изменения в WebSocket: - Добавлен потоковый режим с follow=True для локальных контейнеров - Улучшена обработка ошибок - Убрано автоматическое закрытие соединения Изменения в JavaScript: - Исправлена ошибка с неопределенной переменной obj - Упрощена обработка WebSocket сообщений - Улучшена стабильность работы Автор: Сергей Антропов Сайт: https://devops.org.ru
354 lines
13 KiB
Python
354 lines
13 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
LogBoard+ - WebSocket API
|
||
Автор: Сергей Антропов
|
||
Сайт: https://devops.org.ru
|
||
"""
|
||
|
||
import asyncio
|
||
from typing import Optional
|
||
|
||
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Query, Depends
|
||
from fastapi.responses import JSONResponse
|
||
|
||
from core.auth import verify_token, get_current_user
|
||
from core.docker import docker_client, DEFAULT_TAIL, get_remote_containers
|
||
from core.logger import websocket_logger
|
||
from datetime import datetime
|
||
|
||
router = APIRouter()
|
||
|
||
@router.get("/status")
|
||
def api_websocket_status(current_user: str = Depends(get_current_user)):
|
||
"""Получить статус WebSocket соединений"""
|
||
try:
|
||
# Проверяем доступность Docker
|
||
docker_client.ping()
|
||
|
||
# Получаем список активных контейнеров
|
||
containers = docker_client.containers.list()
|
||
|
||
# Проверяем, есть ли контейнеры для подключения
|
||
if containers:
|
||
return {
|
||
"status": "available",
|
||
"message": "WebSocket соединения доступны",
|
||
"containers_count": len(containers),
|
||
"timestamp": datetime.now().isoformat()
|
||
}
|
||
else:
|
||
return {
|
||
"status": "no_containers",
|
||
"message": "Нет доступных контейнеров для подключения",
|
||
"containers_count": 0,
|
||
"timestamp": datetime.now().isoformat()
|
||
}
|
||
|
||
except Exception as e:
|
||
return {
|
||
"status": "error",
|
||
"message": f"Ошибка проверки WebSocket статуса: {str(e)}",
|
||
"containers_count": 0,
|
||
"timestamp": datetime.now().isoformat()
|
||
}
|
||
|
||
@router.websocket("/logs/{container_id}")
|
||
async def ws_logs(ws: WebSocket, container_id: str, tail: int = DEFAULT_TAIL, token: Optional[str] = None,
|
||
service: Optional[str] = None, project: Optional[str] = None):
|
||
"""WebSocket для получения логов контейнера (локального или удаленного)"""
|
||
|
||
# Принимаем соединение
|
||
await ws.accept()
|
||
|
||
# Проверяем токен
|
||
if not token:
|
||
await ws.send_text("ERROR: token required")
|
||
await ws.close()
|
||
return
|
||
|
||
username = verify_token(token)
|
||
if not username:
|
||
await ws.send_text("ERROR: invalid token")
|
||
await ws.close()
|
||
return
|
||
|
||
try:
|
||
# Проверяем, является ли это удаленным контейнером
|
||
if container_id.startswith('remote-'):
|
||
# Обрабатываем удаленный контейнер
|
||
parts = container_id.split('-', 2)
|
||
if len(parts) != 3:
|
||
await ws.send_text("ERROR: invalid remote container ID format")
|
||
return
|
||
|
||
hostname = parts[1]
|
||
container_name = parts[2]
|
||
|
||
# Отправляем начальное сообщение
|
||
await ws.send_text(f"Connected to remote container: {container_name} on {hostname}")
|
||
|
||
# Получаем логи удаленного контейнера
|
||
try:
|
||
from app.api.v1.endpoints.logs import get_remote_logs
|
||
logs = get_remote_logs(container_id, tail=tail)
|
||
if logs:
|
||
await ws.send_text('\n'.join(logs))
|
||
else:
|
||
await ws.send_text("No logs available for remote container")
|
||
|
||
# Для удаленных контейнеров пока просто отправляем начальные логи
|
||
# TODO: Реализовать мониторинг файлов логов в реальном времени
|
||
websocket_logger.info(f"Remote WebSocket connection established for {container_name} on {hostname}")
|
||
|
||
except Exception as e:
|
||
await ws.send_text(f"ERROR: cannot get remote logs - {e}")
|
||
return
|
||
|
||
else:
|
||
# Обрабатываем локальный контейнер
|
||
container = None
|
||
try:
|
||
for c in docker_client.containers.list(all=True):
|
||
if c.id.startswith(container_id):
|
||
container = c
|
||
break
|
||
except Exception as e:
|
||
await ws.send_text(f"ERROR: cannot list containers - {e}")
|
||
return
|
||
|
||
if container is None:
|
||
await ws.send_text("ERROR: container not found")
|
||
return
|
||
|
||
# Отправляем начальное сообщение
|
||
await ws.send_text(f"Connected to container: {container.name}")
|
||
|
||
# Отправляем начальные логи
|
||
try:
|
||
websocket_logger.info(f"Getting logs for container {container.name} (ID: {container.id[:12]})")
|
||
initial_logs = container.logs(tail=tail).decode(errors="ignore")
|
||
if initial_logs:
|
||
await ws.send_text(initial_logs)
|
||
else:
|
||
await ws.send_text("No logs available")
|
||
except Exception as e:
|
||
websocket_logger.error(f"Error getting initial logs for {container.name}: {e}")
|
||
await ws.send_text(f"ERROR getting initial logs: {e}")
|
||
|
||
# Устанавливаем потоковое соединение для получения новых логов
|
||
websocket_logger.info(f"WebSocket connection established for {container.name}")
|
||
|
||
try:
|
||
# Получаем логи в реальном времени с follow=True
|
||
stream = container.logs(stream=True, follow=True, tail=0)
|
||
for chunk in stream:
|
||
if chunk is None:
|
||
break
|
||
try:
|
||
await ws.send_text(chunk.decode(errors="ignore"))
|
||
except WebSocketDisconnect:
|
||
stream.close()
|
||
return
|
||
except Exception as e:
|
||
websocket_logger.error(f"Error streaming logs for {container.name}: {e}")
|
||
break
|
||
stream.close()
|
||
except Exception as e:
|
||
websocket_logger.error(f"Error setting up log stream for {container.name}: {e}")
|
||
|
||
except WebSocketDisconnect:
|
||
websocket_logger.info(f"WebSocket client disconnected")
|
||
except Exception as e:
|
||
websocket_logger.error(f"WebSocket error: {e}")
|
||
try:
|
||
await ws.send_text(f"ERROR: {e}")
|
||
except:
|
||
pass
|
||
|
||
@router.websocket("/fan/{service_name}")
|
||
async def ws_fan(ws: WebSocket, service_name: str, tail: int = DEFAULT_TAIL, token: Optional[str] = None,
|
||
project: Optional[str] = None):
|
||
"""WebSocket: fan-in by compose service (aggregate all replicas), prefixing with short container id"""
|
||
await ws.accept()
|
||
|
||
# Проверяем токен
|
||
if not token:
|
||
await ws.send_text("ERROR: token required")
|
||
await ws.close()
|
||
return
|
||
|
||
username = verify_token(token)
|
||
if not username:
|
||
await ws.send_text("ERROR: invalid token")
|
||
await ws.close()
|
||
return
|
||
|
||
# Track active streaming tasks by container id
|
||
active = {}
|
||
|
||
def list_by_service(service_name: str, project_name: Optional[str] = None):
|
||
found = []
|
||
for c in docker_client.containers.list(all=True):
|
||
lbl = c.labels or {}
|
||
if lbl.get("com.docker.compose.service") == service_name and (project_name is None or lbl.get("com.docker.compose.project")==project_name):
|
||
found.append(c)
|
||
# sort by Created asc so first lines look ordered-ish
|
||
try:
|
||
found.sort(key=lambda x: x.attrs.get("Created",""))
|
||
except Exception:
|
||
pass
|
||
return found
|
||
|
||
async def stream_container(cont):
|
||
short = cont.id[:8]
|
||
first_tail = tail
|
||
while True:
|
||
try:
|
||
use_tail = first_tail
|
||
first_tail = 0
|
||
stream = cont.logs(stream=True, follow=True, tail=(use_tail if use_tail>0 else "all"))
|
||
for chunk in stream:
|
||
if chunk is None:
|
||
break
|
||
try:
|
||
await ws.send_text(f"[{short}] " + chunk.decode(errors="ignore"))
|
||
except RuntimeError:
|
||
stream.close(); return
|
||
stream.close()
|
||
# container stopped -> wait and try to find same id again; if gone, exit loop and outer watcher will reassign
|
||
await asyncio.sleep(1.0)
|
||
except WebSocketDisconnect:
|
||
return
|
||
except Exception:
|
||
await asyncio.sleep(1.0)
|
||
continue
|
||
|
||
async def watcher():
|
||
# Periodically reconcile set of containers
|
||
try:
|
||
while True:
|
||
desired = {c.id: c for c in list_by_service(service_name, project)}
|
||
# start missing
|
||
for cid, cont in desired.items():
|
||
if cid not in active:
|
||
task = asyncio.create_task(stream_container(cont))
|
||
active[cid] = task
|
||
# cancel removed
|
||
for cid in list(active.keys()):
|
||
if cid not in desired:
|
||
task = active.pop(cid)
|
||
task.cancel()
|
||
await asyncio.sleep(2.0)
|
||
except asyncio.CancelledError:
|
||
pass
|
||
|
||
watch_task = asyncio.create_task(watcher())
|
||
try:
|
||
# Keep ws open until disconnected; the tasks stream data
|
||
while True:
|
||
await asyncio.sleep(1.0)
|
||
except WebSocketDisconnect:
|
||
pass
|
||
finally:
|
||
watch_task.cancel()
|
||
for t in active.values():
|
||
t.cancel()
|
||
try:
|
||
await ws.close()
|
||
except Exception:
|
||
pass
|
||
|
||
@router.websocket("/fan_group")
|
||
async def ws_fan_group(ws: WebSocket, services: str, tail: int = DEFAULT_TAIL, token: Optional[str] = None,
|
||
project: Optional[str] = None):
|
||
"""WebSocket: fan-in for multiple compose services (comma-separated), optional project filter."""
|
||
await ws.accept()
|
||
|
||
# Проверяем токен
|
||
if not token:
|
||
await ws.send_text("ERROR: token required")
|
||
await ws.close()
|
||
return
|
||
|
||
username = verify_token(token)
|
||
if not username:
|
||
await ws.send_text("ERROR: invalid token")
|
||
await ws.close()
|
||
return
|
||
|
||
svc_set = {s.strip() for s in services.split(",") if s.strip()}
|
||
if not svc_set:
|
||
await ws.send_text("ERROR: no services provided")
|
||
await ws.close(); return
|
||
|
||
active = {}
|
||
|
||
def list_by_services(names, project_name: Optional[str] = None):
|
||
res = []
|
||
for c in docker_client.containers.list(all=True):
|
||
lbl = c.labels or {}
|
||
if lbl.get("com.docker.compose.service") in names and (project_name is None or lbl.get("com.docker.compose.project")==project_name):
|
||
res.append(c)
|
||
try:
|
||
res.sort(key=lambda x: x.attrs.get("Created",""))
|
||
except Exception:
|
||
pass
|
||
return res
|
||
|
||
async def stream_container(cont):
|
||
short = cont.id[:8]
|
||
svc = (cont.labels or {}).get("com.docker.compose.service","")
|
||
first_tail = tail
|
||
while True:
|
||
try:
|
||
use_tail = first_tail
|
||
first_tail = 0
|
||
stream = cont.logs(stream=True, follow=True, tail=(use_tail if use_tail>0 else "all"))
|
||
for chunk in stream:
|
||
if chunk is None:
|
||
break
|
||
line = chunk.decode(errors="ignore")
|
||
try:
|
||
await ws.send_text(f"[{short} {svc}] " + line)
|
||
except RuntimeError:
|
||
stream.close(); return
|
||
stream.close()
|
||
await asyncio.sleep(1.0)
|
||
except WebSocketDisconnect:
|
||
return
|
||
except Exception:
|
||
await asyncio.sleep(1.0)
|
||
continue
|
||
|
||
async def watcher():
|
||
try:
|
||
while True:
|
||
desired = {c.id: c for c in list_by_services(svc_set, project)}
|
||
for cid, cont in desired.items():
|
||
if cid not in active:
|
||
task = asyncio.create_task(stream_container(cont))
|
||
active[cid] = task
|
||
for cid in list(active.keys()):
|
||
if cid not in desired:
|
||
task = active.pop(cid)
|
||
task.cancel()
|
||
await asyncio.sleep(2.0)
|
||
except asyncio.CancelledError:
|
||
pass
|
||
|
||
watch_task = asyncio.create_task(watcher())
|
||
try:
|
||
while True:
|
||
await asyncio.sleep(1.0)
|
||
except WebSocketDisconnect:
|
||
pass
|
||
finally:
|
||
watch_task.cancel()
|
||
for t in active.values():
|
||
t.cancel()
|
||
try:
|
||
await ws.close()
|
||
except Exception:
|
||
pass
|