#!/usr/bin/env python3 # -*- coding: utf-8 -*- import asyncio import base64 import os import re from typing import Optional, List, Dict import docker from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Query, Depends, HTTPException, status, Body from fastapi.responses import HTMLResponse, JSONResponse, PlainTextResponse from fastapi.security import HTTPBasic, HTTPBasicCredentials from fastapi.staticfiles import StaticFiles APP_PORT = int(os.getenv("LOGBOARD_PORT", "9001")) DEFAULT_TAIL = int(os.getenv("LOGBOARD_TAIL", "500")) BASIC_USER = os.getenv("LOGBOARD_USER", "admin") BASIC_PASS = os.getenv("LOGBOARD_PASS", "admin") DEFAULT_PROJECT = os.getenv("COMPOSE_PROJECT_NAME") # filter by compose project security = HTTPBasic() app = FastAPI(title="LogBoard+") # serve snapshots directory (downloadable files) SNAP_DIR = os.getenv("LOGBOARD_SNAPSHOT_DIR", "./snapshots") os.makedirs(SNAP_DIR, exist_ok=True) app.mount("/snapshots", StaticFiles(directory=SNAP_DIR), name="snapshots") docker_client = docker.from_env() # ---------- AUTH ---------- def check_basic(creds: HTTPBasicCredentials = Depends(security)): if creds.username == BASIC_USER and creds.password == BASIC_PASS: return creds raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials", headers={"WWW-Authenticate": "Basic"}) def token_from_creds(creds: HTTPBasicCredentials) -> str: return base64.b64encode(f"{creds.username}:{creds.password}".encode()).decode() def verify_ws_token(token: str) -> bool: try: raw = base64.b64decode(token.encode(), validate=True).decode() except Exception: return False return raw == f"{BASIC_USER}:{BASIC_PASS}" # ---------- DOCKER HELPERS ---------- def list_containers(project: Optional[str] = None, include_stopped: bool = False) -> List[Dict]: items = [] for c in docker_client.containers.list(all=include_stopped): labels = c.labels or {} proj = labels.get("com.docker.compose.project") svc = labels.get("com.docker.compose.service") or c.name if project and proj != project: continue items.append({ "id": c.id[:12], "name": c.name, "image": (c.image.tags[0] if c.image and c.image.tags else c.image.short_id), "status": c.status, "service": svc, "project": proj, }) items.sort(key=lambda x: (x.get("project") or "", x.get("service") or "", x.get("name") or "")) return items # ---------- HTML ---------- INDEX_PATH = os.getenv("LOGBOARD_INDEX_HTML", "./templates/index.html") def load_index_html(token: str) -> str: with open(INDEX_PATH, "r", encoding="utf-8") as f: html = f.read() return html.replace("__TOKEN__", token) # ---------- ROUTES ---------- @app.get("/", response_class=HTMLResponse) def index(creds: HTTPBasicCredentials = Depends(check_basic)): token = token_from_creds(creds) return HTMLResponse(load_index_html(token)) @app.get("/healthz", response_class=PlainTextResponse) def healthz(): return "ok" @app.get("/api/services") def api_services(project: Optional[str] = Query(None), include_stopped: bool = Query(False), _: HTTPBasicCredentials = Depends(check_basic)): proj = project or DEFAULT_PROJECT return JSONResponse(list_containers(project=proj, include_stopped=include_stopped)) @app.post("/api/snapshot") def api_snapshot( creds: HTTPBasicCredentials = Depends(check_basic), container_id: str = Body(..., embed=True), service: str = Body("", embed=True), content: str = Body("", embed=True), ): # Save posted content as a snapshot file safe_service = re.sub(r"[^a-zA-Z0-9_.-]+", "_", service or container_id[:12]) ts = os.getenv("TZ_TS") or "" from datetime import datetime stamp = datetime.now().strftime("%Y%m%d-%H%M%S") fname = f"{safe_service}-{stamp}.log" fpath = os.path.join(SNAP_DIR, fname) with open(fpath, "w", encoding="utf-8") as f: f.write(content) url = f"/snapshots/{fname}" return {"file": fname, "url": url} # WebSocket: verify token (?token=base64(user:pass)) # WebSocket: verify token (?token=base64(user:pass)). Supports auto-reconnect by service label. @app.websocket("/ws/logs/{container_id}") async def ws_logs(ws: WebSocket, container_id: str, tail: int = DEFAULT_TAIL, token: Optional[str] = None, service: Optional[str] = None, project: Optional[str] = None): await ws.accept() if not token or not verify_ws_token(token): await ws.send_text("ERROR: unauthorized") await ws.close(); return def find_by_id_prefix(prefix: str): for c in docker_client.containers.list(all=True): if c.id.startswith(prefix): return c return None def find_by_service(service_name: str, project_name: Optional[str] = None): # pick the "newest" container of that compose service (optionally same project) found = [] for c in docker_client.containers.list(all=True): lbl = c.labels or {} if lbl.get("com.docker.compose.service") == service_name and (project_name is None or lbl.get("com.docker.compose.project")==project_name): found.append(c) if not found: return None # sort by Created desc try: found.sort(key=lambda x: x.attrs.get("Created",""), reverse=True) except Exception: pass return found[0] # initial resolve container = None svc_label = None proj_label = None # If service provided, prefer it for resolving container if service: container = find_by_service(service, project) if container is None: container = find_by_id_prefix(container_id) if container: lbls = container.labels or {} svc_label = service or lbls.get("com.docker.compose.service") proj_label = project or lbls.get("com.docker.compose.project") else: # if cannot resolve anything - and we do have service, try waiting a bit (maybe recreating) svc_label = service proj_label = project # streaming loop with reattach first_tail = tail try: while True: if container is None and svc_label: container = find_by_service(svc_label, proj_label) # if still none, wait and try again if container is None: try: await asyncio.sleep(1.0) continue except Exception: break if container is None: await ws.send_text("ERROR: container not found") break try: # On first attach use requested tail; on reattach use tail=0 to avoid duplicate backlog use_tail = first_tail first_tail = 0 stream = container.logs(stream=True, follow=True, tail=(use_tail if use_tail>0 else "all")) # stream loop for chunk in stream: if chunk is None: break try: await ws.send_text(chunk.decode(errors="ignore")) except RuntimeError: # client side closed stream.close() return # Normal EOF (container stopped or recreated). Try to re-resolve by service label. stream.close() # Re-resolve. If same ID and container stopped, wait; if new ID, reattach. old_id = container.id container = None # small backoff await asyncio.sleep(1.0) if svc_label: container = find_by_service(svc_label, proj_label) if container and container.id == old_id: # same container (probably stopped) — keep waiting until it comes back container = None await asyncio.sleep(1.0) continue # else: will loop and attach to new container continue else: # No service label -> break break except WebSocketDisconnect: break except Exception as e: try: await ws.send_text(f"ERROR: {e}") except Exception: pass # try re-resolve and continue container = None await asyncio.sleep(1.0) continue except WebSocketDisconnect: pass except Exception as e: try: await ws.send_text(f"ERROR: {e}") except Exception: pass finally: try: await ws.close() except Exception: pass if __name__ == "__main__": import uvicorn print(f"LogBoard+ http://0.0.0.0:{APP_PORT}") uvicorn.run(app, host="0.0.0.0", port=APP_PORT) # WebSocket: fan-in by compose service (aggregate all replicas), prefixing with short container id @app.websocket("/ws/fan/{service_name}") async def ws_fan(ws: WebSocket, service_name: str, tail: int = DEFAULT_TAIL, token: Optional[str] = None, project: Optional[str] = None): await ws.accept() if not token or not verify_ws_token(token): await ws.send_text("ERROR: unauthorized") await ws.close(); return # Track active streaming tasks by container id active = {} def list_by_service(service_name: str, project_name: Optional[str] = None): found = [] for c in docker_client.containers.list(all=True): lbl = c.labels or {} if lbl.get("com.docker.compose.service") == service_name and (project_name is None or lbl.get("com.docker.compose.project")==project_name): found.append(c) # sort by Created asc so first lines look ordered-ish try: found.sort(key=lambda x: x.attrs.get("Created","")) except Exception: pass return found async def stream_container(cont): short = cont.id[:8] first_tail = tail while True: try: use_tail = first_tail first_tail = 0 stream = cont.logs(stream=True, follow=True, tail=(use_tail if use_tail>0 else "all")) for chunk in stream: if chunk is None: break try: await ws.send_text(f"[{short}] " + chunk.decode(errors="ignore")) except RuntimeError: stream.close(); return stream.close() # container stopped -> wait and try to find same id again; if gone, exit loop and outer watcher will reassign await asyncio.sleep(1.0) except WebSocketDisconnect: return except Exception: await asyncio.sleep(1.0) continue async def watcher(): # Periodically reconcile set of containers try: while True: desired = {c.id: c for c in list_by_service(service_name, project)} # start missing for cid, cont in desired.items(): if cid not in active: task = asyncio.create_task(stream_container(cont)) active[cid] = task # cancel removed for cid in list(active.keys()): if cid not in desired: task = active.pop(cid) task.cancel() await asyncio.sleep(2.0) except asyncio.CancelledError: pass watch_task = asyncio.create_task(watcher()) try: # Keep ws open until disconnected; the tasks stream data while True: await asyncio.sleep(1.0) except WebSocketDisconnect: pass finally: watch_task.cancel() for t in active.values(): t.cancel() try: await ws.close() except Exception: pass # WebSocket: fan-in for multiple compose services (comma-separated), optional project filter. @app.websocket("/ws/fan_group") async def ws_fan_group(ws: WebSocket, services: str, tail: int = DEFAULT_TAIL, token: Optional[str] = None, project: Optional[str] = None): await ws.accept() if not token or not verify_ws_token(token): await ws.send_text("ERROR: unauthorized") await ws.close(); return svc_set = {s.strip() for s in services.split(",") if s.strip()} if not svc_set: await ws.send_text("ERROR: no services provided") await ws.close(); return active = {} def list_by_services(names, project_name: Optional[str] = None): res = [] for c in docker_client.containers.list(all=True): lbl = c.labels or {} if lbl.get("com.docker.compose.service") in names and (project_name is None or lbl.get("com.docker.compose.project")==project_name): res.append(c) try: res.sort(key=lambda x: x.attrs.get("Created","")) except Exception: pass return res async def stream_container(cont): short = cont.id[:8] svc = (cont.labels or {}).get("com.docker.compose.service","") first_tail = tail while True: try: use_tail = first_tail first_tail = 0 stream = cont.logs(stream=True, follow=True, tail=(use_tail if use_tail>0 else "all")) for chunk in stream: if chunk is None: break line = chunk.decode(errors="ignore") try: await ws.send_text(f"[{short} {svc}] " + line) except RuntimeError: stream.close(); return stream.close() await asyncio.sleep(1.0) except WebSocketDisconnect: return except Exception: await asyncio.sleep(1.0) continue async def watcher(): try: while True: desired = {c.id: c for c in list_by_services(svc_set, project)} for cid, cont in desired.items(): if cid not in active: task = asyncio.create_task(stream_container(cont)) active[cid] = task for cid in list(active.keys()): if cid not in desired: task = active.pop(cid) task.cancel() await asyncio.sleep(2.0) except asyncio.CancelledError: pass watch_task = asyncio.create_task(watcher()) try: while True: await asyncio.sleep(1.0) except WebSocketDisconnect: pass finally: watch_task.cancel() for t in active.values(): t.cancel() try: await ws.close() except Exception: pass