refactor: extract all routes to app/api/v1/endpoints/ with proper structure

This commit is contained in:
Сергей Антропов
2025-08-20 16:48:06 +03:00
parent 1e6149107d
commit 40f614304b
11 changed files with 1193 additions and 1153 deletions

View File

@@ -0,0 +1,220 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
LogBoard+ - Логи API
Автор: Сергей Антропов
Сайт: https://devops.org.ru
"""
import re
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, Query, Body
from fastapi.responses import JSONResponse
import docker
from app.core.auth import get_current_user
from app.core.docker import docker_client, DEFAULT_TAIL
router = APIRouter()
@router.get("/stats/{container_id}")
def api_logs_stats(container_id: str, current_user: str = Depends(get_current_user)):
"""Получить статистику логов контейнера"""
try:
# Ищем контейнер
container = None
for c in docker_client.containers.list(all=True):
if c.id.startswith(container_id):
container = c
break
if container is None:
return JSONResponse({"error": "Container not found"}, status_code=404)
# Получаем логи
logs = container.logs(tail=1000).decode(errors="ignore")
# Подсчитываем статистику
stats = {"debug": 0, "info": 0, "warn": 0, "error": 0}
for line in logs.split('\n'):
if not line.strip():
continue
line_lower = line.lower()
if 'level=debug' in line_lower or 'debug' in line_lower:
stats["debug"] += 1
elif 'level=info' in line_lower or 'info' in line_lower:
stats["info"] += 1
elif 'level=warning' in line_lower or 'level=warn' in line_lower or 'warning' in line_lower or 'warn' in line_lower:
stats["warn"] += 1
elif 'level=error' in line_lower or 'error' in line_lower:
stats["error"] += 1
return JSONResponse(
content=stats,
headers={
"Cache-Control": "no-cache, no-store, must-revalidate",
"Pragma": "no-cache",
"Expires": "0"
}
)
except Exception as e:
print(f"Error getting log stats for {container_id}: {e}")
return JSONResponse({"error": str(e)}, status_code=500)
@router.get("/{container_id}")
def api_logs(
container_id: str,
tail: str = Query(str(DEFAULT_TAIL), description="Количество последних строк логов или 'all' для всех логов"),
since: Optional[str] = Query(None, description="Время начала в формате ISO или относительное время (например, '10m', '1h')"),
current_user: str = Depends(get_current_user)
):
"""
Получить логи контейнера через AJAX
Args:
container_id: ID контейнера
tail: Количество последних строк или 'all' для всех логов (по умолчанию 500)
since: Время начала для фильтрации логов
Returns:
JSON с логами и метаданными
"""
try:
# Ищем контейнер
container = None
for c in docker_client.containers.list(all=True):
if c.id.startswith(container_id):
container = c
break
if container is None:
return JSONResponse({"error": "Container not found"}, status_code=404)
# Формируем параметры для получения логов
log_params = {
'timestamps': True
}
# Обрабатываем параметр tail
if tail.lower() == 'all':
# Для всех логов не указываем параметр tail
pass
else:
try:
tail_lines = int(tail)
log_params['tail'] = tail_lines
except ValueError:
# Если не удалось преобразовать в число, используем значение по умолчанию
log_params['tail'] = DEFAULT_TAIL
# Добавляем фильтр по времени, если указан (используем Unix timestamp секундной точности)
if since:
def _parse_since(value: str) -> Optional[int]:
try:
# Числовое значение (unix timestamp)
if re.fullmatch(r"\d+", value or ""):
return int(value)
# ISO 8601 с Z
if value.endswith('Z'):
dt = datetime.fromisoformat(value.replace('Z', '+00:00'))
return int(dt.timestamp())
# Пытаемся распарсить как ISO без Z
try:
dt2 = datetime.fromisoformat(value)
if dt2.tzinfo is None:
# Считаем UTC, если таймзона не указана
from datetime import timezone
dt2 = dt2.replace(tzinfo=timezone.utc)
return int(dt2.timestamp())
except Exception:
pass
except Exception:
return None
return None
parsed_since = _parse_since(since)
if parsed_since is not None:
log_params['since'] = parsed_since
# Получаем логи
logs = container.logs(**log_params).decode(errors="ignore")
# Разбиваем на строки и обрабатываем
log_lines = []
for line in logs.split('\n'):
if line.strip():
# Извлекаем временную метку и сообщение
timestamp_match = re.match(r'^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z)\s+(.+)$', line)
if timestamp_match:
timestamp, message = timestamp_match.groups()
log_lines.append({
'timestamp': timestamp,
'message': message,
'raw': line
})
else:
# Если временная метка не найдена, используем всю строку как сообщение
log_lines.append({
'timestamp': None,
'message': line,
'raw': line
})
# Получаем информацию о контейнере
container_info = {
'id': container.id,
'name': container.name,
'status': container.status,
'image': container.image.tags[0] if container.image.tags else container.image.id,
'created': container.attrs['Created'],
'state': container.attrs['State']
}
return JSONResponse(
content={
'container': container_info,
'logs': log_lines,
'total_lines': len(log_lines),
'tail': tail,
'since': since,
'timestamp': datetime.now().isoformat()
},
headers={
"Cache-Control": "no-cache, no-store, must-revalidate",
"Pragma": "no-cache",
"Expires": "0"
}
)
except Exception as e:
print(f"Error getting logs for {container_id}: {e}")
return JSONResponse({"error": str(e)}, status_code=500)
@router.post("/snapshot")
def api_snapshot(
current_user: str = Depends(get_current_user),
container_id: str = Body(..., embed=True),
service: str = Body("", embed=True),
content: str = Body("", embed=True),
):
"""Сохранить снимок логов"""
import os
from app.core.config import SNAP_DIR
# Save posted content as a snapshot file
safe_service = re.sub(r"[^a-zA-Z0-9_.-]+", "_", service or container_id[:12])
ts = os.getenv("TZ_TS") or ""
from datetime import datetime
stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
fname = f"{safe_service}-{stamp}.log"
fpath = os.path.join(SNAP_DIR, fname)
with open(fpath, "w", encoding="utf-8") as f:
f.write(content)
url = f"/snapshots/{fname}"
return {"file": fname, "url": url}