Files
KindClustersDashboard/app/core/provision_log.py
Sergey Antropoff eb063aec20 Веб-интерфейс: страница /clusters, навигация и крошки для кластеров
- Выделена страница списка кластеров, панель упрощена; nav_active и крошки
  ведут в раздел Кластеры; theme.js синхронизирует активную пилюлю по URL.
- Доработки дашборда, аддонов, журнала, стилей и API-документации.
- Поддержка Podman: docker-compose.podman.yml, скрипты сокета; Makefile и env.
2026-04-04 13:42:21 +03:00

264 lines
9.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Сохранение полного журнала развёртывания кластера в ``clusters/<имя>/provision_log.json``.
Журнал операций Helm-аддонов — ``helm_addon_log.json``: **история** операций (массив ``entries``),
старый формат «один объект в файле» при чтении мигрируется автоматически.
Автор: Сергей Антропов
Сайт: https://devops.org.ru
"""
from __future__ import annotations
import json
import logging
import os
import threading
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from core.cluster_lifecycle import validate_cluster_name
from kind_k8s_paths import clusters_dir
logger = logging.getLogger("kind_k8s.provision_log")
PROVISION_LOG_FILENAME = "provision_log.json"
PROVISION_LOG_VERSION = 1
HELM_ADDON_LOG_FILENAME = "helm_addon_log.json"
"""Имя файла истории Helm; корневой объект с полем ``entries`` (версия файла 2)."""
HELM_ADDON_LOG_VERSION = 1
"""Версия схемы одной записи внутри ``entries``."""
HELM_ADDON_LOG_FILE_VERSION = 2
"""Версия обёртки файла (список ``entries``)."""
_locks_guard = threading.Lock()
_helm_log_locks: dict[str, threading.Lock] = {}
def _helm_cluster_lock(cluster_name: str) -> threading.Lock:
with _locks_guard:
if cluster_name not in _helm_log_locks:
_helm_log_locks[cluster_name] = threading.Lock()
return _helm_log_locks[cluster_name]
def _max_helm_addon_entries() -> int:
raw = (os.environ.get("KIND_K8S_HELM_ADDON_LOG_MAX_ENTRIES") or "500").strip()
try:
return max(20, min(int(raw), 5000))
except ValueError:
return 500
def provision_log_file_path(cluster_name: str) -> Path:
"""Путь к JSON с журналом операции для кластера ``cluster_name``."""
return clusters_dir() / cluster_name.strip() / PROVISION_LOG_FILENAME
def helm_addon_log_file_path(cluster_name: str) -> Path:
"""Путь к JSON с историей журналов установки/удаления Helm-аддонов."""
return clusters_dir() / cluster_name.strip() / HELM_ADDON_LOG_FILENAME
def write_cluster_provision_log(
*,
cluster_name: str,
job_id: str,
job_kind: str,
status: str,
message: str | None,
lines: list[str],
result: dict[str, Any] | None,
) -> Path | None:
"""
Атомарно записать полный журнал (все строки без обрезки буфера задания).
Возвращает путь к файлу или ``None``, если каталог кластера не существует.
"""
name = cluster_name.strip()
cdir = clusters_dir() / name
if not cdir.is_dir():
logger.debug("Каталог кластера отсутствует, provision_log не пишем: %s", cdir)
return None
path = cdir / PROVISION_LOG_FILENAME
payload: dict[str, Any] = {
"version": PROVISION_LOG_VERSION,
"job_id": job_id,
"kind": job_kind,
"cluster_name": name,
"finished_at_utc": datetime.now(timezone.utc).isoformat(),
"status": status,
"message": message,
"lines": list(lines),
"result": result,
}
tmp = path.with_suffix(path.suffix + ".tmp")
try:
text = json.dumps(payload, ensure_ascii=False, indent=2, default=str)
tmp.write_text(text, encoding="utf-8")
tmp.replace(path)
logger.info("Сохранён журнал развёртывания: %s (%s строк)", path, len(lines))
return path
except OSError as e:
logger.warning("Не удалось записать %s: %s", path, e)
try:
tmp.unlink(missing_ok=True)
except OSError:
pass
return None
def _helm_addon_rows_from_parsed_json(data: dict[str, Any], cname: str) -> list[dict[str, Any]]:
"""Разобрать содержимое ``helm_addon_log.json`` в плоский список строк для UI/API."""
raw_entries = data.get("entries")
if isinstance(raw_entries, list) and raw_entries:
out: list[dict[str, Any]] = []
for e in raw_entries:
if isinstance(e, dict):
row = dict(e)
row["source_cluster"] = cname
out.append(row)
return out
# Легаси: один объект операции в корне файла
if data.get("job_id") is not None or data.get("finished_at_utc") is not None:
row = dict(data)
row["source_cluster"] = cname
return [row]
return []
def write_cluster_helm_addon_log(
*,
cluster_name: str,
job_id: str,
job_kind: str,
status: str,
message: str | None,
lines: list[str],
result: dict[str, Any] | None,
) -> Path | None:
"""
Добавить запись в историю ``helm_addon_log.json`` (новые записи в начале списка).
Файл хранит ``{"file_version": 2, "entries": [ ... ]}``; лимит длины — ``KIND_K8S_HELM_ADDON_LOG_MAX_ENTRIES``.
"""
name = cluster_name.strip()
cdir = clusters_dir() / name
if not cdir.is_dir():
logger.debug("Каталог кластера отсутствует, helm_addon_log не пишем: %s", cdir)
return None
path = cdir / HELM_ADDON_LOG_FILENAME
new_entry: dict[str, Any] = {
"version": HELM_ADDON_LOG_VERSION,
"job_id": job_id,
"kind": job_kind,
"cluster_name": name,
"finished_at_utc": datetime.now(timezone.utc).isoformat(),
"status": status,
"message": message,
"lines": list(lines),
"result": result,
}
lock = _helm_cluster_lock(name)
with lock:
entries: list[dict[str, Any]] = []
if path.is_file():
try:
raw = json.loads(path.read_text(encoding="utf-8"))
if isinstance(raw, dict):
ex = raw.get("entries")
if isinstance(ex, list):
entries = [e for e in ex if isinstance(e, dict)]
elif raw.get("job_id") is not None or raw.get("finished_at_utc") is not None:
legacy = {k: v for k, v in raw.items() if k != "source_cluster"}
entries = [legacy]
except (OSError, json.JSONDecodeError) as e:
logger.warning("helm_addon_log %s не прочитан, создаём заново: %s", path, e)
entries = []
entries.insert(0, new_entry)
cap = _max_helm_addon_entries()
if len(entries) > cap:
entries = entries[:cap]
payload: dict[str, Any] = {
"file_version": HELM_ADDON_LOG_FILE_VERSION,
"entries": entries,
}
tmp = path.with_suffix(path.suffix + ".tmp")
try:
tmp.write_text(
json.dumps(payload, ensure_ascii=False, indent=2, default=str),
encoding="utf-8",
)
tmp.replace(path)
logger.info(
"Сохранён журнал Helm-аддонов: %s (запись %s, всего записей в файле: %s)",
path,
job_id,
len(entries),
)
return path
except OSError as e:
logger.warning("Не удалось записать %s: %s", path, e)
try:
tmp.unlink(missing_ok=True)
except OSError:
pass
return None
def collect_cluster_dir_logs_page_sync(
*,
filename: str,
limit: int,
offset: int,
) -> tuple[list[dict[str, Any]], int]:
"""
Собрать JSON с диска по кластерам.
- ``provision_log.json`` — по одной строке на кластер (как раньше).
- ``helm_addon_log.json`` — по одной строке на **каждую** запись в ``entries`` (или одна для легаси-файла).
Сортировка по ``finished_at_utc`` (новые первыми), затем пагинация.
"""
lim = max(1, min(limit, 100))
off = max(0, offset)
root = clusters_dir()
if not root.is_dir():
return [], 0
rows: list[dict[str, Any]] = []
for sub in sorted(root.iterdir()):
if not sub.is_dir():
continue
cname = sub.name
if not validate_cluster_name(cname):
continue
path = sub / filename
if not path.is_file():
continue
try:
data = json.loads(path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
continue
if not isinstance(data, dict):
continue
if filename == HELM_ADDON_LOG_FILENAME:
rows.extend(_helm_addon_rows_from_parsed_json(data, cname))
else:
row = dict(data)
row["source_cluster"] = cname
rows.append(row)
def sort_key(r: dict[str, Any]) -> str:
return str(r.get("finished_at_utc") or "")
rows.sort(key=sort_key, reverse=True)
total = len(rows)
chunk = rows[off : off + lim]
return chunk, total