#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ LogBoard+ - Веб-панель для просмотра логов микросервисов Автор: Сергей Антропов Сайт: https://devops.org.ru """ import asyncio import base64 import json import os import re import secrets from datetime import datetime, timedelta from typing import Optional, List, Dict, Union from pathlib import Path import docker from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Query, Depends, HTTPException, status, Body, Request, Response from fastapi.exceptions import RequestValidationError from starlette.exceptions import HTTPException as StarletteHTTPException from fastapi.responses import HTMLResponse, JSONResponse, PlainTextResponse, RedirectResponse from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from pydantic import BaseModel import jwt from passlib.context import CryptContext # Настройки приложения APP_PORT = int(os.getenv("LOGBOARD_PORT", "9001")) DEFAULT_TAIL = int(os.getenv("LOGBOARD_TAIL", "500")) DEFAULT_PROJECT = os.getenv("COMPOSE_PROJECT_NAME") DEFAULT_PROJECTS = os.getenv("LOGBOARD_PROJECTS") SKIP_UNHEALTHY = os.getenv("LOGBOARD_SKIP_UNHEALTHY", "true").lower() == "true" CONTAINER_LIST_TIMEOUT = int(os.getenv("LOGBOARD_CONTAINER_LIST_TIMEOUT", "10")) CONTAINER_INFO_TIMEOUT = int(os.getenv("LOGBOARD_CONTAINER_INFO_TIMEOUT", "3")) HEALTH_CHECK_TIMEOUT = int(os.getenv("LOGBOARD_HEALTH_CHECK_TIMEOUT", "2")) # Настройки безопасности SECRET_KEY = os.getenv("SECRET_KEY", "your-secret-key-here-change-in-production") ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv("SESSION_TIMEOUT", "3600")) // 60 # 1 час по умолчанию # Настройки пользователей ADMIN_USERNAME = os.getenv("LOGBOARD_USER", "admin") ADMIN_PASSWORD = os.getenv("LOGBOARD_PASS", "admin") # Инициализация FastAPI app = FastAPI( title="LogBoard+", description="Веб-панель для просмотра логов микросервисов", version="1.0.0" ) # Обработчики исключений @app.exception_handler(404) async def not_found_handler(request: Request, exc: HTTPException): """Обработчик ошибки 404 - Страница не найдена""" return templates.TemplateResponse("error.html", { "request": request, "error_code": 404, "error_title": "Страница не найдена", "error_message": "Запрашиваемая страница не существует или была перемещена." }, status_code=404) @app.exception_handler(401) async def unauthorized_handler(request: Request, exc: HTTPException): """Обработчик ошибки 401 - Не авторизован""" return templates.TemplateResponse("error.html", { "request": request, "error_code": 401, "error_title": "Требуется авторизация", "error_message": "Для доступа к этой странице необходимо войти в систему." }, status_code=401) @app.exception_handler(403) async def forbidden_handler(request: Request, exc: HTTPException): """Обработчик ошибки 403 - Доступ запрещен""" return templates.TemplateResponse("error.html", { "request": request, "error_code": 403, "error_title": "Доступ запрещен", "error_message": "У вас нет прав для доступа к этой странице." }, status_code=403) @app.exception_handler(500) async def internal_server_error_handler(request: Request, exc: HTTPException): """Обработчик ошибки 500 - Внутренняя ошибка сервера""" return templates.TemplateResponse("error.html", { "request": request, "error_code": 500, "error_title": "Внутренняя ошибка сервера", "error_message": "Произошла непредвиденная ошибка. Попробуйте обновить страницу или обратитесь к администратору." }, status_code=500) @app.exception_handler(HTTPException) async def http_exception_handler(request: Request, exc: HTTPException): """Общий обработчик HTTP исключений""" # Для API маршрутов возвращаем JSON ответ if request.url.path.startswith("/api/"): if exc.status_code == 401: return JSONResponse( status_code=401, content={ "error": "unauthorized", "message": "Требуется авторизация", "details": "Для доступа к этому API необходимо войти в систему." }, headers={"WWW-Authenticate": "Bearer"} ) elif exc.status_code == 403: return JSONResponse( status_code=403, content={ "error": "forbidden", "message": "Доступ запрещен", "details": "У вас нет прав для доступа к этому API." } ) else: return JSONResponse( status_code=exc.status_code, content={ "error": f"http_{exc.status_code}", "message": exc.detail or "Произошла ошибка при обработке запроса.", "details": f"URL: {request.url.path}" } ) # Для обычных страниц возвращаем HTML if exc.status_code == 401: title = "Требуется авторизация" message = "Для доступа к этой странице необходимо войти в систему." elif exc.status_code == 403: title = "Доступ запрещен" message = "У вас нет прав для доступа к этой странице." elif exc.status_code == 404: title = "Страница не найдена" message = "Запрашиваемая страница не существует или была перемещена." elif exc.status_code == 500: title = "Внутренняя ошибка сервера" message = "Произошла непредвиденная ошибка. Попробуйте обновить страницу или обратитесь к администратору." else: title = f"Ошибка {exc.status_code}" message = exc.detail or "Произошла ошибка при обработке запроса." return templates.TemplateResponse("error.html", { "request": request, "error_code": exc.status_code, "error_title": title, "error_message": message }, status_code=exc.status_code) @app.exception_handler(StarletteHTTPException) async def starlette_http_exception_handler(request: Request, exc: StarletteHTTPException): """Обработчик Starlette HTTP исключений (включая ошибки безопасности)""" # Для API маршрутов возвращаем JSON ответ if request.url.path.startswith("/api/"): if exc.status_code == 401: return JSONResponse( status_code=401, content={ "error": "unauthorized", "message": "Требуется авторизация", "details": "Для доступа к этому API необходимо войти в систему." }, headers={"WWW-Authenticate": "Bearer"} ) elif exc.status_code == 403: return JSONResponse( status_code=403, content={ "error": "forbidden", "message": "Доступ запрещен", "details": "У вас нет прав для доступа к этому API." } ) else: return JSONResponse( status_code=exc.status_code, content={ "error": f"http_{exc.status_code}", "message": exc.detail or "Произошла ошибка при обработке запроса.", "details": f"URL: {request.url.path}" } ) # Для обычных страниц возвращаем HTML if exc.status_code == 401: title = "Требуется авторизация" message = "Для доступа к этой странице необходимо войти в систему." elif exc.status_code == 403: title = "Доступ запрещен" message = "У вас нет прав для доступа к этой странице." elif exc.status_code == 404: title = "Страница не найдена" message = "Запрашиваемая страница не существует или была перемещена." elif exc.status_code == 500: title = "Внутренняя ошибка сервера" message = "Произошла непредвиденная ошибка. Попробуйте обновить страницу или обратитесь к администратору." else: title = f"Ошибка {exc.status_code}" message = exc.detail or "Произошла ошибка при обработке запроса." return templates.TemplateResponse("error.html", { "request": request, "error_code": exc.status_code, "error_title": title, "error_message": message }, status_code=exc.status_code) @app.exception_handler(Exception) async def general_exception_handler(request: Request, exc: Exception): """Общий обработчик всех исключений""" import traceback # Логируем ошибку print(f"❌ Необработанная ошибка: {exc}") print(f"❌ URL: {request.url.path}") print(f"❌ Traceback: {traceback.format_exc()}") return templates.TemplateResponse("error.html", { "request": request, "error_code": 500, "error_title": "Внутренняя ошибка сервера", "error_message": "Произошла непредвиденная ошибка. Попробуйте обновить страницу или обратитесь к администратору." }, status_code=500) # Инициализация шаблонов templates = Jinja2Templates(directory="templates") # Инициализация безопасности security = HTTPBearer() pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") # Инициализация Docker клиента docker_client = docker.from_env() # serve snapshots directory (downloadable files) SNAP_DIR = os.getenv("LOGBOARD_SNAPSHOT_DIR", "./snapshots") os.makedirs(SNAP_DIR, exist_ok=True) app.mount("/snapshots", StaticFiles(directory=SNAP_DIR), name="snapshots") # Модели данных class UserLogin(BaseModel): username: str password: str class Token(BaseModel): access_token: str token_type: str class TokenData(BaseModel): username: Optional[str] = None # Функции для работы с паролями def verify_password(plain_password: str, hashed_password: str) -> bool: """Проверяет пароль""" return pwd_context.verify(plain_password, hashed_password) def get_password_hash(password: str) -> str: """Хеширует пароль""" return pwd_context.hash(password) # Функции для работы с JWT токенами def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): """Создает JWT токен""" to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt def verify_token(token: str) -> Optional[str]: """Проверяет JWT токен и возвращает имя пользователя""" try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: return None return username except jwt.PyJWTError: return None # Функция для проверки аутентификации async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> str: """Получает текущего пользователя из токена""" token = credentials.credentials username = verify_token(token) if username is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Недействительный токен. Требуется авторизация.", headers={"WWW-Authenticate": "Bearer"}, ) return username # Функция для проверки пользователя def authenticate_user(username: str, password: str) -> bool: """Аутентифицирует пользователя""" if username == ADMIN_USERNAME: # Для простоты используем прямое сравнение паролей # В продакшене рекомендуется использовать хешированные пароли return password == ADMIN_PASSWORD return False # ---------- DOCKER HELPERS ---------- def load_excluded_containers() -> List[str]: """ Загружает список исключенных контейнеров из JSON файла Автор: Сергей Антропов Сайт: https://devops.org.ru """ try: with open("excluded_containers.json", "r", encoding="utf-8") as f: data = json.load(f) return data.get("excluded_containers", []) except FileNotFoundError: print("⚠️ Файл excluded_containers.json не найден, используем пустой список") return [] except json.JSONDecodeError as e: print(f"❌ Ошибка парсинга excluded_containers.json: {e}") return [] except Exception as e: print(f"❌ Ошибка загрузки excluded_containers.json: {e}") return [] def save_excluded_containers(containers: List[str]) -> bool: """ Сохраняет список исключенных контейнеров в JSON файл Автор: Сергей Антропов Сайт: https://devops.org.ru """ try: data = { "excluded_containers": containers, "description": "Список контейнеров, которые генерируют слишком много логов и исключаются из отображения" } with open("excluded_containers.json", "w", encoding="utf-8") as f: json.dump(data, f, indent=2, ensure_ascii=False) return True except Exception as e: print(f"❌ Ошибка сохранения excluded_containers.json: {e}") return False def get_all_projects() -> List[str]: """ Получает список всех проектов Docker Compose с учетом исключенных контейнеров Автор: Сергей Антропов Сайт: https://devops.org.ru """ projects = set() excluded_containers = load_excluded_containers() try: containers = docker_client.containers.list(all=True) # Словарь для подсчета контейнеров по проектам project_containers = {} standalone_containers = [] for c in containers: try: # Пропускаем исключенные контейнеры if c.name in excluded_containers: continue labels = c.labels or {} project = labels.get("com.docker.compose.project") if project: if project not in project_containers: project_containers[project] = 0 project_containers[project] += 1 else: standalone_containers.append(c.name) except Exception: continue # Добавляем проекты, у которых есть хотя бы один неисключенный контейнер for project, count in project_containers.items(): if count > 0: projects.add(project) # Добавляем standalone, если есть неисключенные контейнеры без проекта if standalone_containers: projects.add("standalone") except Exception as e: print(f"❌ Ошибка получения списка проектов: {e}") return [] result = sorted(list(projects)) print(f"📋 Доступные проекты (с учетом исключенных контейнеров): {result}") return result def list_containers(projects: Optional[List[str]] = None, include_stopped: bool = False) -> List[Dict]: """ Получает список контейнеров с поддержкой множественных проектов Автор: Сергей Антропов Сайт: https://devops.org.ru """ # Загружаем список исключенных контейнеров из JSON файла excluded_containers = load_excluded_containers() print(f"🚫 Список исключенных контейнеров: {excluded_containers}") items = [] excluded_count = 0 try: # Получаем список контейнеров с базовой обработкой ошибок containers = docker_client.containers.list(all=include_stopped) for c in containers: try: # Базовая информация о контейнере (без health check) basic_info = { "id": c.id[:12], "name": c.name, "status": c.status, "image": "unknown", "service": c.name, "project": None, "health": None, "ports": [], "url": None, } # Безопасно получаем метки try: labels = c.labels or {} basic_info["project"] = labels.get("com.docker.compose.project") basic_info["service"] = labels.get("com.docker.compose.service") or c.name except Exception: pass # Используем значения по умолчанию # Безопасно получаем информацию об образе try: if c.image and c.image.tags: basic_info["image"] = c.image.tags[0] elif c.image: basic_info["image"] = c.image.short_id except Exception: pass # Оставляем "unknown" # Безопасно получаем информацию о портах try: ports = c.ports or {} if ports: basic_info["ports"] = list(ports.keys()) # Пытаемся найти HTTP/HTTPS порт для создания URL for port_mapping in ports.values(): if port_mapping: for mapping in port_mapping: if isinstance(mapping, dict) and mapping.get("HostPort"): host_port = mapping["HostPort"] # Проверяем, что это HTTP порт (80, 443, 8080, 3000, etc.) host_port_int = int(host_port) if (host_port_int in [80, 443] or (3000 <= host_port_int <= 4000) or (8000 <= host_port_int <= 9000)): protocol = "https" if host_port == "443" else "http" basic_info["url"] = f"{protocol}://localhost:{host_port}" break if basic_info["url"]: break except Exception: pass # Оставляем пустые значения # Фильтрация по проектам if projects: # Если проект не указан, считаем его standalone container_project = basic_info["project"] or "standalone" if container_project not in projects: continue # Фильтрация исключенных контейнеров if basic_info["name"] in excluded_containers: excluded_count += 1 print(f"⚠️ Пропускаем исключенный контейнер: {basic_info['name']}") continue # Добавляем контейнер в список items.append(basic_info) except Exception as e: # Пропускаем контейнеры с критическими ошибками print(f"⚠️ Пропускаем проблемный контейнер {c.name if hasattr(c, 'name') else 'unknown'} (ID: {c.id[:12]}): {e}") continue except Exception as e: print(f"❌ Ошибка получения списка контейнеров: {e}") return [] # Сортируем по проекту, сервису и имени items.sort(key=lambda x: (x.get("project") or "", x.get("service") or "", x.get("name") or "")) # Подсчитываем статистику по проектам project_stats = {} for item in items: project = item.get("project") or "standalone" if project not in project_stats: project_stats[project] = {"visible": 0, "excluded": 0} project_stats[project]["visible"] += 1 # Подсчитываем исключенные контейнеры по проектам for c in containers: try: if c.name in excluded_containers: labels = c.labels or {} project = labels.get("com.docker.compose.project") or "standalone" if project not in project_stats: project_stats[project] = {"visible": 0, "excluded": 0} project_stats[project]["excluded"] += 1 except Exception: continue print(f"📊 Статистика: найдено {len(items)} контейнеров, исключено {excluded_count} контейнеров") for project, stats in project_stats.items(): print(f" 📦 {project}: {stats['visible']} видимых, {stats['excluded']} исключенных") return items # ---------- HTML ---------- INDEX_PATH = os.getenv("LOGBOARD_INDEX_HTML", "./templates/index.html") def load_index_html() -> str: """Загружает HTML шаблон главной страницы""" with open(INDEX_PATH, "r", encoding="utf-8") as f: return f.read() def load_login_html() -> str: """Загружает HTML шаблон страницы входа""" login_path = "./templates/login.html" with open(login_path, "r", encoding="utf-8") as f: return f.read() # ---------- ROUTES ---------- @app.get("/", response_class=HTMLResponse) async def index(request: Request): """Главная страница приложения""" # Проверяем наличие токена в cookie access_token = request.cookies.get("access_token") if not access_token: return RedirectResponse(url="/login") # Проверяем валидность токена username = verify_token(access_token) if not username: return RedirectResponse(url="/login") return templates.TemplateResponse("index.html", {"request": request}) @app.get("/login", response_class=HTMLResponse) async def login_page(request: Request): """Страница входа""" return templates.TemplateResponse("login.html", {"request": request}) @app.post("/api/auth/login", response_model=Token) async def login(user_data: UserLogin, response: Response): """API для входа в систему""" if authenticate_user(user_data.username, user_data.password): access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": user_data.username}, expires_delta=access_token_expires ) # Устанавливаем cookie с токеном response.set_cookie( key="access_token", value=access_token, httponly=True, secure=False, # Установите True для HTTPS samesite="lax", max_age=ACCESS_TOKEN_EXPIRE_MINUTES * 60 ) return {"access_token": access_token, "token_type": "bearer"} else: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Неверное имя пользователя или пароль. Проверьте учетные данные и попробуйте снова.", headers={"WWW-Authenticate": "Bearer"}, ) @app.post("/api/auth/logout") async def logout(response: Response): """API для выхода из системы""" response.delete_cookie(key="access_token") return {"message": "Успешный выход из системы"} @app.get("/api/auth/me") async def get_current_user_info(current_user: str = Depends(get_current_user)): """Получить информацию о текущем пользователе""" return {"username": current_user} @app.get("/healthz", response_class=PlainTextResponse) def healthz(): """Health check endpoint""" return "ok" # Маршруты для тестирования страниц ошибок (только в режиме разработки) @app.get("/test/error/404") async def test_404_error(): """Тест страницы ошибки 404""" raise HTTPException(status_code=404, detail="Тестовая ошибка 404") @app.get("/test/error/401") async def test_401_error(): """Тест страницы ошибки 401""" raise HTTPException(status_code=401, detail="Тестовая ошибка 401") @app.get("/test/error/403") async def test_403_error(): """Тест страницы ошибки 403""" raise HTTPException(status_code=403, detail="Тестовая ошибка 403") @app.get("/test/error/500") async def test_500_error(): """Тест страницы ошибки 500""" raise HTTPException(status_code=500, detail="Тестовая ошибка 500") @app.get("/test/error/general") async def test_general_error(): """Тест общей ошибки""" raise Exception("Тестовая общая ошибка") @app.get("/api/logs/stats/{container_id}") def api_logs_stats(container_id: str, current_user: str = Depends(get_current_user)): """Получить статистику логов контейнера""" try: # Ищем контейнер container = None for c in docker_client.containers.list(all=True): if c.id.startswith(container_id): container = c break if container is None: return JSONResponse({"error": "Container not found"}, status_code=404) # Получаем логи logs = container.logs(tail=1000).decode(errors="ignore") # Подсчитываем статистику stats = {"debug": 0, "info": 0, "warn": 0, "error": 0} for line in logs.split('\n'): if not line.strip(): continue line_lower = line.lower() if 'level=debug' in line_lower or 'debug' in line_lower: stats["debug"] += 1 elif 'level=info' in line_lower or 'info' in line_lower: stats["info"] += 1 elif 'level=warning' in line_lower or 'level=warn' in line_lower or 'warning' in line_lower or 'warn' in line_lower: stats["warn"] += 1 elif 'level=error' in line_lower or 'error' in line_lower: stats["error"] += 1 return JSONResponse( content=stats, headers={ "Cache-Control": "no-cache, no-store, must-revalidate", "Pragma": "no-cache", "Expires": "0" } ) except Exception as e: print(f"Error getting log stats for {container_id}: {e}") return JSONResponse({"error": str(e)}, status_code=500) @app.get("/api/excluded-containers") def api_get_excluded_containers(current_user: str = Depends(get_current_user)): """Получить список исключенных контейнеров""" return JSONResponse( content={"excluded_containers": load_excluded_containers()}, headers={ "Cache-Control": "no-cache, no-store, must-revalidate", "Pragma": "no-cache", "Expires": "0" } ) @app.post("/api/excluded-containers") def api_update_excluded_containers( containers: List[str] = Body(...), current_user: str = Depends(get_current_user) ): """Обновить список исключенных контейнеров""" success = save_excluded_containers(containers) if success: return JSONResponse( content={"status": "success", "message": "Список исключенных контейнеров обновлен"}, headers={ "Cache-Control": "no-cache, no-store, must-revalidate", "Pragma": "no-cache", "Expires": "0" } ) else: raise HTTPException( status_code=500, detail="Ошибка сохранения списка исключенных контейнеров. Попробуйте еще раз или обратитесь к администратору." ) @app.get("/api/projects") def api_projects(current_user: str = Depends(get_current_user)): """Получить список всех проектов Docker Compose""" return JSONResponse( content=get_all_projects(), headers={ "Cache-Control": "no-cache, no-store, must-revalidate", "Pragma": "no-cache", "Expires": "0" } ) @app.get("/api/services") def api_services( projects: Optional[str] = Query(None), include_stopped: bool = Query(False), current_user: str = Depends(get_current_user) ): """Получить список контейнеров с поддержкой множественных проектов""" project_list = None if projects: project_list = [p.strip() for p in projects.split(",") if p.strip()] elif DEFAULT_PROJECTS: project_list = [p.strip() for p in DEFAULT_PROJECTS.split(",") if p.strip()] elif DEFAULT_PROJECT: project_list = [DEFAULT_PROJECT] return JSONResponse( content=list_containers(projects=project_list, include_stopped=include_stopped), headers={ "Cache-Control": "no-cache, no-store, must-revalidate", "Pragma": "no-cache", "Expires": "0" } ) @app.post("/api/snapshot") def api_snapshot( current_user: str = Depends(get_current_user), container_id: str = Body(..., embed=True), service: str = Body("", embed=True), content: str = Body("", embed=True), ): """Сохранить снимок логов""" # Save posted content as a snapshot file safe_service = re.sub(r"[^a-zA-Z0-9_.-]+", "_", service or container_id[:12]) ts = os.getenv("TZ_TS") or "" from datetime import datetime stamp = datetime.now().strftime("%Y%m%d-%H%M%S") fname = f"{safe_service}-{stamp}.log" fpath = os.path.join(SNAP_DIR, fname) with open(fpath, "w", encoding="utf-8") as f: f.write(content) url = f"/snapshots/{fname}" return {"file": fname, "url": url} # WebSocket: verify token (?token=base64(user:pass)) @app.websocket("/ws/logs/{container_id}") async def ws_logs(ws: WebSocket, container_id: str, tail: int = DEFAULT_TAIL, token: Optional[str] = None, service: Optional[str] = None, project: Optional[str] = None): """WebSocket для получения логов контейнера""" # Принимаем соединение await ws.accept() # Проверяем токен if not token: await ws.send_text("ERROR: token required") await ws.close() return username = verify_token(token) if not username: await ws.send_text("ERROR: invalid token") await ws.close() return try: # Простой поиск контейнера по ID container = None try: for c in docker_client.containers.list(all=True): if c.id.startswith(container_id): container = c break except Exception as e: await ws.send_text(f"ERROR: cannot list containers - {e}") return if container is None: await ws.send_text("ERROR: container not found") return # Отправляем начальное сообщение await ws.send_text(f"Connected to container: {container.name}") # Получаем логи (только последние строки, без follow) try: print(f"Getting logs for container {container.name} (ID: {container.id[:12]})") logs = container.logs(tail=tail).decode(errors="ignore") if logs: await ws.send_text(logs) else: await ws.send_text("No logs available") except Exception as e: print(f"Error getting logs for {container.name}: {e}") await ws.send_text(f"ERROR getting logs: {e}") # Простое WebSocket соединение - только отправляем логи один раз print(f"WebSocket connection established for {container.name}") except WebSocketDisconnect: print(f"WebSocket client disconnected for container {container.name}") except Exception as e: print(f"WebSocket error for {container.name}: {e}") try: await ws.send_text(f"ERROR: {e}") except: pass finally: try: print(f"Closing WebSocket connection for container {container.name}") await ws.close() except: pass # WebSocket: fan-in by compose service (aggregate all replicas), prefixing with short container id @app.websocket("/ws/fan/{service_name}") async def ws_fan(ws: WebSocket, service_name: str, tail: int = DEFAULT_TAIL, token: Optional[str] = None, project: Optional[str] = None): await ws.accept() # Проверяем токен if not token: await ws.send_text("ERROR: token required") await ws.close() return username = verify_token(token) if not username: await ws.send_text("ERROR: invalid token") await ws.close() return # Track active streaming tasks by container id active = {} def list_by_service(service_name: str, project_name: Optional[str] = None): found = [] for c in docker_client.containers.list(all=True): lbl = c.labels or {} if lbl.get("com.docker.compose.service") == service_name and (project_name is None or lbl.get("com.docker.compose.project")==project_name): found.append(c) # sort by Created asc so first lines look ordered-ish try: found.sort(key=lambda x: x.attrs.get("Created","")) except Exception: pass return found async def stream_container(cont): short = cont.id[:8] first_tail = tail while True: try: use_tail = first_tail first_tail = 0 stream = cont.logs(stream=True, follow=True, tail=(use_tail if use_tail>0 else "all")) for chunk in stream: if chunk is None: break try: await ws.send_text(f"[{short}] " + chunk.decode(errors="ignore")) except RuntimeError: stream.close(); return stream.close() # container stopped -> wait and try to find same id again; if gone, exit loop and outer watcher will reassign await asyncio.sleep(1.0) except WebSocketDisconnect: return except Exception: await asyncio.sleep(1.0) continue async def watcher(): # Periodically reconcile set of containers try: while True: desired = {c.id: c for c in list_by_service(service_name, project)} # start missing for cid, cont in desired.items(): if cid not in active: task = asyncio.create_task(stream_container(cont)) active[cid] = task # cancel removed for cid in list(active.keys()): if cid not in desired: task = active.pop(cid) task.cancel() await asyncio.sleep(2.0) except asyncio.CancelledError: pass watch_task = asyncio.create_task(watcher()) try: # Keep ws open until disconnected; the tasks stream data while True: await asyncio.sleep(1.0) except WebSocketDisconnect: pass finally: watch_task.cancel() for t in active.values(): t.cancel() try: await ws.close() except Exception: pass # WebSocket: fan-in for multiple compose services (comma-separated), optional project filter. @app.websocket("/ws/fan_group") async def ws_fan_group(ws: WebSocket, services: str, tail: int = DEFAULT_TAIL, token: Optional[str] = None, project: Optional[str] = None): await ws.accept() # Проверяем токен if not token: await ws.send_text("ERROR: token required") await ws.close() return username = verify_token(token) if not username: await ws.send_text("ERROR: invalid token") await ws.close() return svc_set = {s.strip() for s in services.split(",") if s.strip()} if not svc_set: await ws.send_text("ERROR: no services provided") await ws.close(); return active = {} def list_by_services(names, project_name: Optional[str] = None): res = [] for c in docker_client.containers.list(all=True): lbl = c.labels or {} if lbl.get("com.docker.compose.service") in names and (project_name is None or lbl.get("com.docker.compose.project")==project_name): res.append(c) try: res.sort(key=lambda x: x.attrs.get("Created","")) except Exception: pass return res async def stream_container(cont): short = cont.id[:8] svc = (cont.labels or {}).get("com.docker.compose.service","") first_tail = tail while True: try: use_tail = first_tail first_tail = 0 stream = cont.logs(stream=True, follow=True, tail=(use_tail if use_tail>0 else "all")) for chunk in stream: if chunk is None: break line = chunk.decode(errors="ignore") try: await ws.send_text(f"[{short} {svc}] " + line) except RuntimeError: stream.close(); return stream.close() await asyncio.sleep(1.0) except WebSocketDisconnect: return except Exception: await asyncio.sleep(1.0) continue async def watcher(): try: while True: desired = {c.id: c for c in list_by_services(svc_set, project)} for cid, cont in desired.items(): if cid not in active: task = asyncio.create_task(stream_container(cont)) active[cid] = task for cid in list(active.keys()): if cid not in desired: task = active.pop(cid) task.cancel() await asyncio.sleep(2.0) except asyncio.CancelledError: pass watch_task = asyncio.create_task(watcher()) try: while True: await asyncio.sleep(1.0) except WebSocketDisconnect: pass finally: watch_task.cancel() for t in active.values(): t.cancel() try: await ws.close() except Exception: pass if __name__ == "__main__": import uvicorn print(f"LogBoard+ http://0.0.0.0:{APP_PORT}") uvicorn.run(app, host="0.0.0.0", port=APP_PORT)