#!/usr/bin/env python3 # -*- coding: utf-8 -*- import asyncio import base64 import json import os import re from typing import Optional, List, Dict import docker from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Query, Depends, HTTPException, status, Body from fastapi.responses import HTMLResponse, JSONResponse, PlainTextResponse from fastapi.security import HTTPBasic, HTTPBasicCredentials from fastapi.staticfiles import StaticFiles APP_PORT = int(os.getenv("LOGBOARD_PORT", "9001")) DEFAULT_TAIL = int(os.getenv("LOGBOARD_TAIL", "500")) BASIC_USER = os.getenv("LOGBOARD_USER", "admin") BASIC_PASS = os.getenv("LOGBOARD_PASS", "admin") DEFAULT_PROJECT = os.getenv("COMPOSE_PROJECT_NAME") # filter by compose project DEFAULT_PROJECTS = os.getenv("LOGBOARD_PROJECTS") # multiple projects filter SKIP_UNHEALTHY = os.getenv("LOGBOARD_SKIP_UNHEALTHY", "true").lower() == "true" CONTAINER_LIST_TIMEOUT = int(os.getenv("LOGBOARD_CONTAINER_LIST_TIMEOUT", "10")) CONTAINER_INFO_TIMEOUT = int(os.getenv("LOGBOARD_CONTAINER_INFO_TIMEOUT", "3")) HEALTH_CHECK_TIMEOUT = int(os.getenv("LOGBOARD_HEALTH_CHECK_TIMEOUT", "2")) security = HTTPBasic() app = FastAPI(title="LogBoard+") # serve snapshots directory (downloadable files) SNAP_DIR = os.getenv("LOGBOARD_SNAPSHOT_DIR", "./snapshots") os.makedirs(SNAP_DIR, exist_ok=True) app.mount("/snapshots", StaticFiles(directory=SNAP_DIR), name="snapshots") docker_client = docker.from_env() # ---------- AUTH ---------- def check_basic(creds: HTTPBasicCredentials = Depends(security)): if creds.username == BASIC_USER and creds.password == BASIC_PASS: return creds raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials", headers={"WWW-Authenticate": "Basic"}) def token_from_creds(creds: HTTPBasicCredentials) -> str: return base64.b64encode(f"{creds.username}:{creds.password}".encode()).decode() def verify_ws_token(token: str) -> bool: try: raw = base64.b64decode(token.encode(), validate=True).decode() except Exception: return False return raw == f"{BASIC_USER}:{BASIC_PASS}" # ---------- DOCKER HELPERS ---------- def load_excluded_containers() -> List[str]: """ Загружает список исключенных контейнеров из JSON файла Автор: Сергей Антропов Сайт: https://devops.org.ru """ try: with open("excluded_containers.json", "r", encoding="utf-8") as f: data = json.load(f) return data.get("excluded_containers", []) except FileNotFoundError: print("⚠️ Файл excluded_containers.json не найден, используем пустой список") return [] except json.JSONDecodeError as e: print(f"❌ Ошибка парсинга excluded_containers.json: {e}") return [] except Exception as e: print(f"❌ Ошибка загрузки excluded_containers.json: {e}") return [] def save_excluded_containers(containers: List[str]) -> bool: """ Сохраняет список исключенных контейнеров в JSON файл Автор: Сергей Антропов Сайт: https://devops.org.ru """ try: data = { "excluded_containers": containers, "description": "Список контейнеров, которые генерируют слишком много логов и исключаются из отображения" } with open("excluded_containers.json", "w", encoding="utf-8") as f: json.dump(data, f, indent=2, ensure_ascii=False) return True except Exception as e: print(f"❌ Ошибка сохранения excluded_containers.json: {e}") return False def get_all_projects() -> List[str]: """ Получает список всех проектов Docker Compose с учетом исключенных контейнеров Автор: Сергей Антропов Сайт: https://devops.org.ru """ projects = set() excluded_containers = load_excluded_containers() try: containers = docker_client.containers.list(all=True) # Словарь для подсчета контейнеров по проектам project_containers = {} standalone_containers = [] for c in containers: try: # Пропускаем исключенные контейнеры if c.name in excluded_containers: continue labels = c.labels or {} project = labels.get("com.docker.compose.project") if project: if project not in project_containers: project_containers[project] = 0 project_containers[project] += 1 else: standalone_containers.append(c.name) except Exception: continue # Добавляем проекты, у которых есть хотя бы один неисключенный контейнер for project, count in project_containers.items(): if count > 0: projects.add(project) # Добавляем standalone, если есть неисключенные контейнеры без проекта if standalone_containers: projects.add("standalone") except Exception as e: print(f"❌ Ошибка получения списка проектов: {e}") return [] result = sorted(list(projects)) print(f"📋 Доступные проекты (с учетом исключенных контейнеров): {result}") return result def list_containers(projects: Optional[List[str]] = None, include_stopped: bool = False) -> List[Dict]: """ Получает список контейнеров с поддержкой множественных проектов Автор: Сергей Антропов Сайт: https://devops.org.ru """ # Загружаем список исключенных контейнеров из JSON файла excluded_containers = load_excluded_containers() print(f"🚫 Список исключенных контейнеров: {excluded_containers}") items = [] excluded_count = 0 try: # Получаем список контейнеров с базовой обработкой ошибок containers = docker_client.containers.list(all=include_stopped) for c in containers: try: # Базовая информация о контейнере (без health check) basic_info = { "id": c.id[:12], "name": c.name, "status": c.status, "image": "unknown", "service": c.name, "project": None, "health": None, "ports": [], "url": None, } # Безопасно получаем метки try: labels = c.labels or {} basic_info["project"] = labels.get("com.docker.compose.project") basic_info["service"] = labels.get("com.docker.compose.service") or c.name except Exception: pass # Используем значения по умолчанию # Безопасно получаем информацию об образе try: if c.image and c.image.tags: basic_info["image"] = c.image.tags[0] elif c.image: basic_info["image"] = c.image.short_id except Exception: pass # Оставляем "unknown" # Безопасно получаем информацию о портах try: ports = c.ports or {} if ports: basic_info["ports"] = list(ports.keys()) # Пытаемся найти HTTP/HTTPS порт для создания URL for port_mapping in ports.values(): if port_mapping: for mapping in port_mapping: if isinstance(mapping, dict) and mapping.get("HostPort"): host_port = mapping["HostPort"] # Проверяем, что это HTTP порт (80, 443, 8080, 3000, etc.) host_port_int = int(host_port) if (host_port_int in [80, 443] or (3000 <= host_port_int <= 4000) or (8000 <= host_port_int <= 9000)): protocol = "https" if host_port == "443" else "http" basic_info["url"] = f"{protocol}://localhost:{host_port}" break if basic_info["url"]: break except Exception: pass # Оставляем пустые значения # Фильтрация по проектам if projects: # Если проект не указан, считаем его standalone container_project = basic_info["project"] or "standalone" if container_project not in projects: continue # Фильтрация исключенных контейнеров if basic_info["name"] in excluded_containers: excluded_count += 1 print(f"⚠️ Пропускаем исключенный контейнер: {basic_info['name']}") continue # Добавляем контейнер в список items.append(basic_info) except Exception as e: # Пропускаем контейнеры с критическими ошибками print(f"⚠️ Пропускаем проблемный контейнер {c.name if hasattr(c, 'name') else 'unknown'} (ID: {c.id[:12]}): {e}") continue except Exception as e: print(f"❌ Ошибка получения списка контейнеров: {e}") return [] # Сортируем по проекту, сервису и имени items.sort(key=lambda x: (x.get("project") or "", x.get("service") or "", x.get("name") or "")) # Подсчитываем статистику по проектам project_stats = {} for item in items: project = item.get("project") or "standalone" if project not in project_stats: project_stats[project] = {"visible": 0, "excluded": 0} project_stats[project]["visible"] += 1 # Подсчитываем исключенные контейнеры по проектам for c in containers: try: if c.name in excluded_containers: labels = c.labels or {} project = labels.get("com.docker.compose.project") or "standalone" if project not in project_stats: project_stats[project] = {"visible": 0, "excluded": 0} project_stats[project]["excluded"] += 1 except Exception: continue print(f"📊 Статистика: найдено {len(items)} контейнеров, исключено {excluded_count} контейнеров") for project, stats in project_stats.items(): print(f" 📦 {project}: {stats['visible']} видимых, {stats['excluded']} исключенных") return items # ---------- HTML ---------- INDEX_PATH = os.getenv("LOGBOARD_INDEX_HTML", "./templates/index.html") def load_index_html(token: str) -> str: with open(INDEX_PATH, "r", encoding="utf-8") as f: html = f.read() return html.replace("__TOKEN__", token) # ---------- ROUTES ---------- @app.get("/", response_class=HTMLResponse) def index(creds: HTTPBasicCredentials = Depends(check_basic)): token = token_from_creds(creds) return HTMLResponse( content=load_index_html(token), headers={ "Cache-Control": "no-cache, no-store, must-revalidate", "Pragma": "no-cache", "Expires": "0" } ) @app.get("/healthz", response_class=PlainTextResponse) def healthz(): return "ok" @app.get("/api/logs/stats/{container_id}") def api_logs_stats(container_id: str, _: HTTPBasicCredentials = Depends(check_basic)): """Получить статистику логов контейнера""" try: # Ищем контейнер container = None for c in docker_client.containers.list(all=True): if c.id.startswith(container_id): container = c break if container is None: return JSONResponse({"error": "Container not found"}, status_code=404) # Получаем логи logs = container.logs(tail=1000).decode(errors="ignore") # Подсчитываем статистику stats = {"debug": 0, "info": 0, "warn": 0, "error": 0} for line in logs.split('\n'): if not line.strip(): continue line_lower = line.lower() if 'level=debug' in line_lower or 'debug' in line_lower: stats["debug"] += 1 elif 'level=info' in line_lower or 'info' in line_lower: stats["info"] += 1 elif 'level=warning' in line_lower or 'level=warn' in line_lower or 'warning' in line_lower or 'warn' in line_lower: stats["warn"] += 1 elif 'level=error' in line_lower or 'error' in line_lower: stats["error"] += 1 return JSONResponse( content=stats, headers={ "Cache-Control": "no-cache, no-store, must-revalidate", "Pragma": "no-cache", "Expires": "0" } ) except Exception as e: print(f"Error getting log stats for {container_id}: {e}") return JSONResponse({"error": str(e)}, status_code=500) @app.get("/api/excluded-containers") def api_get_excluded_containers(_: HTTPBasicCredentials = Depends(check_basic)): """ Получить список исключенных контейнеров """ return JSONResponse( content={"excluded_containers": load_excluded_containers()}, headers={ "Cache-Control": "no-cache, no-store, must-revalidate", "Pragma": "no-cache", "Expires": "0" } ) @app.post("/api/excluded-containers") def api_update_excluded_containers( containers: List[str] = Body(...), _: HTTPBasicCredentials = Depends(check_basic) ): """ Обновить список исключенных контейнеров """ success = save_excluded_containers(containers) if success: return JSONResponse( content={"status": "success", "message": "Список исключенных контейнеров обновлен"}, headers={ "Cache-Control": "no-cache, no-store, must-revalidate", "Pragma": "no-cache", "Expires": "0" } ) else: raise HTTPException(status_code=500, detail="Ошибка сохранения списка") @app.get("/api/projects") def api_projects(_: HTTPBasicCredentials = Depends(check_basic)): """Получить список всех проектов Docker Compose""" return JSONResponse( content=get_all_projects(), headers={ "Cache-Control": "no-cache, no-store, must-revalidate", "Pragma": "no-cache", "Expires": "0" } ) @app.get("/api/services") def api_services(projects: Optional[str] = Query(None), include_stopped: bool = Query(False), _: HTTPBasicCredentials = Depends(check_basic)): """ Получить список контейнеров с поддержкой множественных проектов projects: список проектов через запятую (например: "project1,project2") """ project_list = None if projects: project_list = [p.strip() for p in projects.split(",") if p.strip()] elif DEFAULT_PROJECTS: project_list = [p.strip() for p in DEFAULT_PROJECTS.split(",") if p.strip()] elif DEFAULT_PROJECT: project_list = [DEFAULT_PROJECT] return JSONResponse( content=list_containers(projects=project_list, include_stopped=include_stopped), headers={ "Cache-Control": "no-cache, no-store, must-revalidate", "Pragma": "no-cache", "Expires": "0" } ) @app.post("/api/snapshot") def api_snapshot( creds: HTTPBasicCredentials = Depends(check_basic), container_id: str = Body(..., embed=True), service: str = Body("", embed=True), content: str = Body("", embed=True), ): # Save posted content as a snapshot file safe_service = re.sub(r"[^a-zA-Z0-9_.-]+", "_", service or container_id[:12]) ts = os.getenv("TZ_TS") or "" from datetime import datetime stamp = datetime.now().strftime("%Y%m%d-%H%M%S") fname = f"{safe_service}-{stamp}.log" fpath = os.path.join(SNAP_DIR, fname) with open(fpath, "w", encoding="utf-8") as f: f.write(content) url = f"/snapshots/{fname}" return {"file": fname, "url": url} # WebSocket: verify token (?token=base64(user:pass)) # WebSocket: verify token (?token=base64(user:pass)). Supports auto-reconnect by service label. @app.websocket("/ws/logs/{container_id}") async def ws_logs(ws: WebSocket, container_id: str, tail: int = DEFAULT_TAIL, token: Optional[str] = None, service: Optional[str] = None, project: Optional[str] = None): """Упрощенный WebSocket для получения логов контейнера""" # Принимаем соединение await ws.accept() # Проверяем токен if not token or not verify_ws_token(token): await ws.send_text("ERROR: unauthorized") await ws.close() return try: # Простой поиск контейнера по ID container = None try: for c in docker_client.containers.list(all=True): if c.id.startswith(container_id): container = c break except Exception as e: await ws.send_text(f"ERROR: cannot list containers - {e}") return if container is None: await ws.send_text("ERROR: container not found") return # Отправляем начальное сообщение await ws.send_text(f"Connected to container: {container.name}") # Получаем логи (только последние строки, без follow) try: print(f"Getting logs for container {container.name} (ID: {container.id[:12]})") logs = container.logs(tail=tail).decode(errors="ignore") if logs: await ws.send_text(logs) else: await ws.send_text("No logs available") except Exception as e: print(f"Error getting logs for {container.name}: {e}") await ws.send_text(f"ERROR getting logs: {e}") # Простое WebSocket соединение - только отправляем логи один раз print(f"WebSocket connection established for {container.name}") except WebSocketDisconnect: print(f"WebSocket client disconnected for container {container.name}") except Exception as e: print(f"WebSocket error for {container.name}: {e}") try: await ws.send_text(f"ERROR: {e}") except: pass finally: try: print(f"Closing WebSocket connection for container {container.name}") await ws.close() except: pass if __name__ == "__main__": import uvicorn print(f"LogBoard+ http://0.0.0.0:{APP_PORT}") uvicorn.run(app, host="0.0.0.0", port=APP_PORT) # WebSocket: fan-in by compose service (aggregate all replicas), prefixing with short container id @app.websocket("/ws/fan/{service_name}") async def ws_fan(ws: WebSocket, service_name: str, tail: int = DEFAULT_TAIL, token: Optional[str] = None, project: Optional[str] = None): await ws.accept() if not token or not verify_ws_token(token): await ws.send_text("ERROR: unauthorized") await ws.close(); return # Track active streaming tasks by container id active = {} def list_by_service(service_name: str, project_name: Optional[str] = None): found = [] for c in docker_client.containers.list(all=True): lbl = c.labels or {} if lbl.get("com.docker.compose.service") == service_name and (project_name is None or lbl.get("com.docker.compose.project")==project_name): found.append(c) # sort by Created asc so first lines look ordered-ish try: found.sort(key=lambda x: x.attrs.get("Created","")) except Exception: pass return found async def stream_container(cont): short = cont.id[:8] first_tail = tail while True: try: use_tail = first_tail first_tail = 0 stream = cont.logs(stream=True, follow=True, tail=(use_tail if use_tail>0 else "all")) for chunk in stream: if chunk is None: break try: await ws.send_text(f"[{short}] " + chunk.decode(errors="ignore")) except RuntimeError: stream.close(); return stream.close() # container stopped -> wait and try to find same id again; if gone, exit loop and outer watcher will reassign await asyncio.sleep(1.0) except WebSocketDisconnect: return except Exception: await asyncio.sleep(1.0) continue async def watcher(): # Periodically reconcile set of containers try: while True: desired = {c.id: c for c in list_by_service(service_name, project)} # start missing for cid, cont in desired.items(): if cid not in active: task = asyncio.create_task(stream_container(cont)) active[cid] = task # cancel removed for cid in list(active.keys()): if cid not in desired: task = active.pop(cid) task.cancel() await asyncio.sleep(2.0) except asyncio.CancelledError: pass watch_task = asyncio.create_task(watcher()) try: # Keep ws open until disconnected; the tasks stream data while True: await asyncio.sleep(1.0) except WebSocketDisconnect: pass finally: watch_task.cancel() for t in active.values(): t.cancel() try: await ws.close() except Exception: pass # WebSocket: fan-in for multiple compose services (comma-separated), optional project filter. @app.websocket("/ws/fan_group") async def ws_fan_group(ws: WebSocket, services: str, tail: int = DEFAULT_TAIL, token: Optional[str] = None, project: Optional[str] = None): await ws.accept() if not token or not verify_ws_token(token): await ws.send_text("ERROR: unauthorized") await ws.close(); return svc_set = {s.strip() for s in services.split(",") if s.strip()} if not svc_set: await ws.send_text("ERROR: no services provided") await ws.close(); return active = {} def list_by_services(names, project_name: Optional[str] = None): res = [] for c in docker_client.containers.list(all=True): lbl = c.labels or {} if lbl.get("com.docker.compose.service") in names and (project_name is None or lbl.get("com.docker.compose.project")==project_name): res.append(c) try: res.sort(key=lambda x: x.attrs.get("Created","")) except Exception: pass return res async def stream_container(cont): short = cont.id[:8] svc = (cont.labels or {}).get("com.docker.compose.service","") first_tail = tail while True: try: use_tail = first_tail first_tail = 0 stream = cont.logs(stream=True, follow=True, tail=(use_tail if use_tail>0 else "all")) for chunk in stream: if chunk is None: break line = chunk.decode(errors="ignore") try: await ws.send_text(f"[{short} {svc}] " + line) except RuntimeError: stream.close(); return stream.close() await asyncio.sleep(1.0) except WebSocketDisconnect: return except Exception: await asyncio.sleep(1.0) continue async def watcher(): try: while True: desired = {c.id: c for c in list_by_services(svc_set, project)} for cid, cont in desired.items(): if cid not in active: task = asyncio.create_task(stream_container(cont)) active[cid] = task for cid in list(active.keys()): if cid not in desired: task = active.pop(cid) task.cancel() await asyncio.sleep(2.0) except asyncio.CancelledError: pass watch_task = asyncio.create_task(watcher()) try: while True: await asyncio.sleep(1.0) except WebSocketDisconnect: pass finally: watch_task.cancel() for t in active.values(): t.cancel() try: await ws.close() except Exception: pass