- Выделена страница списка кластеров, панель упрощена; nav_active и крошки ведут в раздел Кластеры; theme.js синхронизирует активную пилюлю по URL. - Доработки дашборда, аддонов, журнала, стилей и API-документации. - Поддержка Podman: docker-compose.podman.yml, скрипты сокета; Makefile и env.
264 lines
9.3 KiB
Python
264 lines
9.3 KiB
Python
"""Сохранение полного журнала развёртывания кластера в ``clusters/<имя>/provision_log.json``.
|
||
|
||
Журнал операций Helm-аддонов — ``helm_addon_log.json``: **история** операций (массив ``entries``),
|
||
старый формат «один объект в файле» при чтении мигрируется автоматически.
|
||
|
||
Автор: Сергей Антропов
|
||
Сайт: https://devops.org.ru
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import logging
|
||
import os
|
||
import threading
|
||
from datetime import datetime, timezone
|
||
from pathlib import Path
|
||
from typing import Any
|
||
|
||
from core.cluster_lifecycle import validate_cluster_name
|
||
from kind_k8s_paths import clusters_dir
|
||
|
||
logger = logging.getLogger("kind_k8s.provision_log")
|
||
|
||
PROVISION_LOG_FILENAME = "provision_log.json"
|
||
PROVISION_LOG_VERSION = 1
|
||
|
||
HELM_ADDON_LOG_FILENAME = "helm_addon_log.json"
|
||
"""Имя файла истории Helm; корневой объект с полем ``entries`` (версия файла 2)."""
|
||
HELM_ADDON_LOG_VERSION = 1
|
||
"""Версия схемы одной записи внутри ``entries``."""
|
||
HELM_ADDON_LOG_FILE_VERSION = 2
|
||
"""Версия обёртки файла (список ``entries``)."""
|
||
|
||
_locks_guard = threading.Lock()
|
||
_helm_log_locks: dict[str, threading.Lock] = {}
|
||
|
||
|
||
def _helm_cluster_lock(cluster_name: str) -> threading.Lock:
|
||
with _locks_guard:
|
||
if cluster_name not in _helm_log_locks:
|
||
_helm_log_locks[cluster_name] = threading.Lock()
|
||
return _helm_log_locks[cluster_name]
|
||
|
||
|
||
def _max_helm_addon_entries() -> int:
|
||
raw = (os.environ.get("KIND_K8S_HELM_ADDON_LOG_MAX_ENTRIES") or "500").strip()
|
||
try:
|
||
return max(20, min(int(raw), 5000))
|
||
except ValueError:
|
||
return 500
|
||
|
||
|
||
def provision_log_file_path(cluster_name: str) -> Path:
|
||
"""Путь к JSON с журналом операции для кластера ``cluster_name``."""
|
||
return clusters_dir() / cluster_name.strip() / PROVISION_LOG_FILENAME
|
||
|
||
|
||
def helm_addon_log_file_path(cluster_name: str) -> Path:
|
||
"""Путь к JSON с историей журналов установки/удаления Helm-аддонов."""
|
||
return clusters_dir() / cluster_name.strip() / HELM_ADDON_LOG_FILENAME
|
||
|
||
|
||
def write_cluster_provision_log(
|
||
*,
|
||
cluster_name: str,
|
||
job_id: str,
|
||
job_kind: str,
|
||
status: str,
|
||
message: str | None,
|
||
lines: list[str],
|
||
result: dict[str, Any] | None,
|
||
) -> Path | None:
|
||
"""
|
||
Атомарно записать полный журнал (все строки без обрезки буфера задания).
|
||
|
||
Возвращает путь к файлу или ``None``, если каталог кластера не существует.
|
||
"""
|
||
name = cluster_name.strip()
|
||
cdir = clusters_dir() / name
|
||
if not cdir.is_dir():
|
||
logger.debug("Каталог кластера отсутствует, provision_log не пишем: %s", cdir)
|
||
return None
|
||
|
||
path = cdir / PROVISION_LOG_FILENAME
|
||
payload: dict[str, Any] = {
|
||
"version": PROVISION_LOG_VERSION,
|
||
"job_id": job_id,
|
||
"kind": job_kind,
|
||
"cluster_name": name,
|
||
"finished_at_utc": datetime.now(timezone.utc).isoformat(),
|
||
"status": status,
|
||
"message": message,
|
||
"lines": list(lines),
|
||
"result": result,
|
||
}
|
||
tmp = path.with_suffix(path.suffix + ".tmp")
|
||
try:
|
||
text = json.dumps(payload, ensure_ascii=False, indent=2, default=str)
|
||
tmp.write_text(text, encoding="utf-8")
|
||
tmp.replace(path)
|
||
logger.info("Сохранён журнал развёртывания: %s (%s строк)", path, len(lines))
|
||
return path
|
||
except OSError as e:
|
||
logger.warning("Не удалось записать %s: %s", path, e)
|
||
try:
|
||
tmp.unlink(missing_ok=True)
|
||
except OSError:
|
||
pass
|
||
return None
|
||
|
||
|
||
def _helm_addon_rows_from_parsed_json(data: dict[str, Any], cname: str) -> list[dict[str, Any]]:
|
||
"""Разобрать содержимое ``helm_addon_log.json`` в плоский список строк для UI/API."""
|
||
raw_entries = data.get("entries")
|
||
if isinstance(raw_entries, list) and raw_entries:
|
||
out: list[dict[str, Any]] = []
|
||
for e in raw_entries:
|
||
if isinstance(e, dict):
|
||
row = dict(e)
|
||
row["source_cluster"] = cname
|
||
out.append(row)
|
||
return out
|
||
# Легаси: один объект операции в корне файла
|
||
if data.get("job_id") is not None or data.get("finished_at_utc") is not None:
|
||
row = dict(data)
|
||
row["source_cluster"] = cname
|
||
return [row]
|
||
return []
|
||
|
||
|
||
def write_cluster_helm_addon_log(
|
||
*,
|
||
cluster_name: str,
|
||
job_id: str,
|
||
job_kind: str,
|
||
status: str,
|
||
message: str | None,
|
||
lines: list[str],
|
||
result: dict[str, Any] | None,
|
||
) -> Path | None:
|
||
"""
|
||
Добавить запись в историю ``helm_addon_log.json`` (новые записи в начале списка).
|
||
|
||
Файл хранит ``{"file_version": 2, "entries": [ ... ]}``; лимит длины — ``KIND_K8S_HELM_ADDON_LOG_MAX_ENTRIES``.
|
||
"""
|
||
name = cluster_name.strip()
|
||
cdir = clusters_dir() / name
|
||
if not cdir.is_dir():
|
||
logger.debug("Каталог кластера отсутствует, helm_addon_log не пишем: %s", cdir)
|
||
return None
|
||
|
||
path = cdir / HELM_ADDON_LOG_FILENAME
|
||
new_entry: dict[str, Any] = {
|
||
"version": HELM_ADDON_LOG_VERSION,
|
||
"job_id": job_id,
|
||
"kind": job_kind,
|
||
"cluster_name": name,
|
||
"finished_at_utc": datetime.now(timezone.utc).isoformat(),
|
||
"status": status,
|
||
"message": message,
|
||
"lines": list(lines),
|
||
"result": result,
|
||
}
|
||
|
||
lock = _helm_cluster_lock(name)
|
||
with lock:
|
||
entries: list[dict[str, Any]] = []
|
||
if path.is_file():
|
||
try:
|
||
raw = json.loads(path.read_text(encoding="utf-8"))
|
||
if isinstance(raw, dict):
|
||
ex = raw.get("entries")
|
||
if isinstance(ex, list):
|
||
entries = [e for e in ex if isinstance(e, dict)]
|
||
elif raw.get("job_id") is not None or raw.get("finished_at_utc") is not None:
|
||
legacy = {k: v for k, v in raw.items() if k != "source_cluster"}
|
||
entries = [legacy]
|
||
except (OSError, json.JSONDecodeError) as e:
|
||
logger.warning("helm_addon_log %s не прочитан, создаём заново: %s", path, e)
|
||
entries = []
|
||
|
||
entries.insert(0, new_entry)
|
||
cap = _max_helm_addon_entries()
|
||
if len(entries) > cap:
|
||
entries = entries[:cap]
|
||
|
||
payload: dict[str, Any] = {
|
||
"file_version": HELM_ADDON_LOG_FILE_VERSION,
|
||
"entries": entries,
|
||
}
|
||
tmp = path.with_suffix(path.suffix + ".tmp")
|
||
try:
|
||
tmp.write_text(
|
||
json.dumps(payload, ensure_ascii=False, indent=2, default=str),
|
||
encoding="utf-8",
|
||
)
|
||
tmp.replace(path)
|
||
logger.info(
|
||
"Сохранён журнал Helm-аддонов: %s (запись %s, всего записей в файле: %s)",
|
||
path,
|
||
job_id,
|
||
len(entries),
|
||
)
|
||
return path
|
||
except OSError as e:
|
||
logger.warning("Не удалось записать %s: %s", path, e)
|
||
try:
|
||
tmp.unlink(missing_ok=True)
|
||
except OSError:
|
||
pass
|
||
return None
|
||
|
||
|
||
def collect_cluster_dir_logs_page_sync(
|
||
*,
|
||
filename: str,
|
||
limit: int,
|
||
offset: int,
|
||
) -> tuple[list[dict[str, Any]], int]:
|
||
"""
|
||
Собрать JSON с диска по кластерам.
|
||
|
||
- ``provision_log.json`` — по одной строке на кластер (как раньше).
|
||
- ``helm_addon_log.json`` — по одной строке на **каждую** запись в ``entries`` (или одна для легаси-файла).
|
||
|
||
Сортировка по ``finished_at_utc`` (новые первыми), затем пагинация.
|
||
"""
|
||
lim = max(1, min(limit, 100))
|
||
off = max(0, offset)
|
||
root = clusters_dir()
|
||
if not root.is_dir():
|
||
return [], 0
|
||
rows: list[dict[str, Any]] = []
|
||
for sub in sorted(root.iterdir()):
|
||
if not sub.is_dir():
|
||
continue
|
||
cname = sub.name
|
||
if not validate_cluster_name(cname):
|
||
continue
|
||
path = sub / filename
|
||
if not path.is_file():
|
||
continue
|
||
try:
|
||
data = json.loads(path.read_text(encoding="utf-8"))
|
||
except (OSError, json.JSONDecodeError):
|
||
continue
|
||
if not isinstance(data, dict):
|
||
continue
|
||
if filename == HELM_ADDON_LOG_FILENAME:
|
||
rows.extend(_helm_addon_rows_from_parsed_json(data, cname))
|
||
else:
|
||
row = dict(data)
|
||
row["source_cluster"] = cname
|
||
rows.append(row)
|
||
|
||
def sort_key(r: dict[str, Any]) -> str:
|
||
return str(r.get("finished_at_utc") or "")
|
||
|
||
rows.sort(key=sort_key, reverse=True)
|
||
total = len(rows)
|
||
chunk = rows[off : off + lim]
|
||
return chunk, total
|