logboard/app.py
Сергей Антропов c74e5ec15e feat: добавлен Makefile для управления проектом и обновлен README.md
- Создан Makefile с командами для сборки, запуска, остановки, перезапуска и просмотра логов
- Добавлены команды: build, up, down, restart, logs, clean, status, shell, dev, rebuild
- Обновлен README.md с информацией об авторе и инструкциями по использованию Makefile
- Добавлена таблица команд Makefile для удобства пользователей
- Автор: Сергей Антропов (https://devops.org.ru)
2025-08-16 11:15:56 +03:00

415 lines
15 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import asyncio
import base64
import os
import re
from typing import Optional, List, Dict
import docker
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Query, Depends, HTTPException, status, Body
from fastapi.responses import HTMLResponse, JSONResponse, PlainTextResponse
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from fastapi.staticfiles import StaticFiles
# Runtime configuration, read once from the environment at import time.
APP_PORT = int(os.getenv("LOGBOARD_PORT", "9001"))  # port handed to uvicorn when the file is run as a script
DEFAULT_TAIL = int(os.getenv("LOGBOARD_TAIL", "500"))  # default `tail` of the WebSocket log endpoints
# NOTE(review): both credentials fall back to "admin" when the variables are unset —
# confirm that deployments always override them.
BASIC_USER = os.getenv("LOGBOARD_USER", "admin")
BASIC_PASS = os.getenv("LOGBOARD_PASS", "admin")
DEFAULT_PROJECT = os.getenv("COMPOSE_PROJECT_NAME")  # compose project used by /api/services when no `project` is given; None = no filter
security = HTTPBasic()
app = FastAPI(title="LogBoard+")
# Serve the snapshots directory as downloadable static files under /snapshots.
# NOTE(review): this mount is not wired to check_basic, so saved snapshots look
# downloadable without credentials — confirm this is intended.
SNAP_DIR = os.getenv("LOGBOARD_SNAPSHOT_DIR", "./snapshots")
os.makedirs(SNAP_DIR, exist_ok=True)  # created up front so the mount below has a directory to serve
app.mount("/snapshots", StaticFiles(directory=SNAP_DIR), name="snapshots")
docker_client = docker.from_env()  # shared Docker client, configured from the environment
# ---------- AUTH ----------
def check_basic(creds: HTTPBasicCredentials = Depends(security)):
    """FastAPI dependency that validates HTTP Basic credentials.

    Returns:
        The received credentials when they match BASIC_USER / BASIC_PASS.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Basic`` header otherwise.
    """
    import secrets  # local import: the module-level import block is left untouched

    # Constant-time comparison on bytes (bytes also allow non-ASCII credentials).
    # Both checks are evaluated before they are combined, so a wrong username
    # does not skip the password comparison.
    user_ok = secrets.compare_digest(creds.username.encode("utf-8"), BASIC_USER.encode("utf-8"))
    pass_ok = secrets.compare_digest(creds.password.encode("utf-8"), BASIC_PASS.encode("utf-8"))
    if user_ok and pass_ok:
        return creds
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid credentials",
        headers={"WWW-Authenticate": "Basic"},
    )
def token_from_creds(creds: HTTPBasicCredentials) -> str:
    """Return base64("username:password") for *creds* as an ASCII string."""
    pair = "{}:{}".format(creds.username, creds.password)
    encoded = base64.b64encode(pair.encode())
    return encoded.decode()
def verify_ws_token(token: str) -> bool:
    """Check a WebSocket token of the form base64("user:pass").

    Args:
        token: value of the ``token`` query parameter.

    Returns:
        True only when the token decodes to exactly "BASIC_USER:BASIC_PASS";
        False for a mismatch, malformed base64 or a non-string value.
    """
    import secrets  # local import: the module-level import block is left untouched

    if not isinstance(token, str):
        return False
    try:
        # binascii.Error (bad alphabet/padding) and UnicodeEncodeError are both ValueErrors.
        raw = base64.b64decode(token.encode(), validate=True)
    except ValueError:
        return False
    expected = f"{BASIC_USER}:{BASIC_PASS}".encode()
    # Compare the raw bytes in constant time; bytes that are not valid UTF-8
    # can never equal the UTF-8 encoded expectation, so no decode step is needed.
    return secrets.compare_digest(raw, expected)
# ---------- DOCKER HELPERS ----------
def list_containers(project: Optional[str] = None, include_stopped: bool = False) -> List[Dict]:
    """Describe Docker containers, optionally restricted to one compose project.

    Args:
        project: compose project name to keep; a falsy value disables the filter.
        include_stopped: also list containers that are not running.

    Returns:
        A list of dicts with the keys ``id`` (first 12 characters), ``name``,
        ``image``, ``status``, ``service`` and ``project``, sorted by
        (project, service, name). ``image`` is the first tag, the short image
        id when the image is untagged, or None when the container reports no image.
    """
    items = []
    for c in docker_client.containers.list(all=include_stopped):
        labels = c.labels or {}
        proj = labels.get("com.docker.compose.project")
        if project and proj != project:
            continue
        # Read the image once: the original evaluated `c.image` up to three times
        # and dereferenced `c.image.short_id` precisely when `c.image` was falsy.
        image = c.image
        if not image:
            image_name = None
        elif image.tags:
            image_name = image.tags[0]
        else:
            image_name = image.short_id
        items.append({
            "id": c.id[:12],
            "name": c.name,
            "image": image_name,
            "status": c.status,
            # Containers started outside compose have no service label; use the name.
            "service": labels.get("com.docker.compose.service") or c.name,
            "project": proj,
        })
    items.sort(key=lambda x: (x.get("project") or "", x.get("service") or "", x.get("name") or ""))
    return items
# ---------- HTML ----------
INDEX_PATH = os.getenv("LOGBOARD_INDEX_HTML", "./templates/index.html")


def load_index_html(token: str) -> str:
    """Read the index template from INDEX_PATH and substitute every ``__TOKEN__`` with *token*."""
    with open(INDEX_PATH, mode="r", encoding="utf-8") as template_file:
        template = template_file.read()
    rendered = template.replace("__TOKEN__", token)
    return rendered
# ---------- ROUTES ----------
@app.get("/", response_class=HTMLResponse)
def index(creds: HTTPBasicCredentials = Depends(check_basic)):
    """Serve the index page with the caller's WebSocket token substituted into the template."""
    page = load_index_html(token_from_creds(creds))
    return HTMLResponse(page)
@app.get("/healthz", response_class=PlainTextResponse)
def healthz():
    """Health probe: answers with the plain-text body "ok"; takes no auth dependency."""
    body = "ok"
    return body
@app.get("/api/services")
def api_services(project: Optional[str] = Query(None), include_stopped: bool = Query(False),
                 _: HTTPBasicCredentials = Depends(check_basic)):
    """Return the container list as JSON.

    The ``project`` query parameter selects a compose project; when it is empty
    or absent, DEFAULT_PROJECT is used instead.
    """
    selected_project = project if project else DEFAULT_PROJECT
    containers = list_containers(project=selected_project, include_stopped=include_stopped)
    return JSONResponse(containers)
@app.post("/api/snapshot")
def api_snapshot(
    creds: HTTPBasicCredentials = Depends(check_basic),
    container_id: str = Body(..., embed=True),
    service: str = Body("", embed=True),
    content: str = Body("", embed=True),
):
    """Save the posted log text as a snapshot file inside SNAP_DIR.

    Body fields:
        container_id: its first 12 characters name the file when ``service`` is empty.
        service: preferred base name for the file.
        content: text written to the file (UTF-8).

    Returns:
        ``{"file": <file name>, "url": "/snapshots/<file name>"}``.
    """
    from datetime import datetime

    # Anything outside [a-zA-Z0-9_.-] (including path separators) collapses to "_",
    # so the resulting name cannot leave SNAP_DIR. Fall back to a fixed name when
    # both identifiers are empty instead of producing a file that starts with "-".
    base_name = service or container_id[:12] or "snapshot"
    safe_service = re.sub(r"[^a-zA-Z0-9_.-]+", "_", base_name)
    # Second resolution: two snapshots of one service within the same second
    # share a file name and the later one overwrites the earlier.
    stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    fname = f"{safe_service}-{stamp}.log"
    fpath = os.path.join(SNAP_DIR, fname)
    with open(fpath, "w", encoding="utf-8") as f:
        f.write(content)
    return {"file": fname, "url": f"/snapshots/{fname}"}
# WebSocket: stream one container's logs. Auth via ?token=base64(user:pass); supports auto-reconnect by service label.
@app.websocket("/ws/logs/{container_id}")
async def ws_logs(ws: WebSocket, container_id: str, tail: int = DEFAULT_TAIL, token: Optional[str] = None,
                  service: Optional[str] = None, project: Optional[str] = None):
    """Stream one container's logs over a WebSocket, re-attaching by compose service.

    Query parameters:
        tail: backlog lines sent on the first attach; values <= 0 mean "all".
        token: base64("user:pass"), checked with verify_ws_token.
        service / project: compose labels. When a service is known the handler
            keeps following it across container restarts and re-creations.

    The container is resolved by service label first (newest container wins) and
    by ID prefix otherwise. Problems are reported to the client as
    "ERROR: ..." text frames. The handler ends when the client disconnects or
    when a container without a service label stops producing logs.
    """
    await ws.accept()
    if not token or not verify_ws_token(token):
        await ws.send_text("ERROR: unauthorized")
        await ws.close()
        return

    def find_by_id_prefix(prefix: str):
        """Return the first container (running or not) whose ID starts with *prefix*."""
        for c in docker_client.containers.list(all=True):
            if c.id.startswith(prefix):
                return c
        return None

    def find_by_service(service_name: str, project_name: Optional[str] = None):
        """Return the most recently created container of a compose service, or None."""
        found = []
        for c in docker_client.containers.list(all=True):
            lbl = c.labels or {}
            if lbl.get("com.docker.compose.service") != service_name:
                continue
            if project_name is not None and lbl.get("com.docker.compose.project") != project_name:
                continue
            found.append(c)
        if not found:
            return None
        try:
            found.sort(key=lambda x: x.attrs.get("Created", ""), reverse=True)
        except Exception:
            pass  # best effort: keep Docker's listing order when "Created" is unusable
        return found[0]

    # Initial resolve: prefer the service label, fall back to the ID prefix.
    container = find_by_service(service, project) if service else None
    if container is None:
        container = find_by_id_prefix(container_id)
    if container is not None:
        lbls = container.labels or {}
        svc_label = service or lbls.get("com.docker.compose.service")
        proj_label = project or lbls.get("com.docker.compose.project")
    else:
        # Nothing found yet; with a service label we wait for it to appear.
        svc_label = service
        proj_label = project

    async def pump(container):
        """Forward log chunks to the client, re-attaching whenever the stream ends."""
        loop = asyncio.get_running_loop()
        # First attach honours the requested tail; every later attach uses 0 so
        # the backlog is not sent again.
        next_tail = tail if tail > 0 else "all"
        last_id = None
        while True:
            stream = None
            try:
                if container is None and svc_label:
                    container = find_by_service(svc_label, proj_label)
                    if container is not None and container.id == last_id and container.status != "running":
                        # Same container as before and still stopped: keep waiting.
                        container = None
                    if container is None:
                        await asyncio.sleep(1.0)
                        continue
                if container is None:
                    await ws.send_text("ERROR: container not found")
                    return
                stream = container.logs(stream=True, follow=True, tail=next_tail)
                next_tail = 0
                last_id = container.id
                while True:
                    # The Docker log stream is a blocking iterator; pulling each
                    # chunk in a worker thread keeps the event loop responsive.
                    chunk = await loop.run_in_executor(None, next, stream, None)
                    if chunk is None:
                        break  # EOF: container stopped or was re-created
                    try:
                        await ws.send_text(chunk.decode(errors="ignore"))
                    except RuntimeError:
                        return  # client side closed
            except WebSocketDisconnect:
                return
            except asyncio.CancelledError:
                raise
            except Exception as e:
                try:
                    await ws.send_text(f"ERROR: {e}")
                except Exception:
                    return  # the client cannot be reached any more; stop instead of looping
                container = None
                await asyncio.sleep(1.0)
                continue
            finally:
                if stream is not None:
                    try:
                        # Also unblocks a worker thread still waiting inside next().
                        stream.close()
                    except Exception:
                        pass
            if not svc_label:
                return  # nothing to re-resolve by
            container = None
            await asyncio.sleep(1.0)

    async def wait_for_disconnect():
        """Drain incoming frames until the client goes away."""
        try:
            while True:
                message = await ws.receive()
                if message.get("type") == "websocket.disconnect":
                    return
        except (WebSocketDisconnect, RuntimeError):
            return

    pump_task = asyncio.create_task(pump(container))
    gone_task = asyncio.create_task(wait_for_disconnect())
    try:
        # Whichever finishes first ends the session: the pump gave up, or the
        # client disconnected (possibly while the container was silent).
        await asyncio.wait({pump_task, gone_task}, return_when=asyncio.FIRST_COMPLETED)
    finally:
        pump_task.cancel()
        gone_task.cancel()
        await asyncio.gather(pump_task, gone_task, return_exceptions=True)
        try:
            await ws.close()
        except Exception:
            pass  # already closed by the peer
def _containers_for_services(names, project_name: Optional[str] = None):
    """Return all containers (running or not) whose compose service label is in *names*.

    The result is sorted by creation time, oldest first, so the first lines sent
    to the client look roughly ordered.
    """
    found = []
    for c in docker_client.containers.list(all=True):
        lbl = c.labels or {}
        if lbl.get("com.docker.compose.service") not in names:
            continue
        if project_name is not None and lbl.get("com.docker.compose.project") != project_name:
            continue
        found.append(c)
    try:
        found.sort(key=lambda x: x.attrs.get("Created", ""))
    except Exception:
        pass  # best effort: keep Docker's listing order when "Created" is unusable
    return found


async def _forward_container_logs(ws: WebSocket, cont, prefix: str, tail: int):
    """Send every log chunk of *cont* to *ws*, each one prefixed with *prefix*.

    Runs until it is cancelled or the socket stops accepting data. When the log
    stream ends (container stopped) it re-attaches after one second.
    """
    loop = asyncio.get_running_loop()
    # The requested backlog is sent once; re-attaches ask for no backlog so a
    # stopped container does not get its whole log replayed every second.
    next_tail = tail if tail > 0 else "all"
    while True:
        stream = None
        try:
            stream = cont.logs(stream=True, follow=True, tail=next_tail)
            next_tail = 0
            while True:
                # Blocking iterator: fetch each chunk in a worker thread so the
                # other containers' streams and the event loop keep running.
                chunk = await loop.run_in_executor(None, next, stream, None)
                if chunk is None:
                    break
                try:
                    await ws.send_text(prefix + chunk.decode(errors="ignore"))
                except RuntimeError:
                    return  # client side closed
        except WebSocketDisconnect:
            return
        except asyncio.CancelledError:
            raise
        except Exception:
            pass  # transient Docker error: retry after the pause below
        finally:
            if stream is not None:
                try:
                    # Also unblocks a worker thread still waiting inside next().
                    stream.close()
                except Exception:
                    pass
        await asyncio.sleep(1.0)


async def _fan_in(ws: WebSocket, names, project: Optional[str], tail: int, make_prefix):
    """Aggregate the logs of every container of the given services into *ws*.

    A watcher reconciles the set of streamed containers every two seconds; the
    call returns once the client disconnects. *make_prefix* maps a container to
    the text put in front of each of its chunks.
    """
    active = {}  # container id -> streaming task

    async def watcher():
        while True:
            try:
                desired = {c.id: c for c in _containers_for_services(names, project)}
            except Exception:
                desired = None  # Docker unreachable: keep the current set, retry next tick
            if desired is not None:
                for cid, cont in desired.items():
                    if cid not in active:
                        active[cid] = asyncio.create_task(
                            _forward_container_logs(ws, cont, make_prefix(cont), tail)
                        )
                for cid in [cid for cid in active if cid not in desired]:
                    active.pop(cid).cancel()
            await asyncio.sleep(2.0)

    watch_task = asyncio.create_task(watcher())
    try:
        # Reading from the socket is what reveals a disconnect; incoming frames
        # themselves are ignored.
        while True:
            message = await ws.receive()
            if message.get("type") == "websocket.disconnect":
                break
    except (WebSocketDisconnect, RuntimeError):
        pass
    finally:
        watch_task.cancel()
        stream_tasks = list(active.values())
        for t in stream_tasks:
            t.cancel()
        await asyncio.gather(watch_task, *stream_tasks, return_exceptions=True)
        try:
            await ws.close()
        except Exception:
            pass  # already closed by the peer


# WebSocket: fan-in by compose service (aggregate all replicas), prefixing with short container id
@app.websocket("/ws/fan/{service_name}")
async def ws_fan(ws: WebSocket, service_name: str, tail: int = DEFAULT_TAIL, token: Optional[str] = None,
                 project: Optional[str] = None):
    """Stream the merged logs of all containers of one compose service.

    Every chunk is prefixed with "[<first 8 chars of the container id>] ".
    ``token`` is base64("user:pass"); ``project`` optionally restricts the match
    to one compose project; ``tail`` is the per-container backlog (<= 0 = all).
    """
    await ws.accept()
    if not token or not verify_ws_token(token):
        await ws.send_text("ERROR: unauthorized")
        await ws.close()
        return
    await _fan_in(ws, {service_name}, project, tail, lambda cont: f"[{cont.id[:8]}] ")


# WebSocket: fan-in for multiple compose services (comma-separated), optional project filter.
@app.websocket("/ws/fan_group")
async def ws_fan_group(ws: WebSocket, services: str, tail: int = DEFAULT_TAIL, token: Optional[str] = None,
                       project: Optional[str] = None):
    """Stream the merged logs of several compose services.

    ``services`` is a comma-separated list of service names. Every chunk is
    prefixed with "[<first 8 chars of the container id> <service>] ".
    """
    await ws.accept()
    if not token or not verify_ws_token(token):
        await ws.send_text("ERROR: unauthorized")
        await ws.close()
        return
    svc_set = {s.strip() for s in services.split(",") if s.strip()}
    if not svc_set:
        await ws.send_text("ERROR: no services provided")
        await ws.close()
        return

    def make_prefix(cont) -> str:
        svc = (cont.labels or {}).get("com.docker.compose.service", "")
        return f"[{cont.id[:8]} {svc}] "

    await _fan_in(ws, svc_set, project, tail, make_prefix)


# Kept at the very end of the module: uvicorn.run() blocks, so every route has
# to be registered on `app` before it is called.
if __name__ == "__main__":
    import uvicorn
    print(f"LogBoard+ http://0.0.0.0:{APP_PORT}")
    uvicorn.run(app, host="0.0.0.0", port=APP_PORT)