#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ LogBoard+ - Логи API Автор: Сергей Антропов Сайт: https://devops.org.ru """ import re from datetime import datetime from typing import Optional from fastapi import APIRouter, Depends, HTTPException, Query, Body from fastapi.responses import JSONResponse import docker from core.auth import get_current_user from core.docker import docker_client, DEFAULT_TAIL from core.logger import api_logger router = APIRouter() @router.get("/stats/{container_id}") def api_logs_stats(container_id: str, current_user: str = Depends(get_current_user)): """Получить статистику логов контейнера""" try: # Ищем контейнер container = None for c in docker_client.containers.list(all=True): if c.id.startswith(container_id): container = c break if container is None: return JSONResponse({"error": "Container not found"}, status_code=404) # Получаем логи logs = container.logs(tail=1000).decode(errors="ignore") # Подсчитываем статистику stats = {"debug": 0, "info": 0, "warn": 0, "error": 0} for line in logs.split('\n'): if not line.strip(): continue line_lower = line.lower() if 'level=debug' in line_lower or 'debug' in line_lower: stats["debug"] += 1 elif 'level=info' in line_lower or 'info' in line_lower: stats["info"] += 1 elif 'level=warning' in line_lower or 'level=warn' in line_lower or 'warning' in line_lower or 'warn' in line_lower: stats["warn"] += 1 elif 'level=error' in line_lower or 'error' in line_lower: stats["error"] += 1 return JSONResponse( content=stats, headers={ "Cache-Control": "no-cache, no-store, must-revalidate", "Pragma": "no-cache", "Expires": "0" } ) except Exception as e: api_logger.error(f"Error getting log stats for {container_id}: {e}") return JSONResponse({"error": str(e)}, status_code=500) @router.get("/{container_id}") def api_logs( container_id: str, tail: str = Query(str(DEFAULT_TAIL), description="Количество последних строк логов или 'all' для всех логов"), since: Optional[str] = Query(None, description="Время начала в формате ISO или относительное время (например, '10m', '1h')"), current_user: str = Depends(get_current_user) ): """ Получить логи контейнера через AJAX Args: container_id: ID контейнера tail: Количество последних строк или 'all' для всех логов (по умолчанию 500) since: Время начала для фильтрации логов Returns: JSON с логами и метаданными """ try: # Ищем контейнер container = None for c in docker_client.containers.list(all=True): if c.id.startswith(container_id): container = c break if container is None: return JSONResponse({"error": "Container not found"}, status_code=404) # Формируем параметры для получения логов log_params = { 'timestamps': True } # Обрабатываем параметр tail if tail.lower() == 'all': # Для всех логов не указываем параметр tail pass else: try: tail_lines = int(tail) log_params['tail'] = tail_lines except ValueError: # Если не удалось преобразовать в число, используем значение по умолчанию log_params['tail'] = DEFAULT_TAIL # Добавляем фильтр по времени, если указан (используем Unix timestamp секундной точности) if since: def _parse_since(value: str) -> Optional[int]: try: # Числовое значение (unix timestamp) if re.fullmatch(r"\d+", value or ""): return int(value) # ISO 8601 с Z if value.endswith('Z'): dt = datetime.fromisoformat(value.replace('Z', '+00:00')) return int(dt.timestamp()) # Пытаемся распарсить как ISO без Z try: dt2 = datetime.fromisoformat(value) if dt2.tzinfo is None: # Считаем UTC, если таймзона не указана from datetime import timezone dt2 = dt2.replace(tzinfo=timezone.utc) return int(dt2.timestamp()) except Exception: pass except Exception: return None return None parsed_since = _parse_since(since) if parsed_since is not None: log_params['since'] = parsed_since # Получаем логи logs = container.logs(**log_params).decode(errors="ignore") # Разбиваем на строки и обрабатываем log_lines = [] for line in logs.split('\n'): if line.strip(): # Извлекаем временную метку и сообщение timestamp_match = re.match(r'^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z)\s+(.+)$', line) if timestamp_match: timestamp, message = timestamp_match.groups() log_lines.append({ 'timestamp': timestamp, 'message': message, 'raw': line }) else: # Если временная метка не найдена, используем всю строку как сообщение log_lines.append({ 'timestamp': None, 'message': line, 'raw': line }) # Получаем информацию о контейнере container_info = { 'id': container.id, 'name': container.name, 'status': container.status, 'image': container.image.tags[0] if container.image.tags else container.image.id, 'created': container.attrs['Created'], 'state': container.attrs['State'] } return JSONResponse( content={ 'container': container_info, 'logs': log_lines, 'total_lines': len(log_lines), 'tail': tail, 'since': since, 'timestamp': datetime.now().isoformat() }, headers={ "Cache-Control": "no-cache, no-store, must-revalidate", "Pragma": "no-cache", "Expires": "0" } ) except Exception as e: api_logger.error(f"Error getting logs for {container_id}: {e}") return JSONResponse({"error": str(e)}, status_code=500) @router.post("/snapshot") def api_snapshot( current_user: str = Depends(get_current_user), container_id: str = Body(..., embed=True), service: str = Body("", embed=True), content: str = Body("", embed=True), ): """Сохранить снимок логов""" import os from core.config import SNAP_DIR # Save posted content as a snapshot file safe_service = re.sub(r"[^a-zA-Z0-9_.-]+", "_", service or container_id[:12]) ts = os.getenv("TZ_TS") or "" from datetime import datetime stamp = datetime.now().strftime("%Y%m%d-%H%M%S") fname = f"{safe_service}-{stamp}.log" fpath = os.path.join(SNAP_DIR, fname) with open(fpath, "w", encoding="utf-8") as f: f.write(content) url = f"/snapshots/{fname}" return {"file": fname, "url": url}