feat: добавлен Makefile для управления проектом и обновлен README.md
- Создан Makefile с командами для сборки, запуска, остановки, перезапуска и просмотра логов - Добавлены команды: build, up, down, restart, logs, clean, status, shell, dev, rebuild - Обновлен README.md с информацией об авторе и инструкциями по использованию Makefile - Добавлена таблица команд Makefile для удобства пользователей - Автор: Сергей Антропов (https://devops.org.ru)
This commit is contained in:
414
app.py
Normal file
414
app.py
Normal file
@@ -0,0 +1,414 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
import asyncio
|
||||
import base64
|
||||
import os
|
||||
import re
|
||||
from typing import Optional, List, Dict
|
||||
|
||||
import docker
|
||||
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Query, Depends, HTTPException, status, Body
|
||||
from fastapi.responses import HTMLResponse, JSONResponse, PlainTextResponse
|
||||
from fastapi.security import HTTPBasic, HTTPBasicCredentials
|
||||
from fastapi.staticfiles import StaticFiles
|
||||
|
||||
APP_PORT = int(os.getenv("LOGBOARD_PORT", "9001"))
|
||||
DEFAULT_TAIL = int(os.getenv("LOGBOARD_TAIL", "500"))
|
||||
BASIC_USER = os.getenv("LOGBOARD_USER", "admin")
|
||||
BASIC_PASS = os.getenv("LOGBOARD_PASS", "admin")
|
||||
DEFAULT_PROJECT = os.getenv("COMPOSE_PROJECT_NAME") # filter by compose project
|
||||
|
||||
security = HTTPBasic()
|
||||
app = FastAPI(title="LogBoard+")
|
||||
|
||||
# serve snapshots directory (downloadable files)
|
||||
SNAP_DIR = os.getenv("LOGBOARD_SNAPSHOT_DIR", "./snapshots")
|
||||
os.makedirs(SNAP_DIR, exist_ok=True)
|
||||
app.mount("/snapshots", StaticFiles(directory=SNAP_DIR), name="snapshots")
|
||||
|
||||
docker_client = docker.from_env()
|
||||
|
||||
# ---------- AUTH ----------
|
||||
def check_basic(creds: HTTPBasicCredentials = Depends(security)):
|
||||
if creds.username == BASIC_USER and creds.password == BASIC_PASS:
|
||||
return creds
|
||||
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="Invalid credentials",
|
||||
headers={"WWW-Authenticate": "Basic"})
|
||||
|
||||
def token_from_creds(creds: HTTPBasicCredentials) -> str:
|
||||
return base64.b64encode(f"{creds.username}:{creds.password}".encode()).decode()
|
||||
|
||||
def verify_ws_token(token: str) -> bool:
|
||||
try:
|
||||
raw = base64.b64decode(token.encode(), validate=True).decode()
|
||||
except Exception:
|
||||
return False
|
||||
return raw == f"{BASIC_USER}:{BASIC_PASS}"
|
||||
|
||||
# ---------- DOCKER HELPERS ----------
|
||||
def list_containers(project: Optional[str] = None, include_stopped: bool = False) -> List[Dict]:
|
||||
items = []
|
||||
for c in docker_client.containers.list(all=include_stopped):
|
||||
labels = c.labels or {}
|
||||
proj = labels.get("com.docker.compose.project")
|
||||
svc = labels.get("com.docker.compose.service") or c.name
|
||||
if project and proj != project:
|
||||
continue
|
||||
items.append({
|
||||
"id": c.id[:12],
|
||||
"name": c.name,
|
||||
"image": (c.image.tags[0] if c.image and c.image.tags else c.image.short_id),
|
||||
"status": c.status,
|
||||
"service": svc,
|
||||
"project": proj,
|
||||
})
|
||||
items.sort(key=lambda x: (x.get("project") or "", x.get("service") or "", x.get("name") or ""))
|
||||
return items
|
||||
|
||||
# ---------- HTML ----------
|
||||
INDEX_PATH = os.getenv("LOGBOARD_INDEX_HTML", "./templates/index.html")
|
||||
def load_index_html(token: str) -> str:
|
||||
with open(INDEX_PATH, "r", encoding="utf-8") as f:
|
||||
html = f.read()
|
||||
return html.replace("__TOKEN__", token)
|
||||
|
||||
# ---------- ROUTES ----------
|
||||
@app.get("/", response_class=HTMLResponse)
|
||||
def index(creds: HTTPBasicCredentials = Depends(check_basic)):
|
||||
token = token_from_creds(creds)
|
||||
return HTMLResponse(load_index_html(token))
|
||||
|
||||
@app.get("/healthz", response_class=PlainTextResponse)
|
||||
def healthz():
|
||||
return "ok"
|
||||
|
||||
@app.get("/api/services")
|
||||
def api_services(project: Optional[str] = Query(None), include_stopped: bool = Query(False),
|
||||
_: HTTPBasicCredentials = Depends(check_basic)):
|
||||
proj = project or DEFAULT_PROJECT
|
||||
return JSONResponse(list_containers(project=proj, include_stopped=include_stopped))
|
||||
|
||||
@app.post("/api/snapshot")
|
||||
def api_snapshot(
|
||||
creds: HTTPBasicCredentials = Depends(check_basic),
|
||||
container_id: str = Body(..., embed=True),
|
||||
service: str = Body("", embed=True),
|
||||
content: str = Body("", embed=True),
|
||||
):
|
||||
# Save posted content as a snapshot file
|
||||
safe_service = re.sub(r"[^a-zA-Z0-9_.-]+", "_", service or container_id[:12])
|
||||
ts = os.getenv("TZ_TS") or ""
|
||||
from datetime import datetime
|
||||
stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
|
||||
fname = f"{safe_service}-{stamp}.log"
|
||||
fpath = os.path.join(SNAP_DIR, fname)
|
||||
with open(fpath, "w", encoding="utf-8") as f:
|
||||
f.write(content)
|
||||
url = f"/snapshots/{fname}"
|
||||
return {"file": fname, "url": url}
|
||||
|
||||
# WebSocket: verify token (?token=base64(user:pass))
|
||||
|
||||
# WebSocket: verify token (?token=base64(user:pass)). Supports auto-reconnect by service label.
|
||||
@app.websocket("/ws/logs/{container_id}")
|
||||
async def ws_logs(ws: WebSocket, container_id: str, tail: int = DEFAULT_TAIL, token: Optional[str] = None,
|
||||
service: Optional[str] = None, project: Optional[str] = None):
|
||||
await ws.accept()
|
||||
if not token or not verify_ws_token(token):
|
||||
await ws.send_text("ERROR: unauthorized")
|
||||
await ws.close(); return
|
||||
|
||||
def find_by_id_prefix(prefix: str):
|
||||
for c in docker_client.containers.list(all=True):
|
||||
if c.id.startswith(prefix):
|
||||
return c
|
||||
return None
|
||||
|
||||
def find_by_service(service_name: str, project_name: Optional[str] = None):
|
||||
# pick the "newest" container of that compose service (optionally same project)
|
||||
found = []
|
||||
for c in docker_client.containers.list(all=True):
|
||||
lbl = c.labels or {}
|
||||
if lbl.get("com.docker.compose.service") == service_name and (project_name is None or lbl.get("com.docker.compose.project")==project_name):
|
||||
found.append(c)
|
||||
if not found:
|
||||
return None
|
||||
# sort by Created desc
|
||||
try:
|
||||
found.sort(key=lambda x: x.attrs.get("Created",""), reverse=True)
|
||||
except Exception:
|
||||
pass
|
||||
return found[0]
|
||||
|
||||
# initial resolve
|
||||
container = None
|
||||
svc_label = None
|
||||
proj_label = None
|
||||
|
||||
# If service provided, prefer it for resolving container
|
||||
if service:
|
||||
container = find_by_service(service, project)
|
||||
if container is None:
|
||||
container = find_by_id_prefix(container_id)
|
||||
|
||||
if container:
|
||||
lbls = container.labels or {}
|
||||
svc_label = service or lbls.get("com.docker.compose.service")
|
||||
proj_label = project or lbls.get("com.docker.compose.project")
|
||||
else:
|
||||
# if cannot resolve anything - and we do have service, try waiting a bit (maybe recreating)
|
||||
svc_label = service
|
||||
proj_label = project
|
||||
|
||||
# streaming loop with reattach
|
||||
first_tail = tail
|
||||
try:
|
||||
while True:
|
||||
if container is None and svc_label:
|
||||
container = find_by_service(svc_label, proj_label)
|
||||
# if still none, wait and try again
|
||||
if container is None:
|
||||
try:
|
||||
await asyncio.sleep(1.0)
|
||||
continue
|
||||
except Exception:
|
||||
break
|
||||
if container is None:
|
||||
await ws.send_text("ERROR: container not found")
|
||||
break
|
||||
|
||||
try:
|
||||
# On first attach use requested tail; on reattach use tail=0 to avoid duplicate backlog
|
||||
use_tail = first_tail
|
||||
first_tail = 0
|
||||
stream = container.logs(stream=True, follow=True, tail=(use_tail if use_tail>0 else "all"))
|
||||
# stream loop
|
||||
for chunk in stream:
|
||||
if chunk is None:
|
||||
break
|
||||
try:
|
||||
await ws.send_text(chunk.decode(errors="ignore"))
|
||||
except RuntimeError:
|
||||
# client side closed
|
||||
stream.close()
|
||||
return
|
||||
# Normal EOF (container stopped or recreated). Try to re-resolve by service label.
|
||||
stream.close()
|
||||
# Re-resolve. If same ID and container stopped, wait; if new ID, reattach.
|
||||
old_id = container.id
|
||||
container = None
|
||||
# small backoff
|
||||
await asyncio.sleep(1.0)
|
||||
if svc_label:
|
||||
container = find_by_service(svc_label, proj_label)
|
||||
if container and container.id == old_id:
|
||||
# same container (probably stopped) — keep waiting until it comes back
|
||||
container = None
|
||||
await asyncio.sleep(1.0)
|
||||
continue
|
||||
# else: will loop and attach to new container
|
||||
continue
|
||||
else:
|
||||
# No service label -> break
|
||||
break
|
||||
except WebSocketDisconnect:
|
||||
break
|
||||
except Exception as e:
|
||||
try:
|
||||
await ws.send_text(f"ERROR: {e}")
|
||||
except Exception:
|
||||
pass
|
||||
# try re-resolve and continue
|
||||
container = None
|
||||
await asyncio.sleep(1.0)
|
||||
continue
|
||||
finally:
|
||||
try:
|
||||
await ws.close()
|
||||
except Exception:
|
||||
pass
|
||||
try: await ws.close()
|
||||
except Exception: pass
|
||||
except WebSocketDisconnect:
|
||||
pass
|
||||
except Exception as e:
|
||||
try: await ws.send_text(f"ERROR: {e}")
|
||||
except Exception: pass
|
||||
try: await ws.close()
|
||||
except Exception: pass
|
||||
|
||||
if __name__ == "__main__":
|
||||
import uvicorn
|
||||
print(f"LogBoard+ http://0.0.0.0:{APP_PORT}")
|
||||
uvicorn.run(app, host="0.0.0.0", port=APP_PORT)
|
||||
|
||||
|
||||
# WebSocket: fan-in by compose service (aggregate all replicas), prefixing with short container id
|
||||
@app.websocket("/ws/fan/{service_name}")
|
||||
async def ws_fan(ws: WebSocket, service_name: str, tail: int = DEFAULT_TAIL, token: Optional[str] = None,
|
||||
project: Optional[str] = None):
|
||||
await ws.accept()
|
||||
if not token or not verify_ws_token(token):
|
||||
await ws.send_text("ERROR: unauthorized")
|
||||
await ws.close(); return
|
||||
|
||||
# Track active streaming tasks by container id
|
||||
active = {}
|
||||
|
||||
def list_by_service(service_name: str, project_name: Optional[str] = None):
|
||||
found = []
|
||||
for c in docker_client.containers.list(all=True):
|
||||
lbl = c.labels or {}
|
||||
if lbl.get("com.docker.compose.service") == service_name and (project_name is None or lbl.get("com.docker.compose.project")==project_name):
|
||||
found.append(c)
|
||||
# sort by Created asc so first lines look ordered-ish
|
||||
try:
|
||||
found.sort(key=lambda x: x.attrs.get("Created",""))
|
||||
except Exception:
|
||||
pass
|
||||
return found
|
||||
|
||||
async def stream_container(cont):
|
||||
short = cont.id[:8]
|
||||
first_tail = tail
|
||||
while True:
|
||||
try:
|
||||
use_tail = first_tail
|
||||
first_tail = 0
|
||||
stream = cont.logs(stream=True, follow=True, tail=(use_tail if use_tail>0 else "all"))
|
||||
for chunk in stream:
|
||||
if chunk is None:
|
||||
break
|
||||
try:
|
||||
await ws.send_text(f"[{short}] " + chunk.decode(errors="ignore"))
|
||||
except RuntimeError:
|
||||
stream.close(); return
|
||||
stream.close()
|
||||
# container stopped -> wait and try to find same id again; if gone, exit loop and outer watcher will reassign
|
||||
await asyncio.sleep(1.0)
|
||||
except WebSocketDisconnect:
|
||||
return
|
||||
except Exception:
|
||||
await asyncio.sleep(1.0)
|
||||
continue
|
||||
|
||||
async def watcher():
|
||||
# Periodically reconcile set of containers
|
||||
try:
|
||||
while True:
|
||||
desired = {c.id: c for c in list_by_service(service_name, project)}
|
||||
# start missing
|
||||
for cid, cont in desired.items():
|
||||
if cid not in active:
|
||||
task = asyncio.create_task(stream_container(cont))
|
||||
active[cid] = task
|
||||
# cancel removed
|
||||
for cid in list(active.keys()):
|
||||
if cid not in desired:
|
||||
task = active.pop(cid)
|
||||
task.cancel()
|
||||
await asyncio.sleep(2.0)
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
watch_task = asyncio.create_task(watcher())
|
||||
try:
|
||||
# Keep ws open until disconnected; the tasks stream data
|
||||
while True:
|
||||
await asyncio.sleep(1.0)
|
||||
except WebSocketDisconnect:
|
||||
pass
|
||||
finally:
|
||||
watch_task.cancel()
|
||||
for t in active.values():
|
||||
t.cancel()
|
||||
try:
|
||||
await ws.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
# WebSocket: fan-in for multiple compose services (comma-separated), optional project filter.
|
||||
@app.websocket("/ws/fan_group")
|
||||
async def ws_fan_group(ws: WebSocket, services: str, tail: int = DEFAULT_TAIL, token: Optional[str] = None,
|
||||
project: Optional[str] = None):
|
||||
await ws.accept()
|
||||
if not token or not verify_ws_token(token):
|
||||
await ws.send_text("ERROR: unauthorized")
|
||||
await ws.close(); return
|
||||
|
||||
svc_set = {s.strip() for s in services.split(",") if s.strip()}
|
||||
if not svc_set:
|
||||
await ws.send_text("ERROR: no services provided")
|
||||
await ws.close(); return
|
||||
|
||||
active = {}
|
||||
|
||||
def list_by_services(names, project_name: Optional[str] = None):
|
||||
res = []
|
||||
for c in docker_client.containers.list(all=True):
|
||||
lbl = c.labels or {}
|
||||
if lbl.get("com.docker.compose.service") in names and (project_name is None or lbl.get("com.docker.compose.project")==project_name):
|
||||
res.append(c)
|
||||
try:
|
||||
res.sort(key=lambda x: x.attrs.get("Created",""))
|
||||
except Exception:
|
||||
pass
|
||||
return res
|
||||
|
||||
async def stream_container(cont):
|
||||
short = cont.id[:8]
|
||||
svc = (cont.labels or {}).get("com.docker.compose.service","")
|
||||
first_tail = tail
|
||||
while True:
|
||||
try:
|
||||
use_tail = first_tail
|
||||
first_tail = 0
|
||||
stream = cont.logs(stream=True, follow=True, tail=(use_tail if use_tail>0 else "all"))
|
||||
for chunk in stream:
|
||||
if chunk is None:
|
||||
break
|
||||
line = chunk.decode(errors="ignore")
|
||||
try:
|
||||
await ws.send_text(f"[{short} {svc}] " + line)
|
||||
except RuntimeError:
|
||||
stream.close(); return
|
||||
stream.close()
|
||||
await asyncio.sleep(1.0)
|
||||
except WebSocketDisconnect:
|
||||
return
|
||||
except Exception:
|
||||
await asyncio.sleep(1.0)
|
||||
continue
|
||||
|
||||
async def watcher():
|
||||
try:
|
||||
while True:
|
||||
desired = {c.id: c for c in list_by_services(svc_set, project)}
|
||||
for cid, cont in desired.items():
|
||||
if cid not in active:
|
||||
task = asyncio.create_task(stream_container(cont))
|
||||
active[cid] = task
|
||||
for cid in list(active.keys()):
|
||||
if cid not in desired:
|
||||
task = active.pop(cid)
|
||||
task.cancel()
|
||||
await asyncio.sleep(2.0)
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
watch_task = asyncio.create_task(watcher())
|
||||
try:
|
||||
while True:
|
||||
await asyncio.sleep(1.0)
|
||||
except WebSocketDisconnect:
|
||||
pass
|
||||
finally:
|
||||
watch_task.cancel()
|
||||
for t in active.values():
|
||||
t.cancel()
|
||||
try:
|
||||
await ws.close()
|
||||
except Exception:
|
||||
pass
|
||||
Reference in New Issue
Block a user