- Удален Basic Auth, заменен на современную JWT авторизацию - Добавлена страница входа с красивым интерфейсом - Обновлен фронтенд для работы с JWT токенами - Добавлены новые зависимости: PyJWT, passlib[bcrypt], jinja2 - Создан тестовый скрипт для проверки авторизации - Добавлено руководство по миграции - Обновлена документация и README - Улучшен дизайн поля ввода пароля на странице входа Автор: Сергей Антропов Сайт: https://devops.org.ru
827 lines
32 KiB
Python
827 lines
32 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
LogBoard+ - Веб-панель для просмотра логов микросервисов
|
||
Автор: Сергей Антропов
|
||
Сайт: https://devops.org.ru
|
||
"""
|
||
|
||
import asyncio
|
||
import base64
|
||
import json
|
||
import os
|
||
import re
|
||
import secrets
|
||
from datetime import datetime, timedelta
|
||
from typing import Optional, List, Dict, Union
|
||
from pathlib import Path
|
||
|
||
import docker
|
||
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Query, Depends, HTTPException, status, Body, Request, Response
|
||
from fastapi.responses import HTMLResponse, JSONResponse, PlainTextResponse, RedirectResponse
|
||
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
||
from fastapi.staticfiles import StaticFiles
|
||
from fastapi.templating import Jinja2Templates
|
||
from pydantic import BaseModel
|
||
import jwt
|
||
from passlib.context import CryptContext
|
||
|
||
# Настройки приложения
|
||
APP_PORT = int(os.getenv("LOGBOARD_PORT", "9001"))
|
||
DEFAULT_TAIL = int(os.getenv("LOGBOARD_TAIL", "500"))
|
||
DEFAULT_PROJECT = os.getenv("COMPOSE_PROJECT_NAME")
|
||
DEFAULT_PROJECTS = os.getenv("LOGBOARD_PROJECTS")
|
||
SKIP_UNHEALTHY = os.getenv("LOGBOARD_SKIP_UNHEALTHY", "true").lower() == "true"
|
||
CONTAINER_LIST_TIMEOUT = int(os.getenv("LOGBOARD_CONTAINER_LIST_TIMEOUT", "10"))
|
||
CONTAINER_INFO_TIMEOUT = int(os.getenv("LOGBOARD_CONTAINER_INFO_TIMEOUT", "3"))
|
||
HEALTH_CHECK_TIMEOUT = int(os.getenv("LOGBOARD_HEALTH_CHECK_TIMEOUT", "2"))
|
||
|
||
# Настройки безопасности
|
||
SECRET_KEY = os.getenv("SECRET_KEY", "your-secret-key-here-change-in-production")
|
||
ALGORITHM = "HS256"
|
||
ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv("SESSION_TIMEOUT", "3600")) // 60 # 1 час по умолчанию
|
||
|
||
# Настройки пользователей
|
||
ADMIN_USERNAME = os.getenv("LOGBOARD_USER", "admin")
|
||
ADMIN_PASSWORD = os.getenv("LOGBOARD_PASS", "admin")
|
||
|
||
# Инициализация FastAPI
|
||
app = FastAPI(
|
||
title="LogBoard+",
|
||
description="Веб-панель для просмотра логов микросервисов",
|
||
version="1.0.0"
|
||
)
|
||
|
||
# Инициализация шаблонов
|
||
templates = Jinja2Templates(directory="templates")
|
||
|
||
# Инициализация безопасности
|
||
security = HTTPBearer()
|
||
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
|
||
|
||
# Инициализация Docker клиента
|
||
docker_client = docker.from_env()
|
||
|
||
# serve snapshots directory (downloadable files)
|
||
SNAP_DIR = os.getenv("LOGBOARD_SNAPSHOT_DIR", "./snapshots")
|
||
os.makedirs(SNAP_DIR, exist_ok=True)
|
||
app.mount("/snapshots", StaticFiles(directory=SNAP_DIR), name="snapshots")
|
||
|
||
# Модели данных
|
||
class UserLogin(BaseModel):
|
||
username: str
|
||
password: str
|
||
|
||
class Token(BaseModel):
|
||
access_token: str
|
||
token_type: str
|
||
|
||
class TokenData(BaseModel):
|
||
username: Optional[str] = None
|
||
|
||
# Функции для работы с паролями
|
||
def verify_password(plain_password: str, hashed_password: str) -> bool:
|
||
"""Проверяет пароль"""
|
||
return pwd_context.verify(plain_password, hashed_password)
|
||
|
||
def get_password_hash(password: str) -> str:
|
||
"""Хеширует пароль"""
|
||
return pwd_context.hash(password)
|
||
|
||
# Функции для работы с JWT токенами
|
||
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
|
||
"""Создает JWT токен"""
|
||
to_encode = data.copy()
|
||
if expires_delta:
|
||
expire = datetime.utcnow() + expires_delta
|
||
else:
|
||
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
|
||
to_encode.update({"exp": expire})
|
||
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
|
||
return encoded_jwt
|
||
|
||
def verify_token(token: str) -> Optional[str]:
|
||
"""Проверяет JWT токен и возвращает имя пользователя"""
|
||
try:
|
||
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
|
||
username: str = payload.get("sub")
|
||
if username is None:
|
||
return None
|
||
return username
|
||
except jwt.PyJWTError:
|
||
return None
|
||
|
||
# Функция для проверки аутентификации
|
||
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> str:
|
||
"""Получает текущего пользователя из токена"""
|
||
token = credentials.credentials
|
||
username = verify_token(token)
|
||
if username is None:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||
detail="Недействительный токен",
|
||
headers={"WWW-Authenticate": "Bearer"},
|
||
)
|
||
return username
|
||
|
||
# Функция для проверки пользователя
|
||
def authenticate_user(username: str, password: str) -> bool:
|
||
"""Аутентифицирует пользователя"""
|
||
if username == ADMIN_USERNAME:
|
||
# Для простоты используем прямое сравнение паролей
|
||
# В продакшене рекомендуется использовать хешированные пароли
|
||
return password == ADMIN_PASSWORD
|
||
return False
|
||
|
||
# ---------- DOCKER HELPERS ----------
|
||
def load_excluded_containers() -> List[str]:
|
||
"""
|
||
Загружает список исключенных контейнеров из JSON файла
|
||
Автор: Сергей Антропов
|
||
Сайт: https://devops.org.ru
|
||
"""
|
||
try:
|
||
with open("excluded_containers.json", "r", encoding="utf-8") as f:
|
||
data = json.load(f)
|
||
return data.get("excluded_containers", [])
|
||
except FileNotFoundError:
|
||
print("⚠️ Файл excluded_containers.json не найден, используем пустой список")
|
||
return []
|
||
except json.JSONDecodeError as e:
|
||
print(f"❌ Ошибка парсинга excluded_containers.json: {e}")
|
||
return []
|
||
except Exception as e:
|
||
print(f"❌ Ошибка загрузки excluded_containers.json: {e}")
|
||
return []
|
||
|
||
def save_excluded_containers(containers: List[str]) -> bool:
|
||
"""
|
||
Сохраняет список исключенных контейнеров в JSON файл
|
||
Автор: Сергей Антропов
|
||
Сайт: https://devops.org.ru
|
||
"""
|
||
try:
|
||
data = {
|
||
"excluded_containers": containers,
|
||
"description": "Список контейнеров, которые генерируют слишком много логов и исключаются из отображения"
|
||
}
|
||
with open("excluded_containers.json", "w", encoding="utf-8") as f:
|
||
json.dump(data, f, indent=2, ensure_ascii=False)
|
||
return True
|
||
except Exception as e:
|
||
print(f"❌ Ошибка сохранения excluded_containers.json: {e}")
|
||
return False
|
||
|
||
def get_all_projects() -> List[str]:
|
||
"""
|
||
Получает список всех проектов Docker Compose с учетом исключенных контейнеров
|
||
Автор: Сергей Антропов
|
||
Сайт: https://devops.org.ru
|
||
"""
|
||
projects = set()
|
||
excluded_containers = load_excluded_containers()
|
||
|
||
try:
|
||
containers = docker_client.containers.list(all=True)
|
||
|
||
# Словарь для подсчета контейнеров по проектам
|
||
project_containers = {}
|
||
standalone_containers = []
|
||
|
||
for c in containers:
|
||
try:
|
||
# Пропускаем исключенные контейнеры
|
||
if c.name in excluded_containers:
|
||
continue
|
||
|
||
labels = c.labels or {}
|
||
project = labels.get("com.docker.compose.project")
|
||
|
||
if project:
|
||
if project not in project_containers:
|
||
project_containers[project] = 0
|
||
project_containers[project] += 1
|
||
else:
|
||
standalone_containers.append(c.name)
|
||
|
||
except Exception:
|
||
continue
|
||
|
||
# Добавляем проекты, у которых есть хотя бы один неисключенный контейнер
|
||
for project, count in project_containers.items():
|
||
if count > 0:
|
||
projects.add(project)
|
||
|
||
# Добавляем standalone, если есть неисключенные контейнеры без проекта
|
||
if standalone_containers:
|
||
projects.add("standalone")
|
||
|
||
except Exception as e:
|
||
print(f"❌ Ошибка получения списка проектов: {e}")
|
||
return []
|
||
|
||
result = sorted(list(projects))
|
||
print(f"📋 Доступные проекты (с учетом исключенных контейнеров): {result}")
|
||
return result
|
||
|
||
def list_containers(projects: Optional[List[str]] = None, include_stopped: bool = False) -> List[Dict]:
|
||
"""
|
||
Получает список контейнеров с поддержкой множественных проектов
|
||
Автор: Сергей Антропов
|
||
Сайт: https://devops.org.ru
|
||
"""
|
||
# Загружаем список исключенных контейнеров из JSON файла
|
||
excluded_containers = load_excluded_containers()
|
||
|
||
print(f"🚫 Список исключенных контейнеров: {excluded_containers}")
|
||
|
||
items = []
|
||
excluded_count = 0
|
||
|
||
try:
|
||
# Получаем список контейнеров с базовой обработкой ошибок
|
||
containers = docker_client.containers.list(all=include_stopped)
|
||
|
||
for c in containers:
|
||
try:
|
||
# Базовая информация о контейнере (без health check)
|
||
basic_info = {
|
||
"id": c.id[:12],
|
||
"name": c.name,
|
||
"status": c.status,
|
||
"image": "unknown",
|
||
"service": c.name,
|
||
"project": None,
|
||
"health": None,
|
||
"ports": [],
|
||
"url": None,
|
||
}
|
||
|
||
# Безопасно получаем метки
|
||
try:
|
||
labels = c.labels or {}
|
||
basic_info["project"] = labels.get("com.docker.compose.project")
|
||
basic_info["service"] = labels.get("com.docker.compose.service") or c.name
|
||
except Exception:
|
||
pass # Используем значения по умолчанию
|
||
|
||
# Безопасно получаем информацию об образе
|
||
try:
|
||
if c.image and c.image.tags:
|
||
basic_info["image"] = c.image.tags[0]
|
||
elif c.image:
|
||
basic_info["image"] = c.image.short_id
|
||
except Exception:
|
||
pass # Оставляем "unknown"
|
||
|
||
# Безопасно получаем информацию о портах
|
||
try:
|
||
ports = c.ports or {}
|
||
if ports:
|
||
basic_info["ports"] = list(ports.keys())
|
||
|
||
# Пытаемся найти HTTP/HTTPS порт для создания URL
|
||
for port_mapping in ports.values():
|
||
if port_mapping:
|
||
for mapping in port_mapping:
|
||
if isinstance(mapping, dict) and mapping.get("HostPort"):
|
||
host_port = mapping["HostPort"]
|
||
# Проверяем, что это HTTP порт (80, 443, 8080, 3000, etc.)
|
||
host_port_int = int(host_port)
|
||
if (host_port_int in [80, 443] or
|
||
(3000 <= host_port_int <= 4000) or
|
||
(8000 <= host_port_int <= 9000)):
|
||
protocol = "https" if host_port == "443" else "http"
|
||
basic_info["url"] = f"{protocol}://localhost:{host_port}"
|
||
break
|
||
if basic_info["url"]:
|
||
break
|
||
except Exception:
|
||
pass # Оставляем пустые значения
|
||
|
||
# Фильтрация по проектам
|
||
if projects:
|
||
# Если проект не указан, считаем его standalone
|
||
container_project = basic_info["project"] or "standalone"
|
||
if container_project not in projects:
|
||
continue
|
||
|
||
# Фильтрация исключенных контейнеров
|
||
if basic_info["name"] in excluded_containers:
|
||
excluded_count += 1
|
||
print(f"⚠️ Пропускаем исключенный контейнер: {basic_info['name']}")
|
||
continue
|
||
|
||
# Добавляем контейнер в список
|
||
items.append(basic_info)
|
||
|
||
except Exception as e:
|
||
# Пропускаем контейнеры с критическими ошибками
|
||
print(f"⚠️ Пропускаем проблемный контейнер {c.name if hasattr(c, 'name') else 'unknown'} (ID: {c.id[:12]}): {e}")
|
||
continue
|
||
|
||
except Exception as e:
|
||
print(f"❌ Ошибка получения списка контейнеров: {e}")
|
||
return []
|
||
|
||
# Сортируем по проекту, сервису и имени
|
||
items.sort(key=lambda x: (x.get("project") or "", x.get("service") or "", x.get("name") or ""))
|
||
|
||
# Подсчитываем статистику по проектам
|
||
project_stats = {}
|
||
for item in items:
|
||
project = item.get("project") or "standalone"
|
||
if project not in project_stats:
|
||
project_stats[project] = {"visible": 0, "excluded": 0}
|
||
project_stats[project]["visible"] += 1
|
||
|
||
# Подсчитываем исключенные контейнеры по проектам
|
||
for c in containers:
|
||
try:
|
||
if c.name in excluded_containers:
|
||
labels = c.labels or {}
|
||
project = labels.get("com.docker.compose.project") or "standalone"
|
||
if project not in project_stats:
|
||
project_stats[project] = {"visible": 0, "excluded": 0}
|
||
project_stats[project]["excluded"] += 1
|
||
except Exception:
|
||
continue
|
||
|
||
print(f"📊 Статистика: найдено {len(items)} контейнеров, исключено {excluded_count} контейнеров")
|
||
for project, stats in project_stats.items():
|
||
print(f" 📦 {project}: {stats['visible']} видимых, {stats['excluded']} исключенных")
|
||
|
||
return items
|
||
|
||
# ---------- HTML ----------
|
||
INDEX_PATH = os.getenv("LOGBOARD_INDEX_HTML", "./templates/index.html")
|
||
def load_index_html() -> str:
|
||
"""Загружает HTML шаблон главной страницы"""
|
||
with open(INDEX_PATH, "r", encoding="utf-8") as f:
|
||
return f.read()
|
||
|
||
def load_login_html() -> str:
|
||
"""Загружает HTML шаблон страницы входа"""
|
||
login_path = "./templates/login.html"
|
||
with open(login_path, "r", encoding="utf-8") as f:
|
||
return f.read()
|
||
|
||
# ---------- ROUTES ----------
|
||
@app.get("/", response_class=HTMLResponse)
|
||
async def index(request: Request):
|
||
"""Главная страница приложения"""
|
||
# Проверяем наличие токена в cookie
|
||
access_token = request.cookies.get("access_token")
|
||
if not access_token:
|
||
return RedirectResponse(url="/login")
|
||
|
||
# Проверяем валидность токена
|
||
username = verify_token(access_token)
|
||
if not username:
|
||
return RedirectResponse(url="/login")
|
||
|
||
return templates.TemplateResponse("index.html", {"request": request})
|
||
|
||
@app.get("/login", response_class=HTMLResponse)
|
||
async def login_page(request: Request):
|
||
"""Страница входа"""
|
||
return templates.TemplateResponse("login.html", {"request": request})
|
||
|
||
@app.post("/api/auth/login", response_model=Token)
|
||
async def login(user_data: UserLogin, response: Response):
|
||
"""API для входа в систему"""
|
||
if authenticate_user(user_data.username, user_data.password):
|
||
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
|
||
access_token = create_access_token(
|
||
data={"sub": user_data.username}, expires_delta=access_token_expires
|
||
)
|
||
|
||
# Устанавливаем cookie с токеном
|
||
response.set_cookie(
|
||
key="access_token",
|
||
value=access_token,
|
||
httponly=True,
|
||
secure=False, # Установите True для HTTPS
|
||
samesite="lax",
|
||
max_age=ACCESS_TOKEN_EXPIRE_MINUTES * 60
|
||
)
|
||
|
||
return {"access_token": access_token, "token_type": "bearer"}
|
||
else:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||
detail="Неверное имя пользователя или пароль",
|
||
headers={"WWW-Authenticate": "Bearer"},
|
||
)
|
||
|
||
@app.post("/api/auth/logout")
|
||
async def logout(response: Response):
|
||
"""API для выхода из системы"""
|
||
response.delete_cookie(key="access_token")
|
||
return {"message": "Успешный выход из системы"}
|
||
|
||
@app.get("/api/auth/me")
|
||
async def get_current_user_info(current_user: str = Depends(get_current_user)):
|
||
"""Получить информацию о текущем пользователе"""
|
||
return {"username": current_user}
|
||
|
||
@app.get("/healthz", response_class=PlainTextResponse)
|
||
def healthz():
|
||
"""Health check endpoint"""
|
||
return "ok"
|
||
|
||
@app.get("/api/logs/stats/{container_id}")
|
||
def api_logs_stats(container_id: str, current_user: str = Depends(get_current_user)):
|
||
"""Получить статистику логов контейнера"""
|
||
try:
|
||
# Ищем контейнер
|
||
container = None
|
||
for c in docker_client.containers.list(all=True):
|
||
if c.id.startswith(container_id):
|
||
container = c
|
||
break
|
||
|
||
if container is None:
|
||
return JSONResponse({"error": "Container not found"}, status_code=404)
|
||
|
||
# Получаем логи
|
||
logs = container.logs(tail=1000).decode(errors="ignore")
|
||
|
||
# Подсчитываем статистику
|
||
stats = {"debug": 0, "info": 0, "warn": 0, "error": 0}
|
||
|
||
for line in logs.split('\n'):
|
||
if not line.strip():
|
||
continue
|
||
|
||
line_lower = line.lower()
|
||
if 'level=debug' in line_lower or 'debug' in line_lower:
|
||
stats["debug"] += 1
|
||
elif 'level=info' in line_lower or 'info' in line_lower:
|
||
stats["info"] += 1
|
||
elif 'level=warning' in line_lower or 'level=warn' in line_lower or 'warning' in line_lower or 'warn' in line_lower:
|
||
stats["warn"] += 1
|
||
elif 'level=error' in line_lower or 'error' in line_lower:
|
||
stats["error"] += 1
|
||
|
||
return JSONResponse(
|
||
content=stats,
|
||
headers={
|
||
"Cache-Control": "no-cache, no-store, must-revalidate",
|
||
"Pragma": "no-cache",
|
||
"Expires": "0"
|
||
}
|
||
)
|
||
|
||
except Exception as e:
|
||
print(f"Error getting log stats for {container_id}: {e}")
|
||
return JSONResponse({"error": str(e)}, status_code=500)
|
||
|
||
@app.get("/api/excluded-containers")
|
||
def api_get_excluded_containers(current_user: str = Depends(get_current_user)):
|
||
"""Получить список исключенных контейнеров"""
|
||
return JSONResponse(
|
||
content={"excluded_containers": load_excluded_containers()},
|
||
headers={
|
||
"Cache-Control": "no-cache, no-store, must-revalidate",
|
||
"Pragma": "no-cache",
|
||
"Expires": "0"
|
||
}
|
||
)
|
||
|
||
@app.post("/api/excluded-containers")
|
||
def api_update_excluded_containers(
|
||
containers: List[str] = Body(...),
|
||
current_user: str = Depends(get_current_user)
|
||
):
|
||
"""Обновить список исключенных контейнеров"""
|
||
success = save_excluded_containers(containers)
|
||
if success:
|
||
return JSONResponse(
|
||
content={"status": "success", "message": "Список исключенных контейнеров обновлен"},
|
||
headers={
|
||
"Cache-Control": "no-cache, no-store, must-revalidate",
|
||
"Pragma": "no-cache",
|
||
"Expires": "0"
|
||
}
|
||
)
|
||
else:
|
||
raise HTTPException(status_code=500, detail="Ошибка сохранения списка")
|
||
|
||
@app.get("/api/projects")
|
||
def api_projects(current_user: str = Depends(get_current_user)):
|
||
"""Получить список всех проектов Docker Compose"""
|
||
return JSONResponse(
|
||
content=get_all_projects(),
|
||
headers={
|
||
"Cache-Control": "no-cache, no-store, must-revalidate",
|
||
"Pragma": "no-cache",
|
||
"Expires": "0"
|
||
}
|
||
)
|
||
|
||
@app.get("/api/services")
|
||
def api_services(
|
||
projects: Optional[str] = Query(None),
|
||
include_stopped: bool = Query(False),
|
||
current_user: str = Depends(get_current_user)
|
||
):
|
||
"""Получить список контейнеров с поддержкой множественных проектов"""
|
||
project_list = None
|
||
if projects:
|
||
project_list = [p.strip() for p in projects.split(",") if p.strip()]
|
||
elif DEFAULT_PROJECTS:
|
||
project_list = [p.strip() for p in DEFAULT_PROJECTS.split(",") if p.strip()]
|
||
elif DEFAULT_PROJECT:
|
||
project_list = [DEFAULT_PROJECT]
|
||
|
||
return JSONResponse(
|
||
content=list_containers(projects=project_list, include_stopped=include_stopped),
|
||
headers={
|
||
"Cache-Control": "no-cache, no-store, must-revalidate",
|
||
"Pragma": "no-cache",
|
||
"Expires": "0"
|
||
}
|
||
)
|
||
|
||
@app.post("/api/snapshot")
|
||
def api_snapshot(
|
||
current_user: str = Depends(get_current_user),
|
||
container_id: str = Body(..., embed=True),
|
||
service: str = Body("", embed=True),
|
||
content: str = Body("", embed=True),
|
||
):
|
||
"""Сохранить снимок логов"""
|
||
# Save posted content as a snapshot file
|
||
safe_service = re.sub(r"[^a-zA-Z0-9_.-]+", "_", service or container_id[:12])
|
||
ts = os.getenv("TZ_TS") or ""
|
||
from datetime import datetime
|
||
stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
|
||
fname = f"{safe_service}-{stamp}.log"
|
||
fpath = os.path.join(SNAP_DIR, fname)
|
||
with open(fpath, "w", encoding="utf-8") as f:
|
||
f.write(content)
|
||
url = f"/snapshots/{fname}"
|
||
return {"file": fname, "url": url}
|
||
|
||
# WebSocket: verify token (?token=base64(user:pass))
|
||
@app.websocket("/ws/logs/{container_id}")
|
||
async def ws_logs(ws: WebSocket, container_id: str, tail: int = DEFAULT_TAIL, token: Optional[str] = None,
|
||
service: Optional[str] = None, project: Optional[str] = None):
|
||
"""WebSocket для получения логов контейнера"""
|
||
|
||
# Принимаем соединение
|
||
await ws.accept()
|
||
|
||
# Проверяем токен
|
||
if not token:
|
||
await ws.send_text("ERROR: token required")
|
||
await ws.close()
|
||
return
|
||
|
||
username = verify_token(token)
|
||
if not username:
|
||
await ws.send_text("ERROR: invalid token")
|
||
await ws.close()
|
||
return
|
||
|
||
try:
|
||
# Простой поиск контейнера по ID
|
||
container = None
|
||
try:
|
||
for c in docker_client.containers.list(all=True):
|
||
if c.id.startswith(container_id):
|
||
container = c
|
||
break
|
||
except Exception as e:
|
||
await ws.send_text(f"ERROR: cannot list containers - {e}")
|
||
return
|
||
|
||
if container is None:
|
||
await ws.send_text("ERROR: container not found")
|
||
return
|
||
|
||
# Отправляем начальное сообщение
|
||
await ws.send_text(f"Connected to container: {container.name}")
|
||
|
||
# Получаем логи (только последние строки, без follow)
|
||
try:
|
||
print(f"Getting logs for container {container.name} (ID: {container.id[:12]})")
|
||
logs = container.logs(tail=tail).decode(errors="ignore")
|
||
if logs:
|
||
await ws.send_text(logs)
|
||
else:
|
||
await ws.send_text("No logs available")
|
||
except Exception as e:
|
||
print(f"Error getting logs for {container.name}: {e}")
|
||
await ws.send_text(f"ERROR getting logs: {e}")
|
||
|
||
# Простое WebSocket соединение - только отправляем логи один раз
|
||
print(f"WebSocket connection established for {container.name}")
|
||
|
||
except WebSocketDisconnect:
|
||
print(f"WebSocket client disconnected for container {container.name}")
|
||
except Exception as e:
|
||
print(f"WebSocket error for {container.name}: {e}")
|
||
try:
|
||
await ws.send_text(f"ERROR: {e}")
|
||
except:
|
||
pass
|
||
finally:
|
||
try:
|
||
print(f"Closing WebSocket connection for container {container.name}")
|
||
await ws.close()
|
||
except:
|
||
pass
|
||
|
||
# WebSocket: fan-in by compose service (aggregate all replicas), prefixing with short container id
|
||
@app.websocket("/ws/fan/{service_name}")
|
||
async def ws_fan(ws: WebSocket, service_name: str, tail: int = DEFAULT_TAIL, token: Optional[str] = None,
|
||
project: Optional[str] = None):
|
||
await ws.accept()
|
||
|
||
# Проверяем токен
|
||
if not token:
|
||
await ws.send_text("ERROR: token required")
|
||
await ws.close()
|
||
return
|
||
|
||
username = verify_token(token)
|
||
if not username:
|
||
await ws.send_text("ERROR: invalid token")
|
||
await ws.close()
|
||
return
|
||
|
||
# Track active streaming tasks by container id
|
||
active = {}
|
||
|
||
def list_by_service(service_name: str, project_name: Optional[str] = None):
|
||
found = []
|
||
for c in docker_client.containers.list(all=True):
|
||
lbl = c.labels or {}
|
||
if lbl.get("com.docker.compose.service") == service_name and (project_name is None or lbl.get("com.docker.compose.project")==project_name):
|
||
found.append(c)
|
||
# sort by Created asc so first lines look ordered-ish
|
||
try:
|
||
found.sort(key=lambda x: x.attrs.get("Created",""))
|
||
except Exception:
|
||
pass
|
||
return found
|
||
|
||
async def stream_container(cont):
|
||
short = cont.id[:8]
|
||
first_tail = tail
|
||
while True:
|
||
try:
|
||
use_tail = first_tail
|
||
first_tail = 0
|
||
stream = cont.logs(stream=True, follow=True, tail=(use_tail if use_tail>0 else "all"))
|
||
for chunk in stream:
|
||
if chunk is None:
|
||
break
|
||
try:
|
||
await ws.send_text(f"[{short}] " + chunk.decode(errors="ignore"))
|
||
except RuntimeError:
|
||
stream.close(); return
|
||
stream.close()
|
||
# container stopped -> wait and try to find same id again; if gone, exit loop and outer watcher will reassign
|
||
await asyncio.sleep(1.0)
|
||
except WebSocketDisconnect:
|
||
return
|
||
except Exception:
|
||
await asyncio.sleep(1.0)
|
||
continue
|
||
|
||
async def watcher():
|
||
# Periodically reconcile set of containers
|
||
try:
|
||
while True:
|
||
desired = {c.id: c for c in list_by_service(service_name, project)}
|
||
# start missing
|
||
for cid, cont in desired.items():
|
||
if cid not in active:
|
||
task = asyncio.create_task(stream_container(cont))
|
||
active[cid] = task
|
||
# cancel removed
|
||
for cid in list(active.keys()):
|
||
if cid not in desired:
|
||
task = active.pop(cid)
|
||
task.cancel()
|
||
await asyncio.sleep(2.0)
|
||
except asyncio.CancelledError:
|
||
pass
|
||
|
||
watch_task = asyncio.create_task(watcher())
|
||
try:
|
||
# Keep ws open until disconnected; the tasks stream data
|
||
while True:
|
||
await asyncio.sleep(1.0)
|
||
except WebSocketDisconnect:
|
||
pass
|
||
finally:
|
||
watch_task.cancel()
|
||
for t in active.values():
|
||
t.cancel()
|
||
try:
|
||
await ws.close()
|
||
except Exception:
|
||
pass
|
||
|
||
# WebSocket: fan-in for multiple compose services (comma-separated), optional project filter.
|
||
@app.websocket("/ws/fan_group")
|
||
async def ws_fan_group(ws: WebSocket, services: str, tail: int = DEFAULT_TAIL, token: Optional[str] = None,
|
||
project: Optional[str] = None):
|
||
await ws.accept()
|
||
|
||
# Проверяем токен
|
||
if not token:
|
||
await ws.send_text("ERROR: token required")
|
||
await ws.close()
|
||
return
|
||
|
||
username = verify_token(token)
|
||
if not username:
|
||
await ws.send_text("ERROR: invalid token")
|
||
await ws.close()
|
||
return
|
||
|
||
svc_set = {s.strip() for s in services.split(",") if s.strip()}
|
||
if not svc_set:
|
||
await ws.send_text("ERROR: no services provided")
|
||
await ws.close(); return
|
||
|
||
active = {}
|
||
|
||
def list_by_services(names, project_name: Optional[str] = None):
|
||
res = []
|
||
for c in docker_client.containers.list(all=True):
|
||
lbl = c.labels or {}
|
||
if lbl.get("com.docker.compose.service") in names and (project_name is None or lbl.get("com.docker.compose.project")==project_name):
|
||
res.append(c)
|
||
try:
|
||
res.sort(key=lambda x: x.attrs.get("Created",""))
|
||
except Exception:
|
||
pass
|
||
return res
|
||
|
||
async def stream_container(cont):
|
||
short = cont.id[:8]
|
||
svc = (cont.labels or {}).get("com.docker.compose.service","")
|
||
first_tail = tail
|
||
while True:
|
||
try:
|
||
use_tail = first_tail
|
||
first_tail = 0
|
||
stream = cont.logs(stream=True, follow=True, tail=(use_tail if use_tail>0 else "all"))
|
||
for chunk in stream:
|
||
if chunk is None:
|
||
break
|
||
line = chunk.decode(errors="ignore")
|
||
try:
|
||
await ws.send_text(f"[{short} {svc}] " + line)
|
||
except RuntimeError:
|
||
stream.close(); return
|
||
stream.close()
|
||
await asyncio.sleep(1.0)
|
||
except WebSocketDisconnect:
|
||
return
|
||
except Exception:
|
||
await asyncio.sleep(1.0)
|
||
continue
|
||
|
||
async def watcher():
|
||
try:
|
||
while True:
|
||
desired = {c.id: c for c in list_by_services(svc_set, project)}
|
||
for cid, cont in desired.items():
|
||
if cid not in active:
|
||
task = asyncio.create_task(stream_container(cont))
|
||
active[cid] = task
|
||
for cid in list(active.keys()):
|
||
if cid not in desired:
|
||
task = active.pop(cid)
|
||
task.cancel()
|
||
await asyncio.sleep(2.0)
|
||
except asyncio.CancelledError:
|
||
pass
|
||
|
||
watch_task = asyncio.create_task(watcher())
|
||
try:
|
||
while True:
|
||
await asyncio.sleep(1.0)
|
||
except WebSocketDisconnect:
|
||
pass
|
||
finally:
|
||
watch_task.cancel()
|
||
for t in active.values():
|
||
t.cancel()
|
||
try:
|
||
await ws.close()
|
||
except Exception:
|
||
pass
|
||
|
||
if __name__ == "__main__":
|
||
import uvicorn
|
||
print(f"LogBoard+ http://0.0.0.0:{APP_PORT}")
|
||
uvicorn.run(app, host="0.0.0.0", port=APP_PORT)
|