#!/usr/bin/env python3 # -*- coding: utf-8 -*- import asyncio import base64 import os import re from typing import Optional, List, Dict import docker from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Query, Depends, HTTPException, status, Body from fastapi.responses import HTMLResponse, JSONResponse, PlainTextResponse from fastapi.security import HTTPBasic, HTTPBasicCredentials from fastapi.staticfiles import StaticFiles APP_PORT = int(os.getenv("LOGBOARD_PORT", "9001")) DEFAULT_TAIL = int(os.getenv("LOGBOARD_TAIL", "500")) BASIC_USER = os.getenv("LOGBOARD_USER", "admin") BASIC_PASS = os.getenv("LOGBOARD_PASS", "admin") DEFAULT_PROJECT = os.getenv("COMPOSE_PROJECT_NAME") # filter by compose project DEFAULT_PROJECTS = os.getenv("LOGBOARD_PROJECTS") # multiple projects filter SKIP_UNHEALTHY = os.getenv("LOGBOARD_SKIP_UNHEALTHY", "true").lower() == "true" CONTAINER_LIST_TIMEOUT = int(os.getenv("LOGBOARD_CONTAINER_LIST_TIMEOUT", "10")) CONTAINER_INFO_TIMEOUT = int(os.getenv("LOGBOARD_CONTAINER_INFO_TIMEOUT", "3")) HEALTH_CHECK_TIMEOUT = int(os.getenv("LOGBOARD_HEALTH_CHECK_TIMEOUT", "2")) security = HTTPBasic() app = FastAPI(title="LogBoard+") # serve snapshots directory (downloadable files) SNAP_DIR = os.getenv("LOGBOARD_SNAPSHOT_DIR", "./snapshots") os.makedirs(SNAP_DIR, exist_ok=True) app.mount("/snapshots", StaticFiles(directory=SNAP_DIR), name="snapshots") docker_client = docker.from_env() # ---------- AUTH ---------- def check_basic(creds: HTTPBasicCredentials = Depends(security)): if creds.username == BASIC_USER and creds.password == BASIC_PASS: return creds raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials", headers={"WWW-Authenticate": "Basic"}) def token_from_creds(creds: HTTPBasicCredentials) -> str: return base64.b64encode(f"{creds.username}:{creds.password}".encode()).decode() def verify_ws_token(token: str) -> bool: try: raw = base64.b64decode(token.encode(), validate=True).decode() except Exception: return False return raw == f"{BASIC_USER}:{BASIC_PASS}" # ---------- DOCKER HELPERS ---------- def get_all_projects() -> List[str]: """ Получает список всех проектов Docker Compose Автор: Сергей Антропов Сайт: https://devops.org.ru """ projects = set() try: containers = docker_client.containers.list(all=True) for c in containers: try: labels = c.labels or {} project = labels.get("com.docker.compose.project") if project: projects.add(project) except Exception: continue # Добавляем контейнеры без проекта как "standalone" standalone_count = 0 for c in containers: try: labels = c.labels or {} if not labels.get("com.docker.compose.project"): standalone_count += 1 except Exception: continue if standalone_count > 0: projects.add("standalone") except Exception as e: print(f"❌ Ошибка получения списка проектов: {e}") return [] return sorted(list(projects)) def list_containers(projects: Optional[List[str]] = None, include_stopped: bool = False) -> List[Dict]: """ Получает список контейнеров с поддержкой множественных проектов Автор: Сергей Антропов Сайт: https://devops.org.ru """ # Список контейнеров, которые генерируют слишком много логов excluded_containers = [ "buildx_buildkit_multiarch-builder0", "buildx_buildkit_multiarch-builder1", "buildx_buildkit_multiarch-builder2", "buildx_buildkit_multiarch-builder3", "buildx_buildkit_multiarch-builder4", "buildx_buildkit_multiarch-builder5", "buildx_buildkit_multiarch-builder6", "buildx_buildkit_multiarch-builder7", "buildx_buildkit_multiarch-builder8", "buildx_buildkit_multiarch-builder9" ] items = [] try: # Получаем список контейнеров с базовой обработкой ошибок containers = docker_client.containers.list(all=include_stopped) for c in containers: try: # Базовая информация о контейнере (без health check) basic_info = { "id": c.id[:12], "name": c.name, "status": c.status, "image": "unknown", "service": c.name, "project": None, "health": None, } # Безопасно получаем метки try: labels = c.labels or {} basic_info["project"] = labels.get("com.docker.compose.project") basic_info["service"] = labels.get("com.docker.compose.service") or c.name except Exception: pass # Используем значения по умолчанию # Безопасно получаем информацию об образе try: if c.image and c.image.tags: basic_info["image"] = c.image.tags[0] elif c.image: basic_info["image"] = c.image.short_id except Exception: pass # Оставляем "unknown" # Фильтрация по проектам if projects: # Если проект не указан, считаем его standalone container_project = basic_info["project"] or "standalone" if container_project not in projects: continue # Фильтрация исключенных контейнеров if basic_info["name"] in excluded_containers: print(f"⚠️ Пропускаем исключенный контейнер: {basic_info['name']}") continue # Добавляем контейнер в список items.append(basic_info) except Exception as e: # Пропускаем контейнеры с критическими ошибками print(f"⚠️ Пропускаем проблемный контейнер {c.name if hasattr(c, 'name') else 'unknown'} (ID: {c.id[:12]}): {e}") continue except Exception as e: print(f"❌ Ошибка получения списка контейнеров: {e}") return [] # Сортируем по проекту, сервису и имени items.sort(key=lambda x: (x.get("project") or "", x.get("service") or "", x.get("name") or "")) return items # ---------- HTML ---------- INDEX_PATH = os.getenv("LOGBOARD_INDEX_HTML", "./templates/index.html") def load_index_html(token: str) -> str: with open(INDEX_PATH, "r", encoding="utf-8") as f: html = f.read() return html.replace("__TOKEN__", token) # ---------- ROUTES ---------- @app.get("/", response_class=HTMLResponse) def index(creds: HTTPBasicCredentials = Depends(check_basic)): token = token_from_creds(creds) return HTMLResponse( content=load_index_html(token), headers={ "Cache-Control": "no-cache, no-store, must-revalidate", "Pragma": "no-cache", "Expires": "0" } ) @app.get("/healthz", response_class=PlainTextResponse) def healthz(): return "ok" @app.get("/api/logs/stats/{container_id}") def api_logs_stats(container_id: str, _: HTTPBasicCredentials = Depends(check_basic)): """Получить статистику логов контейнера""" try: # Ищем контейнер container = None for c in docker_client.containers.list(all=True): if c.id.startswith(container_id): container = c break if container is None: return JSONResponse({"error": "Container not found"}, status_code=404) # Получаем логи logs = container.logs(tail=1000).decode(errors="ignore") # Подсчитываем статистику stats = {"debug": 0, "info": 0, "warn": 0, "error": 0} for line in logs.split('\n'): if not line.strip(): continue line_lower = line.lower() if 'level=debug' in line_lower or 'debug' in line_lower: stats["debug"] += 1 elif 'level=info' in line_lower or 'info' in line_lower: stats["info"] += 1 elif 'level=warning' in line_lower or 'level=warn' in line_lower or 'warning' in line_lower or 'warn' in line_lower: stats["warn"] += 1 elif 'level=error' in line_lower or 'error' in line_lower: stats["error"] += 1 return JSONResponse( content=stats, headers={ "Cache-Control": "no-cache, no-store, must-revalidate", "Pragma": "no-cache", "Expires": "0" } ) except Exception as e: print(f"Error getting log stats for {container_id}: {e}") return JSONResponse({"error": str(e)}, status_code=500) @app.get("/api/projects") def api_projects(_: HTTPBasicCredentials = Depends(check_basic)): """Получить список всех проектов Docker Compose""" return JSONResponse( content=get_all_projects(), headers={ "Cache-Control": "no-cache, no-store, must-revalidate", "Pragma": "no-cache", "Expires": "0" } ) @app.get("/api/services") def api_services(projects: Optional[str] = Query(None), include_stopped: bool = Query(False), _: HTTPBasicCredentials = Depends(check_basic)): """ Получить список контейнеров с поддержкой множественных проектов projects: список проектов через запятую (например: "project1,project2") """ project_list = None if projects: project_list = [p.strip() for p in projects.split(",") if p.strip()] elif DEFAULT_PROJECTS: project_list = [p.strip() for p in DEFAULT_PROJECTS.split(",") if p.strip()] elif DEFAULT_PROJECT: project_list = [DEFAULT_PROJECT] return JSONResponse( content=list_containers(projects=project_list, include_stopped=include_stopped), headers={ "Cache-Control": "no-cache, no-store, must-revalidate", "Pragma": "no-cache", "Expires": "0" } ) @app.post("/api/snapshot") def api_snapshot( creds: HTTPBasicCredentials = Depends(check_basic), container_id: str = Body(..., embed=True), service: str = Body("", embed=True), content: str = Body("", embed=True), ): # Save posted content as a snapshot file safe_service = re.sub(r"[^a-zA-Z0-9_.-]+", "_", service or container_id[:12]) ts = os.getenv("TZ_TS") or "" from datetime import datetime stamp = datetime.now().strftime("%Y%m%d-%H%M%S") fname = f"{safe_service}-{stamp}.log" fpath = os.path.join(SNAP_DIR, fname) with open(fpath, "w", encoding="utf-8") as f: f.write(content) url = f"/snapshots/{fname}" return {"file": fname, "url": url} # WebSocket: verify token (?token=base64(user:pass)) # WebSocket: verify token (?token=base64(user:pass)). Supports auto-reconnect by service label. @app.websocket("/ws/logs/{container_id}") async def ws_logs(ws: WebSocket, container_id: str, tail: int = DEFAULT_TAIL, token: Optional[str] = None, service: Optional[str] = None, project: Optional[str] = None): """Упрощенный WebSocket для получения логов контейнера""" # Принимаем соединение await ws.accept() # Проверяем токен if not token or not verify_ws_token(token): await ws.send_text("ERROR: unauthorized") await ws.close() return try: # Простой поиск контейнера по ID container = None try: for c in docker_client.containers.list(all=True): if c.id.startswith(container_id): container = c break except Exception as e: await ws.send_text(f"ERROR: cannot list containers - {e}") return if container is None: await ws.send_text("ERROR: container not found") return # Отправляем начальное сообщение await ws.send_text(f"Connected to container: {container.name}") # Получаем логи (только последние строки, без follow) try: print(f"Getting logs for container {container.name} (ID: {container.id[:12]})") logs = container.logs(tail=tail).decode(errors="ignore") if logs: await ws.send_text(logs) else: await ws.send_text("No logs available") except Exception as e: print(f"Error getting logs for {container.name}: {e}") await ws.send_text(f"ERROR getting logs: {e}") # Простое WebSocket соединение - только отправляем логи один раз print(f"WebSocket connection established for {container.name}") except WebSocketDisconnect: print(f"WebSocket client disconnected for container {container.name}") except Exception as e: print(f"WebSocket error for {container.name}: {e}") try: await ws.send_text(f"ERROR: {e}") except: pass finally: try: print(f"Closing WebSocket connection for container {container.name}") await ws.close() except: pass if __name__ == "__main__": import uvicorn print(f"LogBoard+ http://0.0.0.0:{APP_PORT}") uvicorn.run(app, host="0.0.0.0", port=APP_PORT) # WebSocket: fan-in by compose service (aggregate all replicas), prefixing with short container id @app.websocket("/ws/fan/{service_name}") async def ws_fan(ws: WebSocket, service_name: str, tail: int = DEFAULT_TAIL, token: Optional[str] = None, project: Optional[str] = None): await ws.accept() if not token or not verify_ws_token(token): await ws.send_text("ERROR: unauthorized") await ws.close(); return # Track active streaming tasks by container id active = {} def list_by_service(service_name: str, project_name: Optional[str] = None): found = [] for c in docker_client.containers.list(all=True): lbl = c.labels or {} if lbl.get("com.docker.compose.service") == service_name and (project_name is None or lbl.get("com.docker.compose.project")==project_name): found.append(c) # sort by Created asc so first lines look ordered-ish try: found.sort(key=lambda x: x.attrs.get("Created","")) except Exception: pass return found async def stream_container(cont): short = cont.id[:8] first_tail = tail while True: try: use_tail = first_tail first_tail = 0 stream = cont.logs(stream=True, follow=True, tail=(use_tail if use_tail>0 else "all")) for chunk in stream: if chunk is None: break try: await ws.send_text(f"[{short}] " + chunk.decode(errors="ignore")) except RuntimeError: stream.close(); return stream.close() # container stopped -> wait and try to find same id again; if gone, exit loop and outer watcher will reassign await asyncio.sleep(1.0) except WebSocketDisconnect: return except Exception: await asyncio.sleep(1.0) continue async def watcher(): # Periodically reconcile set of containers try: while True: desired = {c.id: c for c in list_by_service(service_name, project)} # start missing for cid, cont in desired.items(): if cid not in active: task = asyncio.create_task(stream_container(cont)) active[cid] = task # cancel removed for cid in list(active.keys()): if cid not in desired: task = active.pop(cid) task.cancel() await asyncio.sleep(2.0) except asyncio.CancelledError: pass watch_task = asyncio.create_task(watcher()) try: # Keep ws open until disconnected; the tasks stream data while True: await asyncio.sleep(1.0) except WebSocketDisconnect: pass finally: watch_task.cancel() for t in active.values(): t.cancel() try: await ws.close() except Exception: pass # WebSocket: fan-in for multiple compose services (comma-separated), optional project filter. @app.websocket("/ws/fan_group") async def ws_fan_group(ws: WebSocket, services: str, tail: int = DEFAULT_TAIL, token: Optional[str] = None, project: Optional[str] = None): await ws.accept() if not token or not verify_ws_token(token): await ws.send_text("ERROR: unauthorized") await ws.close(); return svc_set = {s.strip() for s in services.split(",") if s.strip()} if not svc_set: await ws.send_text("ERROR: no services provided") await ws.close(); return active = {} def list_by_services(names, project_name: Optional[str] = None): res = [] for c in docker_client.containers.list(all=True): lbl = c.labels or {} if lbl.get("com.docker.compose.service") in names and (project_name is None or lbl.get("com.docker.compose.project")==project_name): res.append(c) try: res.sort(key=lambda x: x.attrs.get("Created","")) except Exception: pass return res async def stream_container(cont): short = cont.id[:8] svc = (cont.labels or {}).get("com.docker.compose.service","") first_tail = tail while True: try: use_tail = first_tail first_tail = 0 stream = cont.logs(stream=True, follow=True, tail=(use_tail if use_tail>0 else "all")) for chunk in stream: if chunk is None: break line = chunk.decode(errors="ignore") try: await ws.send_text(f"[{short} {svc}] " + line) except RuntimeError: stream.close(); return stream.close() await asyncio.sleep(1.0) except WebSocketDisconnect: return except Exception: await asyncio.sleep(1.0) continue async def watcher(): try: while True: desired = {c.id: c for c in list_by_services(svc_set, project)} for cid, cont in desired.items(): if cid not in active: task = asyncio.create_task(stream_container(cont)) active[cid] = task for cid in list(active.keys()): if cid not in desired: task = active.pop(cid) task.cancel() await asyncio.sleep(2.0) except asyncio.CancelledError: pass watch_task = asyncio.create_task(watcher()) try: while True: await asyncio.sleep(1.0) except WebSocketDisconnect: pass finally: watch_task.cancel() for t in active.values(): t.cancel() try: await ws.close() except Exception: pass