logboard/app.py
Сергей Антропов e80f665470 feat: major improvements and fixes
- Fixed Docker permissions issue by running as root user
- Added DEBUG_MODE support with conditional Swagger docs and auto-reload
- Created start.sh script for conditional Uvicorn execution
- Removed verbose debug logs from WebSocket status endpoint
- Added comprehensive screenshots to documentation
- Enhanced help tooltip with full-screen modal design
- Added theme switcher to error page
- Updated documentation with local development and Docker benefits
- Fixed WebSocket status display issues
- Improved hotkey functionality and documentation
- Added detailed project descriptions for local dev and Docker users

Technical improvements:
- Dockerfile: removed appuser switch, simplified permissions
- docker-compose.yml: kept user: 0:0 for Docker socket access
- app.py: removed debug prints, added DEBUG_MODE support
- templates: enhanced UI/UX with better tooltips and themes
- docs: comprehensive updates with new screenshots and descriptions
2025-08-19 13:01:32 +03:00

1229 lines
51 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
LogBoard+ - Веб-панель для просмотра логов микросервисов
Автор: Сергей Антропов
Сайт: https://devops.org.ru
"""
import asyncio
import base64
import json
import os
import re
import secrets
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Union
from pathlib import Path
import docker
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Query, Depends, HTTPException, status, Body, Request, Response
from fastapi.exceptions import RequestValidationError
from starlette.exceptions import HTTPException as StarletteHTTPException
from fastapi.responses import HTMLResponse, JSONResponse, PlainTextResponse, RedirectResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel
import jwt
from passlib.context import CryptContext
# Настройки приложения
APP_PORT = int(os.getenv("LOGBOARD_PORT", "9001"))
DEFAULT_TAIL = int(os.getenv("LOGBOARD_TAIL", "500"))
DEFAULT_PROJECT = os.getenv("COMPOSE_PROJECT_NAME")
DEFAULT_PROJECTS = os.getenv("LOGBOARD_PROJECTS")
SKIP_UNHEALTHY = os.getenv("LOGBOARD_SKIP_UNHEALTHY", "true").lower() == "true"
CONTAINER_LIST_TIMEOUT = int(os.getenv("LOGBOARD_CONTAINER_LIST_TIMEOUT", "10"))
CONTAINER_INFO_TIMEOUT = int(os.getenv("LOGBOARD_CONTAINER_INFO_TIMEOUT", "3"))
HEALTH_CHECK_TIMEOUT = int(os.getenv("LOGBOARD_HEALTH_CHECK_TIMEOUT", "2"))
# Настройки безопасности
SECRET_KEY = os.getenv("SECRET_KEY", "your-secret-key-here-change-in-production")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv("SESSION_TIMEOUT", "3600")) // 60 # 1 час по умолчанию
# Настройки пользователей
ADMIN_USERNAME = os.getenv("LOGBOARD_USER", "admin")
ADMIN_PASSWORD = os.getenv("LOGBOARD_PASS", "admin")
# Настройки AJAX обновления
AJAX_UPDATE_INTERVAL = int(os.getenv("LOGBOARD_AJAX_UPDATE_INTERVAL", "2000"))
# Настройки режима отладки
DEBUG_MODE = os.getenv("DEBUG_MODE", "false").lower() == "true"
# Инициализация FastAPI
app = FastAPI(
title="LogBoard+",
description="Веб-панель для просмотра логов микросервисов",
version="1.0.0",
docs_url="/docs" if DEBUG_MODE else None,
redoc_url="/redoc" if DEBUG_MODE else None
)
# Обработчики исключений
@app.exception_handler(404)
async def not_found_handler(request: Request, exc: HTTPException):
"""Обработчик ошибки 404 - Страница не найдена"""
return templates.TemplateResponse("error.html", {
"request": request,
"error_code": 404,
"error_title": "Страница не найдена",
"error_message": "Запрашиваемая страница не существует или была перемещена."
}, status_code=404)
@app.exception_handler(401)
async def unauthorized_handler(request: Request, exc: HTTPException):
"""Обработчик ошибки 401 - Не авторизован"""
return templates.TemplateResponse("error.html", {
"request": request,
"error_code": 401,
"error_title": "Требуется авторизация",
"error_message": "Для доступа к этой странице необходимо войти в систему."
}, status_code=401)
@app.exception_handler(403)
async def forbidden_handler(request: Request, exc: HTTPException):
"""Обработчик ошибки 403 - Доступ запрещен"""
return templates.TemplateResponse("error.html", {
"request": request,
"error_code": 403,
"error_title": "Доступ запрещен",
"error_message": "У вас нет прав для доступа к этой странице."
}, status_code=403)
@app.exception_handler(500)
async def internal_server_error_handler(request: Request, exc: HTTPException):
"""Обработчик ошибки 500 - Внутренняя ошибка сервера"""
return templates.TemplateResponse("error.html", {
"request": request,
"error_code": 500,
"error_title": "Внутренняя ошибка сервера",
"error_message": "Произошла непредвиденная ошибка. Попробуйте обновить страницу или обратитесь к администратору."
}, status_code=500)
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
"""Общий обработчик HTTP исключений"""
# Для API маршрутов возвращаем JSON ответ
if request.url.path.startswith("/api/"):
if exc.status_code == 401:
return JSONResponse(
status_code=401,
content={
"error": "unauthorized",
"message": "Требуется авторизация",
"details": "Для доступа к этому API необходимо войти в систему."
},
headers={"WWW-Authenticate": "Bearer"}
)
elif exc.status_code == 403:
return JSONResponse(
status_code=403,
content={
"error": "forbidden",
"message": "Доступ запрещен",
"details": "У вас нет прав для доступа к этому API."
}
)
else:
return JSONResponse(
status_code=exc.status_code,
content={
"error": f"http_{exc.status_code}",
"message": exc.detail or "Произошла ошибка при обработке запроса.",
"details": f"URL: {request.url.path}"
}
)
# Для обычных страниц возвращаем HTML
if exc.status_code == 401:
title = "Требуется авторизация"
message = "Для доступа к этой странице необходимо войти в систему."
elif exc.status_code == 403:
title = "Доступ запрещен"
message = "У вас нет прав для доступа к этой странице."
elif exc.status_code == 404:
title = "Страница не найдена"
message = "Запрашиваемая страница не существует или была перемещена."
elif exc.status_code == 500:
title = "Внутренняя ошибка сервера"
message = "Произошла непредвиденная ошибка. Попробуйте обновить страницу или обратитесь к администратору."
else:
title = f"Ошибка {exc.status_code}"
message = exc.detail or "Произошла ошибка при обработке запроса."
return templates.TemplateResponse("error.html", {
"request": request,
"error_code": exc.status_code,
"error_title": title,
"error_message": message
}, status_code=exc.status_code)
@app.exception_handler(StarletteHTTPException)
async def starlette_http_exception_handler(request: Request, exc: StarletteHTTPException):
"""Обработчик Starlette HTTP исключений (включая ошибки безопасности)"""
# Для API маршрутов возвращаем JSON ответ
if request.url.path.startswith("/api/"):
if exc.status_code == 401:
return JSONResponse(
status_code=401,
content={
"error": "unauthorized",
"message": "Требуется авторизация",
"details": "Для доступа к этому API необходимо войти в систему."
},
headers={"WWW-Authenticate": "Bearer"}
)
elif exc.status_code == 403:
return JSONResponse(
status_code=403,
content={
"error": "forbidden",
"message": "Доступ запрещен",
"details": "У вас нет прав для доступа к этому API."
}
)
else:
return JSONResponse(
status_code=exc.status_code,
content={
"error": f"http_{exc.status_code}",
"message": exc.detail or "Произошла ошибка при обработке запроса.",
"details": f"URL: {request.url.path}"
}
)
# Для обычных страниц возвращаем HTML
if exc.status_code == 401:
title = "Требуется авторизация"
message = "Для доступа к этой странице необходимо войти в систему."
elif exc.status_code == 403:
title = "Доступ запрещен"
message = "У вас нет прав для доступа к этой странице."
elif exc.status_code == 404:
title = "Страница не найдена"
message = "Запрашиваемая страница не существует или была перемещена."
elif exc.status_code == 500:
title = "Внутренняя ошибка сервера"
message = "Произошла непредвиденная ошибка. Попробуйте обновить страницу или обратитесь к администратору."
else:
title = f"Ошибка {exc.status_code}"
message = exc.detail or "Произошла ошибка при обработке запроса."
return templates.TemplateResponse("error.html", {
"request": request,
"error_code": exc.status_code,
"error_title": title,
"error_message": message
}, status_code=exc.status_code)
@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
"""Общий обработчик всех исключений"""
import traceback
# Логируем ошибку
print(f"❌ Необработанная ошибка: {exc}")
print(f"❌ URL: {request.url.path}")
print(f"❌ Traceback: {traceback.format_exc()}")
return templates.TemplateResponse("error.html", {
"request": request,
"error_code": 500,
"error_title": "Внутренняя ошибка сервера",
"error_message": "Произошла непредвиденная ошибка. Попробуйте обновить страницу или обратитесь к администратору."
}, status_code=500)
# Инициализация шаблонов
templates = Jinja2Templates(directory="templates")
# Инициализация безопасности
security = HTTPBearer()
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Инициализация Docker клиента
docker_client = docker.from_env()
# serve snapshots directory (downloadable files)
SNAP_DIR = os.getenv("LOGBOARD_SNAPSHOT_DIR", "./snapshots")
os.makedirs(SNAP_DIR, exist_ok=True)
app.mount("/snapshots", StaticFiles(directory=SNAP_DIR), name="snapshots")
# Модели данных
class UserLogin(BaseModel):
username: str
password: str
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None
# Функции для работы с паролями
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Проверяет пароль"""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""Хеширует пароль"""
return pwd_context.hash(password)
# Функции для работы с JWT токенами
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
"""Создает JWT токен"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def verify_token(token: str) -> Optional[str]:
"""Проверяет JWT токен и возвращает имя пользователя"""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
return None
return username
except jwt.PyJWTError:
return None
# Функция для проверки аутентификации
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> str:
"""Получает текущего пользователя из токена"""
token = credentials.credentials
username = verify_token(token)
if username is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Недействительный токен. Требуется авторизация.",
headers={"WWW-Authenticate": "Bearer"},
)
return username
# Функция для проверки пользователя
def authenticate_user(username: str, password: str) -> bool:
"""Аутентифицирует пользователя"""
if username == ADMIN_USERNAME:
# Для простоты используем прямое сравнение паролей
# В продакшене рекомендуется использовать хешированные пароли
return password == ADMIN_PASSWORD
return False
# ---------- DOCKER HELPERS ----------
def load_excluded_containers() -> List[str]:
"""
Загружает список исключенных контейнеров из JSON файла
Автор: Сергей Антропов
Сайт: https://devops.org.ru
"""
try:
with open("excluded_containers.json", "r", encoding="utf-8") as f:
data = json.load(f)
return data.get("excluded_containers", [])
except FileNotFoundError:
print("⚠️ Файл excluded_containers.json не найден, используем пустой список")
return []
except json.JSONDecodeError as e:
print(f"❌ Ошибка парсинга excluded_containers.json: {e}")
return []
except Exception as e:
print(f"❌ Ошибка загрузки excluded_containers.json: {e}")
return []
def save_excluded_containers(containers: List[str]) -> bool:
"""
Сохраняет список исключенных контейнеров в JSON файл
Автор: Сергей Антропов
Сайт: https://devops.org.ru
"""
try:
data = {
"excluded_containers": containers,
"description": "Список контейнеров, которые генерируют слишком много логов и исключаются из отображения"
}
with open("excluded_containers.json", "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
return True
except Exception as e:
print(f"❌ Ошибка сохранения excluded_containers.json: {e}")
return False
def get_all_projects() -> List[str]:
"""
Получает список всех проектов Docker Compose с учетом исключенных контейнеров
Автор: Сергей Антропов
Сайт: https://devops.org.ru
"""
projects = set()
excluded_containers = load_excluded_containers()
try:
containers = docker_client.containers.list(all=True)
# Словарь для подсчета контейнеров по проектам
project_containers = {}
standalone_containers = []
for c in containers:
try:
# Пропускаем исключенные контейнеры
if c.name in excluded_containers:
continue
labels = c.labels or {}
project = labels.get("com.docker.compose.project")
if project:
if project not in project_containers:
project_containers[project] = 0
project_containers[project] += 1
else:
standalone_containers.append(c.name)
except Exception:
continue
# Добавляем проекты, у которых есть хотя бы один неисключенный контейнер
for project, count in project_containers.items():
if count > 0:
projects.add(project)
# Добавляем standalone, если есть неисключенные контейнеры без проекта
if standalone_containers:
projects.add("standalone")
except Exception as e:
print(f"❌ Ошибка получения списка проектов: {e}")
return []
result = sorted(list(projects))
print(f"📋 Доступные проекты (с учетом исключенных контейнеров): {result}")
return result
def list_containers(projects: Optional[List[str]] = None, include_stopped: bool = False) -> List[Dict]:
"""
Получает список контейнеров с поддержкой множественных проектов
Автор: Сергей Антропов
Сайт: https://devops.org.ru
"""
# Загружаем список исключенных контейнеров из JSON файла
excluded_containers = load_excluded_containers()
print(f"🚫 Список исключенных контейнеров: {excluded_containers}")
items = []
excluded_count = 0
try:
# Получаем список контейнеров с базовой обработкой ошибок
containers = docker_client.containers.list(all=include_stopped)
for c in containers:
try:
# Базовая информация о контейнере (без health check)
basic_info = {
"id": c.id[:12],
"name": c.name,
"status": c.status,
"image": "unknown",
"service": c.name,
"project": None,
"health": None,
"ports": [],
"url": None,
}
# Безопасно получаем метки
try:
labels = c.labels or {}
basic_info["project"] = labels.get("com.docker.compose.project")
basic_info["service"] = labels.get("com.docker.compose.service") or c.name
except Exception:
pass # Используем значения по умолчанию
# Безопасно получаем информацию об образе
try:
if c.image and c.image.tags:
basic_info["image"] = c.image.tags[0]
elif c.image:
basic_info["image"] = c.image.short_id
except Exception:
pass # Оставляем "unknown"
# Безопасно получаем информацию о портах
try:
ports = c.ports or {}
if ports:
basic_info["ports"] = list(ports.keys())
# Пытаемся найти HTTP/HTTPS порт для создания URL
for port_mapping in ports.values():
if port_mapping:
for mapping in port_mapping:
if isinstance(mapping, dict) and mapping.get("HostPort"):
host_port = mapping["HostPort"]
# Проверяем, что это HTTP порт (80, 443, 8080, 3000, etc.)
host_port_int = int(host_port)
if (host_port_int in [80, 443] or
(1 <= host_port_int <= 7999) or
(8000 <= host_port_int <= 65535)):
protocol = "https" if host_port == "443" else "http"
basic_info["url"] = f"{protocol}://localhost:{host_port}"
basic_info["host_port"] = host_port
break
if basic_info["url"]:
break
except Exception:
pass # Оставляем пустые значения
# Фильтрация по проектам
if projects:
# Если проект не указан, считаем его standalone
container_project = basic_info["project"] or "standalone"
if container_project not in projects:
continue
# Фильтрация исключенных контейнеров
if basic_info["name"] in excluded_containers:
excluded_count += 1
print(f"⚠️ Пропускаем исключенный контейнер: {basic_info['name']}")
continue
# Добавляем контейнер в список
items.append(basic_info)
except Exception as e:
# Пропускаем контейнеры с критическими ошибками
print(f"⚠️ Пропускаем проблемный контейнер {c.name if hasattr(c, 'name') else 'unknown'} (ID: {c.id[:12]}): {e}")
continue
except Exception as e:
print(f"❌ Ошибка получения списка контейнеров: {e}")
return []
# Сортируем по проекту, сервису и имени
items.sort(key=lambda x: (x.get("project") or "", x.get("service") or "", x.get("name") or ""))
# Подсчитываем статистику по проектам
project_stats = {}
for item in items:
project = item.get("project") or "standalone"
if project not in project_stats:
project_stats[project] = {"visible": 0, "excluded": 0}
project_stats[project]["visible"] += 1
# Подсчитываем исключенные контейнеры по проектам
for c in containers:
try:
if c.name in excluded_containers:
labels = c.labels or {}
project = labels.get("com.docker.compose.project") or "standalone"
if project not in project_stats:
project_stats[project] = {"visible": 0, "excluded": 0}
project_stats[project]["excluded"] += 1
except Exception:
continue
print(f"📊 Статистика: найдено {len(items)} контейнеров, исключено {excluded_count} контейнеров")
for project, stats in project_stats.items():
print(f" 📦 {project}: {stats['visible']} видимых, {stats['excluded']} исключенных")
return items
# ---------- HTML ----------
INDEX_PATH = os.getenv("LOGBOARD_INDEX_HTML", "./templates/index.html")
def load_index_html() -> str:
"""Загружает HTML шаблон главной страницы"""
with open(INDEX_PATH, "r", encoding="utf-8") as f:
return f.read()
def load_login_html() -> str:
"""Загружает HTML шаблон страницы входа"""
login_path = "./templates/login.html"
with open(login_path, "r", encoding="utf-8") as f:
return f.read()
# ---------- ROUTES ----------
@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
"""Главная страница приложения"""
# Проверяем наличие токена в cookie
access_token = request.cookies.get("access_token")
if not access_token:
return RedirectResponse(url="/login")
# Проверяем валидность токена
username = verify_token(access_token)
if not username:
return RedirectResponse(url="/login")
return templates.TemplateResponse("index.html", {"request": request})
@app.get("/login", response_class=HTMLResponse)
async def login_page(request: Request):
"""Страница входа"""
return templates.TemplateResponse("login.html", {"request": request})
@app.post("/api/auth/login", response_model=Token)
async def login(user_data: UserLogin, response: Response):
"""API для входа в систему"""
if authenticate_user(user_data.username, user_data.password):
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user_data.username}, expires_delta=access_token_expires
)
# Устанавливаем cookie с токеном
response.set_cookie(
key="access_token",
value=access_token,
httponly=True,
secure=False, # Установите True для HTTPS
samesite="lax",
max_age=ACCESS_TOKEN_EXPIRE_MINUTES * 60
)
return {"access_token": access_token, "token_type": "bearer"}
else:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Неверное имя пользователя или пароль. Проверьте учетные данные и попробуйте снова.",
headers={"WWW-Authenticate": "Bearer"},
)
@app.post("/api/auth/logout")
async def logout(response: Response):
"""API для выхода из системы"""
response.delete_cookie(key="access_token")
return {"message": "Успешный выход из системы"}
@app.get("/api/auth/me")
async def get_current_user_info(current_user: str = Depends(get_current_user)):
"""Получить информацию о текущем пользователе"""
return {"username": current_user}
@app.get("/api/settings")
async def get_settings(current_user: str = Depends(get_current_user)):
"""Получить настройки приложения"""
return {
"ajax_update_interval": AJAX_UPDATE_INTERVAL,
"default_tail": DEFAULT_TAIL,
"skip_unhealthy": SKIP_UNHEALTHY
}
@app.get("/healthz", response_class=PlainTextResponse)
def healthz():
"""Health check endpoint"""
return "ok"
# Маршруты для тестирования страниц ошибок (только в режиме разработки)
@app.get("/test/error/404")
async def test_404_error():
"""Тест страницы ошибки 404"""
raise HTTPException(status_code=404, detail="Тестовая ошибка 404")
@app.get("/test/error/401")
async def test_401_error():
"""Тест страницы ошибки 401"""
raise HTTPException(status_code=401, detail="Тестовая ошибка 401")
@app.get("/test/error/403")
async def test_403_error():
"""Тест страницы ошибки 403"""
raise HTTPException(status_code=403, detail="Тестовая ошибка 403")
@app.get("/test/error/500")
async def test_500_error():
"""Тест страницы ошибки 500"""
raise HTTPException(status_code=500, detail="Тестовая ошибка 500")
@app.get("/test/error/general")
async def test_general_error():
"""Тест общей ошибки"""
raise Exception("Тестовая общая ошибка")
@app.get("/api/logs/stats/{container_id}")
def api_logs_stats(container_id: str, current_user: str = Depends(get_current_user)):
"""Получить статистику логов контейнера"""
try:
# Ищем контейнер
container = None
for c in docker_client.containers.list(all=True):
if c.id.startswith(container_id):
container = c
break
if container is None:
return JSONResponse({"error": "Container not found"}, status_code=404)
# Получаем логи
logs = container.logs(tail=1000).decode(errors="ignore")
# Подсчитываем статистику
stats = {"debug": 0, "info": 0, "warn": 0, "error": 0}
for line in logs.split('\n'):
if not line.strip():
continue
line_lower = line.lower()
if 'level=debug' in line_lower or 'debug' in line_lower:
stats["debug"] += 1
elif 'level=info' in line_lower or 'info' in line_lower:
stats["info"] += 1
elif 'level=warning' in line_lower or 'level=warn' in line_lower or 'warning' in line_lower or 'warn' in line_lower:
stats["warn"] += 1
elif 'level=error' in line_lower or 'error' in line_lower:
stats["error"] += 1
return JSONResponse(
content=stats,
headers={
"Cache-Control": "no-cache, no-store, must-revalidate",
"Pragma": "no-cache",
"Expires": "0"
}
)
except Exception as e:
print(f"Error getting log stats for {container_id}: {e}")
return JSONResponse({"error": str(e)}, status_code=500)
@app.get("/api/logs/{container_id}")
def api_logs(
container_id: str,
tail: str = Query(str(DEFAULT_TAIL), description="Количество последних строк логов или 'all' для всех логов"),
since: Optional[str] = Query(None, description="Время начала в формате ISO или относительное время (например, '10m', '1h')"),
current_user: str = Depends(get_current_user)
):
"""
Получить логи контейнера через AJAX
Args:
container_id: ID контейнера
tail: Количество последних строк или 'all' для всех логов (по умолчанию 500)
since: Время начала для фильтрации логов
Returns:
JSON с логами и метаданными
"""
try:
# Ищем контейнер
container = None
for c in docker_client.containers.list(all=True):
if c.id.startswith(container_id):
container = c
break
if container is None:
return JSONResponse({"error": "Container not found"}, status_code=404)
# Формируем параметры для получения логов
log_params = {
'timestamps': True
}
# Обрабатываем параметр tail
if tail.lower() == 'all':
# Для всех логов не указываем параметр tail
pass
else:
try:
tail_lines = int(tail)
log_params['tail'] = tail_lines
except ValueError:
# Если не удалось преобразовать в число, используем значение по умолчанию
log_params['tail'] = DEFAULT_TAIL
# Добавляем фильтр по времени, если указан (используем Unix timestamp секундной точности)
if since:
def _parse_since(value: str) -> Optional[int]:
try:
# Числовое значение (unix timestamp)
if re.fullmatch(r"\d+", value or ""):
return int(value)
# ISO 8601 с Z
if value.endswith('Z'):
dt = datetime.fromisoformat(value.replace('Z', '+00:00'))
return int(dt.timestamp())
# Пытаемся распарсить как ISO без Z
try:
dt2 = datetime.fromisoformat(value)
if dt2.tzinfo is None:
# Считаем UTC, если таймзона не указана
from datetime import timezone
dt2 = dt2.replace(tzinfo=timezone.utc)
return int(dt2.timestamp())
except Exception:
pass
except Exception:
return None
return None
parsed_since = _parse_since(since)
if parsed_since is not None:
log_params['since'] = parsed_since
# Получаем логи
logs = container.logs(**log_params).decode(errors="ignore")
# Разбиваем на строки и обрабатываем
log_lines = []
for line in logs.split('\n'):
if line.strip():
# Извлекаем временную метку и сообщение
timestamp_match = re.match(r'^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z)\s+(.+)$', line)
if timestamp_match:
timestamp, message = timestamp_match.groups()
log_lines.append({
'timestamp': timestamp,
'message': message,
'raw': line
})
else:
# Если временная метка не найдена, используем всю строку как сообщение
log_lines.append({
'timestamp': None,
'message': line,
'raw': line
})
# Сервер больше не фильтрует по содержимому (фильтрация по времени делается Docker'ом),
# дополнительная дедупликация выполняется на клиенте с учётом количества строк в ту же секунду
# Получаем информацию о контейнере
container_info = {
'id': container.id,
'name': container.name,
'status': container.status,
'image': container.image.tags[0] if container.image.tags else container.image.id,
'created': container.attrs['Created'],
'state': container.attrs['State']
}
return JSONResponse(
content={
'container': container_info,
'logs': log_lines,
'total_lines': len(log_lines),
'tail': tail,
'since': since,
'timestamp': datetime.now().isoformat()
},
headers={
"Cache-Control": "no-cache, no-store, must-revalidate",
"Pragma": "no-cache",
"Expires": "0"
}
)
except Exception as e:
print(f"Error getting logs for {container_id}: {e}")
return JSONResponse({"error": str(e)}, status_code=500)
@app.get("/api/excluded-containers")
def api_get_excluded_containers(current_user: str = Depends(get_current_user)):
"""Получить список исключенных контейнеров"""
return JSONResponse(
content={"excluded_containers": load_excluded_containers()},
headers={
"Cache-Control": "no-cache, no-store, must-revalidate",
"Pragma": "no-cache",
"Expires": "0"
}
)
@app.post("/api/excluded-containers")
def api_update_excluded_containers(
containers: List[str] = Body(...),
current_user: str = Depends(get_current_user)
):
"""Обновить список исключенных контейнеров"""
success = save_excluded_containers(containers)
if success:
return JSONResponse(
content={"status": "success", "message": "Список исключенных контейнеров обновлен"},
headers={
"Cache-Control": "no-cache, no-store, must-revalidate",
"Pragma": "no-cache",
"Expires": "0"
}
)
else:
raise HTTPException(
status_code=500,
detail="Ошибка сохранения списка исключенных контейнеров. Попробуйте еще раз или обратитесь к администратору."
)
@app.get("/api/projects")
def api_projects(current_user: str = Depends(get_current_user)):
"""Получить список всех проектов Docker Compose"""
return JSONResponse(
content=get_all_projects(),
headers={
"Cache-Control": "no-cache, no-store, must-revalidate",
"Pragma": "no-cache",
"Expires": "0"
}
)
@app.get("/api/services")
def api_services(
projects: Optional[str] = Query(None),
include_stopped: bool = Query(False),
current_user: str = Depends(get_current_user)
):
"""Получить список контейнеров с поддержкой множественных проектов"""
project_list = None
if projects:
project_list = [p.strip() for p in projects.split(",") if p.strip()]
elif DEFAULT_PROJECTS and DEFAULT_PROJECTS.strip():
project_list = [p.strip() for p in DEFAULT_PROJECTS.split(",") if p.strip()]
elif DEFAULT_PROJECT and DEFAULT_PROJECT.strip():
project_list = [DEFAULT_PROJECT]
# Если ни одна переменная не указана или пустая, показываем все контейнеры (project_list остается None)
return JSONResponse(
content=list_containers(projects=project_list, include_stopped=include_stopped),
headers={
"Cache-Control": "no-cache, no-store, must-revalidate",
"Pragma": "no-cache",
"Expires": "0"
}
)
@app.post("/api/snapshot")
def api_snapshot(
current_user: str = Depends(get_current_user),
container_id: str = Body(..., embed=True),
service: str = Body("", embed=True),
content: str = Body("", embed=True),
):
"""Сохранить снимок логов"""
# Save posted content as a snapshot file
safe_service = re.sub(r"[^a-zA-Z0-9_.-]+", "_", service or container_id[:12])
ts = os.getenv("TZ_TS") or ""
from datetime import datetime
stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
fname = f"{safe_service}-{stamp}.log"
fpath = os.path.join(SNAP_DIR, fname)
with open(fpath, "w", encoding="utf-8") as f:
f.write(content)
url = f"/snapshots/{fname}"
return {"file": fname, "url": url}
@app.get("/api/websocket/status")
def api_websocket_status(current_user: str = Depends(get_current_user)):
"""Получить статус WebSocket соединений"""
try:
# Проверяем доступность Docker
docker_client.ping()
# Получаем список активных контейнеров
containers = docker_client.containers.list()
# Проверяем, есть ли контейнеры для подключения
if containers:
return {
"status": "available",
"message": "WebSocket соединения доступны",
"containers_count": len(containers),
"timestamp": datetime.now().isoformat()
}
else:
return {
"status": "no_containers",
"message": "Нет доступных контейнеров для подключения",
"containers_count": 0,
"timestamp": datetime.now().isoformat()
}
except Exception as e:
return {
"status": "error",
"message": f"Ошибка проверки WebSocket статуса: {str(e)}",
"containers_count": 0,
"timestamp": datetime.now().isoformat()
}
# WebSocket: verify token (?token=base64(user:pass))
@app.websocket("/ws/logs/{container_id}")
async def ws_logs(ws: WebSocket, container_id: str, tail: int = DEFAULT_TAIL, token: Optional[str] = None,
service: Optional[str] = None, project: Optional[str] = None):
"""WebSocket для получения логов контейнера"""
# Принимаем соединение
await ws.accept()
# Проверяем токен
if not token:
await ws.send_text("ERROR: token required")
await ws.close()
return
username = verify_token(token)
if not username:
await ws.send_text("ERROR: invalid token")
await ws.close()
return
try:
# Простой поиск контейнера по ID
container = None
try:
for c in docker_client.containers.list(all=True):
if c.id.startswith(container_id):
container = c
break
except Exception as e:
await ws.send_text(f"ERROR: cannot list containers - {e}")
return
if container is None:
await ws.send_text("ERROR: container not found")
return
# Отправляем начальное сообщение
await ws.send_text(f"Connected to container: {container.name}")
# Получаем логи (только последние строки, без follow)
try:
print(f"Getting logs for container {container.name} (ID: {container.id[:12]})")
logs = container.logs(tail=tail).decode(errors="ignore")
if logs:
await ws.send_text(logs)
else:
await ws.send_text("No logs available")
except Exception as e:
print(f"Error getting logs for {container.name}: {e}")
await ws.send_text(f"ERROR getting logs: {e}")
# Простое WebSocket соединение - только отправляем логи один раз
print(f"WebSocket connection established for {container.name}")
except WebSocketDisconnect:
print(f"WebSocket client disconnected for container {container.name}")
except Exception as e:
print(f"WebSocket error for {container.name}: {e}")
try:
await ws.send_text(f"ERROR: {e}")
except:
pass
finally:
try:
print(f"Closing WebSocket connection for container {container.name}")
await ws.close()
except:
pass
# WebSocket: fan-in by compose service (aggregate all replicas), prefixing with short container id
@app.websocket("/ws/fan/{service_name}")
async def ws_fan(ws: WebSocket, service_name: str, tail: int = DEFAULT_TAIL, token: Optional[str] = None,
project: Optional[str] = None):
await ws.accept()
# Проверяем токен
if not token:
await ws.send_text("ERROR: token required")
await ws.close()
return
username = verify_token(token)
if not username:
await ws.send_text("ERROR: invalid token")
await ws.close()
return
# Track active streaming tasks by container id
active = {}
def list_by_service(service_name: str, project_name: Optional[str] = None):
found = []
for c in docker_client.containers.list(all=True):
lbl = c.labels or {}
if lbl.get("com.docker.compose.service") == service_name and (project_name is None or lbl.get("com.docker.compose.project")==project_name):
found.append(c)
# sort by Created asc so first lines look ordered-ish
try:
found.sort(key=lambda x: x.attrs.get("Created",""))
except Exception:
pass
return found
async def stream_container(cont):
short = cont.id[:8]
first_tail = tail
while True:
try:
use_tail = first_tail
first_tail = 0
stream = cont.logs(stream=True, follow=True, tail=(use_tail if use_tail>0 else "all"))
for chunk in stream:
if chunk is None:
break
try:
await ws.send_text(f"[{short}] " + chunk.decode(errors="ignore"))
except RuntimeError:
stream.close(); return
stream.close()
# container stopped -> wait and try to find same id again; if gone, exit loop and outer watcher will reassign
await asyncio.sleep(1.0)
except WebSocketDisconnect:
return
except Exception:
await asyncio.sleep(1.0)
continue
async def watcher():
# Periodically reconcile set of containers
try:
while True:
desired = {c.id: c for c in list_by_service(service_name, project)}
# start missing
for cid, cont in desired.items():
if cid not in active:
task = asyncio.create_task(stream_container(cont))
active[cid] = task
# cancel removed
for cid in list(active.keys()):
if cid not in desired:
task = active.pop(cid)
task.cancel()
await asyncio.sleep(2.0)
except asyncio.CancelledError:
pass
watch_task = asyncio.create_task(watcher())
try:
# Keep ws open until disconnected; the tasks stream data
while True:
await asyncio.sleep(1.0)
except WebSocketDisconnect:
pass
finally:
watch_task.cancel()
for t in active.values():
t.cancel()
try:
await ws.close()
except Exception:
pass
# WebSocket: fan-in for multiple compose services (comma-separated), optional project filter.
@app.websocket("/ws/fan_group")
async def ws_fan_group(ws: WebSocket, services: str, tail: int = DEFAULT_TAIL, token: Optional[str] = None,
project: Optional[str] = None):
await ws.accept()
# Проверяем токен
if not token:
await ws.send_text("ERROR: token required")
await ws.close()
return
username = verify_token(token)
if not username:
await ws.send_text("ERROR: invalid token")
await ws.close()
return
svc_set = {s.strip() for s in services.split(",") if s.strip()}
if not svc_set:
await ws.send_text("ERROR: no services provided")
await ws.close(); return
active = {}
def list_by_services(names, project_name: Optional[str] = None):
res = []
for c in docker_client.containers.list(all=True):
lbl = c.labels or {}
if lbl.get("com.docker.compose.service") in names and (project_name is None or lbl.get("com.docker.compose.project")==project_name):
res.append(c)
try:
res.sort(key=lambda x: x.attrs.get("Created",""))
except Exception:
pass
return res
async def stream_container(cont):
short = cont.id[:8]
svc = (cont.labels or {}).get("com.docker.compose.service","")
first_tail = tail
while True:
try:
use_tail = first_tail
first_tail = 0
stream = cont.logs(stream=True, follow=True, tail=(use_tail if use_tail>0 else "all"))
for chunk in stream:
if chunk is None:
break
line = chunk.decode(errors="ignore")
try:
await ws.send_text(f"[{short} {svc}] " + line)
except RuntimeError:
stream.close(); return
stream.close()
await asyncio.sleep(1.0)
except WebSocketDisconnect:
return
except Exception:
await asyncio.sleep(1.0)
continue
async def watcher():
try:
while True:
desired = {c.id: c for c in list_by_services(svc_set, project)}
for cid, cont in desired.items():
if cid not in active:
task = asyncio.create_task(stream_container(cont))
active[cid] = task
for cid in list(active.keys()):
if cid not in desired:
task = active.pop(cid)
task.cancel()
await asyncio.sleep(2.0)
except asyncio.CancelledError:
pass
watch_task = asyncio.create_task(watcher())
try:
while True:
await asyncio.sleep(1.0)
except WebSocketDisconnect:
pass
finally:
watch_task.cancel()
for t in active.values():
t.cancel()
try:
await ws.close()
except Exception:
pass
if __name__ == "__main__":
import uvicorn
print(f"LogBoard+ http://0.0.0.0:{APP_PORT}")
print(f"Debug mode: {'ON' if DEBUG_MODE else 'OFF'}")
if DEBUG_MODE:
print("Swagger UI: http://0.0.0.0:{}/docs".format(APP_PORT))
print("ReDoc: http://0.0.0.0:{}/redoc".format(APP_PORT))
uvicorn.run(
app,
host="0.0.0.0",
port=APP_PORT,
reload=DEBUG_MODE,
log_level="debug" if DEBUG_MODE else "info"
)