Files
KindClustersDashboard/app/core/cluster_lifecycle.py
Sergey Antropoff eb063aec20 Веб-интерфейс: страница /clusters, навигация и крошки для кластеров
- Выделена страница списка кластеров, панель упрощена; nav_active и крошки
  ведут в раздел Кластеры; theme.js синхронизирует активную пилюлю по URL.
- Доработки дашборда, аддонов, журнала, стилей и API-документации.
- Поддержка Podman: docker-compose.podman.yml, скрипты сокета; Makefile и env.
2026-04-04 13:42:21 +03:00

1140 lines
43 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Синхронные операции с kind: конфиг, создание, удаление, ожидание готовности нод.
Используются интерактивными CLI-скриптами и веб-слоем (через asyncio.to_thread / executor).
Автор: Сергей Антропов
Сайт: https://devops.org.ru
"""
from __future__ import annotations
import codecs
import errno
import json
import logging
import os
import re
import shutil
import subprocess
from collections.abc import Callable
from typing import Any
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from kind_k8s_paths import clusters_dir, container_cli_name, data_root
from kindest_node_tags import normalize_tag_v_prefix
from kubeconfig_patch import (
kubeconfig_host_file,
kubeconfig_path_for_container_kubectl,
patch_kubeconfig_server_for_host,
should_patch_after_create,
)
logger = logging.getLogger("kind_k8s.cluster_lifecycle")
# Удаление ANSI/OSC из потока (docker TTY: курсор [1A, очистка [2K и т.д.) — в журнале остаётся читаемый текст.
_ANSI_CSI_RE = re.compile(r"\x1b\[[\d;?]*[ -/]*[@-~]")
_ANSI_OSC_RE = re.compile(r"\x1b\][^\x07]*(?:\x07|\x1b\\)")
def _strip_ansi_stream_text(s: str) -> str:
"""Убрать CSI/OSC-последовательности терминала из строки лога (docker TTY: [1A, [2K, …)."""
if not s:
return ""
t = _ANSI_CSI_RE.sub("", s)
return _ANSI_OSC_RE.sub("", t)
def _rollback_after_cancel(*, cluster_name: str, out_dir: Path) -> None:
"""Удалить кластер kind и каталог данных после запроса отмены (best-effort)."""
logger.info("Откат после отмены: kind delete «%s»", cluster_name)
subprocess.run(
["kind", "delete", "cluster", "--name", cluster_name],
capture_output=True,
text=True,
)
if out_dir.is_dir():
try:
shutil.rmtree(out_dir)
logger.info("Удалён каталог %s", out_dir)
except OSError as e:
logger.warning("Не удалось удалить %s: %s", out_dir, e)
# Имя кластера: поддомен DNS (RFC 1123)
_NAME_RE = re.compile(r"^[a-z0-9]([-a-z0-9]*[a-z0-9])?$")
class KindClusterError(Exception):
"""Ошибка операции kind (создание, удаление и т.д.)."""
def __init__(self, message: str, *, exit_code: int = 1) -> None:
super().__init__(message)
self.exit_code = exit_code
def validate_cluster_name(name: str) -> bool:
"""Проверить имя кластера (DNS-подмножество, длина ≤ 63)."""
if not name or len(name) > 63:
return False
return bool(_NAME_RE.match(name))
def normalize_k8s_version(raw: str) -> str:
"""Превратить ввод в тег образа kindest/node (1.29.4 → v1.29.4) или ``latest``."""
s = raw.strip().lower()
if not s:
return "v1.29.4"
if s in ("latest", "vlatest"):
return "latest"
s = s.removeprefix("v")
return f"v{s}"
def build_kind_config_yaml(*, node_image: str, workers: int) -> str:
"""YAML для kind: один control-plane + workers."""
lines = [
"kind: Cluster",
"apiVersion: kind.x-k8s.io/v1alpha4",
"nodes:",
" - role: control-plane",
f" image: {node_image}",
]
for _ in range(workers):
lines.append(" - role: worker")
lines.append(f" image: {node_image}")
return "\n".join(lines) + "\n"
def list_registered_kind_clusters() -> list[str]:
"""Имена кластеров kind; при ошибке — пустой список."""
p = subprocess.run(["kind", "get", "clusters"], capture_output=True, text=True)
if p.returncode != 0:
logger.info("kind get clusters завершился с кодом %s", p.returncode)
return []
lines = [x.strip() for x in (p.stdout or "").splitlines() if x.strip()]
return [x for x in lines if "no kind" not in x.lower()]
def _in_container() -> bool:
return os.environ.get("KIND_K8S_IN_CONTAINER", "").strip() == "1"
def _run_checked(cmd: list[str], *, cwd: Path | None = None) -> None:
"""Выполнить команду; при ошибке — KindClusterError с текстом stderr."""
logger.info("Выполнение: %s", " ".join(cmd))
p = subprocess.run(cmd, cwd=cwd, capture_output=True, text=True)
if p.returncode != 0:
err = (p.stderr or p.stdout or "").strip()
raise KindClusterError(f"Команда завершилась с кодом {p.returncode}: {err}", exit_code=p.returncode)
def _stream_pty_enabled() -> bool:
"""
Псевдо-TTY: docker/kind при выводе в pipe часто полностью буферизуют stdout — в журнале «тишина»
до конца команды. С TTY строки прогресса (в т.ч. pull) уходят порциями.
Отключение: ``KIND_K8S_STREAM_PTY=0``.
"""
if os.name != "posix":
return False
v = (os.environ.get("KIND_K8S_STREAM_PTY") or "1").strip().lower()
return v not in ("0", "false", "no", "off", "нет")
def _emit_stream_lines(
buffer: str,
on_line: Callable[[str], None] | None,
*,
flush_tail: bool,
) -> str:
"""
Разобрать буфер по ``\\n`` и ``\\r`` (прогресс docker/containerd часто идёт с ``\\r`` без ``\\n``).
При ``flush_tail=False`` возвращает неполный хвост после последнего ``\\n``.
"""
if not buffer:
return ""
normalized = buffer.replace("\r\n", "\n").replace("\r", "\n")
parts = normalized.split("\n")
if flush_tail:
for part in parts:
s = _strip_ansi_stream_text(part).strip()
if s and on_line:
on_line(s)
if s:
logger.debug("stream: %s", s[:800])
return ""
if len(parts) == 1:
return parts[0]
*body, tail = parts
for part in body:
s = _strip_ansi_stream_text(part).strip()
if s and on_line:
on_line(s)
if s:
logger.debug("stream: %s", s[:800])
return tail
def _run_checked_stream(
cmd: list[str],
*,
cwd: Path | None = None,
on_line: Callable[[str], None] | None = None,
job_id: str | None = None,
use_pty: bool | None = None,
) -> None:
"""
Выполнить команду с потоковым выводом в колбэк (stdout+stderr объединены).
``use_pty=None`` — как в ``KIND_K8S_STREAM_PTY``. ``use_pty=False`` — только pipe (удобно вместе
с ``docker pull --progress plain``). ``use_pty=True`` — принудительно PTY на POSIX.
``job_id`` — регистрация ``Popen`` для принудительной отмены (``SIGKILL`` группы процессов).
"""
from core import job_store as _js
logger.info("Выполнение (поток): %s", " ".join(cmd))
master_fd: int | None = None
if use_pty is None:
use_pty = _stream_pty_enabled()
slave_fd: int | None = None
if use_pty:
try:
import pty
master_fd, slave_fd = pty.openpty()
except OSError as e:
logger.warning("PTY недоступен (%s), поток через pipe — вывод может обновляться с задержкой", e)
use_pty = False
master_fd = None
slave_fd = None
if use_pty and slave_fd is not None and master_fd is not None:
try:
p = subprocess.Popen(
cmd,
cwd=cwd,
stdin=slave_fd,
stdout=slave_fd,
stderr=subprocess.STDOUT,
close_fds=False,
start_new_session=True,
)
except Exception:
try:
os.close(slave_fd)
except OSError:
pass
try:
os.close(master_fd)
except OSError:
pass
raise
try:
os.close(slave_fd)
except OSError:
pass
slave_fd = None
else:
p = subprocess.Popen(
cmd,
cwd=cwd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=0,
start_new_session=True,
)
master_fd = None
if p.stdout is None:
raise KindClusterError("Не удалось открыть stdout процесса", exit_code=1)
if job_id:
_js.register_subprocess_sync(job_id, p)
decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
carry = ""
rc = -1
try:
while True:
try:
if master_fd is not None:
raw = os.read(master_fd, 4096)
else:
raw = p.stdout.read(4096) if p.stdout else b""
except OSError as e:
# EINTR — повторить read; EIO — slave PTY закрыт (часто после SIGKILL при отмене).
_eio = getattr(errno, "EIO", 5)
if e.errno == errno.EINTR:
continue
if e.errno == _eio:
logger.debug("Чтение потока: EIO (вероятно процесс завершён), выходим из цикла")
break
raise
if not raw:
break
carry += decoder.decode(raw)
carry = _emit_stream_lines(carry, on_line, flush_tail=False)
carry += decoder.decode(b"", final=True)
carry = _emit_stream_lines(carry, on_line, flush_tail=True)
rc = p.wait()
finally:
if job_id:
_js.unregister_subprocess_sync(job_id)
if master_fd is not None:
try:
os.close(master_fd)
except OSError:
pass
else:
try:
if p.stdout:
p.stdout.close()
except OSError:
pass
if job_id and _js.is_cancelled_sync(job_id):
raise KindClusterError("Операция отменена", exit_code=130)
if rc != 0:
raise KindClusterError(f"Команда завершилась с кодом {rc} (см. журнал выше)", exit_code=rc)
def _docker_pull_plain_progress_enabled() -> bool:
"""
Разрешить попытку ``docker pull --progress=plain`` (если CLI это поддерживает).
Полностью отключить: ``KIND_K8S_DOCKER_PULL_PLAIN=0``.
"""
v = (os.environ.get("KIND_K8S_DOCKER_PULL_PLAIN") or "1").strip().lower()
return v not in ("0", "false", "no", "off", "нет")
# Кэш: есть ли в выводе ``docker pull --help`` опция ``--progress`` (в старых CLI её нет).
_docker_pull_help_has_progress: bool | None = None
def _docker_pull_cli_supports_progress_flag() -> bool:
"""
Узнать по справке, объявляет ли данный ``docker`` флаг ``--progress`` для ``pull``.
Без этого на старых клиентах в журнал попадало «unknown flag: --progress», а код выхода часто **1**,
а не 125 — повтор без флага не срабатывал.
"""
global _docker_pull_help_has_progress
if _docker_pull_help_has_progress is not None:
return _docker_pull_help_has_progress
exe = shutil.which("docker")
if not exe:
_docker_pull_help_has_progress = False
return False
try:
p = subprocess.run(
[exe, "pull", "--help"],
capture_output=True,
text=True,
timeout=12,
)
blob = (p.stdout or "") + (p.stderr or "")
_docker_pull_help_has_progress = "--progress" in blob
if not _docker_pull_help_has_progress:
logger.info("docker pull: в справке нет --progress — используем обычный pull (PTY по KIND_K8S_STREAM_PTY)")
except (OSError, subprocess.TimeoutExpired) as e:
logger.warning("Не удалось выполнить docker pull --help (%s), без флага --progress", e)
_docker_pull_help_has_progress = False
return _docker_pull_help_has_progress
def _pull_node_image_streaming(*, image: str, on_line: Callable[[str], None], job_id: str | None) -> None:
"""
Скачать образ узлов через ``docker pull`` / ``podman pull`` с выводом в журнал задания.
Для **docker** по умолчанию: ``--progress plain`` и **без PTY** — идут строки Downloading/Extracting
с процентами. Для **podman** — обычный pull и PTY по ``KIND_K8S_STREAM_PTY``.
"""
cli = _container_cli_bin()
if not shutil.which(cli):
raise KindClusterError(
"Скачивание образа недоступно: не найдена программа для работы с контейнерами (docker/podman).",
exit_code=127,
)
use_docker_plain = (
cli == "docker"
and _docker_pull_plain_progress_enabled()
and _docker_pull_cli_supports_progress_flag()
)
if use_docker_plain:
try:
_run_checked_stream(
[cli, "pull", "--progress=plain", image],
on_line=on_line,
job_id=job_id,
use_pty=False,
)
return
except KindClusterError as e:
if e.exit_code == 130:
raise
# На всякий случай, если справка устарела или другой код ошибки.
logger.warning("Повтор docker pull без --progress=plain (код %s): %s", e.exit_code, e)
_run_checked_stream([cli, "pull", image], on_line=on_line, job_id=job_id, use_pty=None)
return
_run_checked_stream([cli, "pull", image], on_line=on_line, job_id=job_id, use_pty=None)
def _run_capture_checked(cmd: list[str]) -> str:
p = subprocess.run(cmd, capture_output=True, text=True)
if p.returncode != 0:
err = (p.stderr or p.stdout or "").strip()
raise KindClusterError(err or "команда не удалась", exit_code=p.returncode)
return (p.stdout or "").strip()
def _wait_nodes_enabled() -> bool:
raw = (os.environ.get("KIND_K8S_WAIT_NODES") or "1").strip().lower()
return raw in ("1", "true", "yes", "да")
def _wait_nodes_timeout_sec() -> int:
raw = (os.environ.get("KIND_K8S_WAIT_NODES_TIMEOUT_SEC") or "300").strip()
try:
return max(30, min(int(raw), 3600))
except ValueError:
return 300
def wait_nodes_ready(
*,
cluster_name: str,
kubeconfig_path: Path,
timeout_sec: int | None = None,
) -> tuple[bool, str]:
"""
Дождаться condition=Ready для всех нод через kubectl wait.
В контейнере веб-приложения путь к kubeconfig проходит через
``kubeconfig_path_for_container_kubectl`` (шлюз хоста + проброшенный порт или имя узла kind).
Возвращает (успех, сообщение для лога/UI).
"""
if timeout_sec is None:
timeout_sec = _wait_nodes_timeout_sec()
cfg = kubeconfig_path_for_container_kubectl(
cluster_name=cluster_name,
kube_source_path=kubeconfig_path,
)
t = f"{timeout_sec}s"
cmd = [
"kubectl",
"--kubeconfig",
str(cfg),
"wait",
"--for=condition=Ready",
"nodes",
"--all",
f"--timeout={t}",
]
logger.info("Ожидание готовности нод: timeout=%s", t)
p = subprocess.run(cmd, capture_output=True, text=True)
out = (p.stdout or "").strip()
err = (p.stderr or "").strip()
if p.returncode == 0:
return True, out or "ноды в состоянии Ready"
msg = err or out or f"код выхода {p.returncode}"
return False, msg
@dataclass(frozen=True)
class CreateClusterResult:
"""Результат успешного создания кластера."""
cluster_name: str
ver_tag: str
node_image: str
workers: int
kubeconfig_path: Path
meta_path: Path
kubeconfig_patched_for_host: bool
nodes_ready: bool | None
nodes_ready_message: str | None
def create_cluster_non_interactive(
*,
name: str,
kubernetes_version_tag: str,
workers: int,
job_id: str | None = None,
use_existing_config: bool = False,
) -> CreateClusterResult:
"""
Создать кластер kind без диалогов.
``kubernetes_version_tag`` — тег kindest/node: ``latest``, ``v1.29.4`` и т.д. (см. ``normalize_tag_v_prefix``).
``job_id`` — если задан, обновляется прогресс и проверяется отмена (см. ``job_store``).
``use_existing_config=True`` — не перезаписывать ``kind-config.yaml``, поднять кластер по уже
сохранённому файлу (каталог ``clusters/<имя>/`` должен существовать).
"""
from core import job_store as _job_store
def _progress(stage: str, pct: int) -> None:
if job_id:
_job_store.set_progress_sync(job_id, stage, pct)
def _log(line: str) -> None:
if job_id:
_job_store.append_log_sync(job_id, line)
def _cancelled() -> bool:
return bool(job_id and _job_store.is_cancelled_sync(job_id))
if not shutil.which("kind"):
raise KindClusterError("Не найден бинарник kind в PATH.", exit_code=127)
if not validate_cluster_name(name):
raise KindClusterError("Некорректное имя кластера (a-z0-9-, не длиннее 63).")
existing = list_registered_kind_clusters()
if name in existing:
raise KindClusterError(f"Кластер «{name}» уже существует в kind.")
if not use_existing_config and (workers < 0 or workers > 20):
raise KindClusterError("Количество worker-нод должно быть от 0 до 20.")
# Каталог данных кластера до долгих шагов — чтобы при сбое журнал в journal/jobs_history.json
# и provision_log могли быть привязаны к существующему clusters/<имя>/.
root = data_root()
cdir = clusters_dir()
out_dir = cdir / name
out_dir.mkdir(parents=True, exist_ok=True)
ver_tag = normalize_tag_v_prefix(kubernetes_version_tag)
node_image = f"kindest/node:{ver_tag}"
cfg_path = out_dir / "kind-config.yaml"
kube_path = out_dir / "kubeconfig"
meta_path = out_dir / "meta.json"
prev_meta_for_workers: dict[str, object] = {}
if use_existing_config:
if not cfg_path.is_file():
raise KindClusterError(f"Нет сохранённого kind-config.yaml: {cfg_path}")
prev = read_meta_json(name) or {}
prev_meta_for_workers = prev
if prev.get("node_image"):
node_image = str(prev["node_image"])
if prev.get("kubernetes_version_tag"):
ver_tag = str(prev["kubernetes_version_tag"])
_progress("Чтение сохранённого конфига", 10)
_log("Используется сохранённый файл конфигурации кластера.")
else:
yaml_text = build_kind_config_yaml(node_image=node_image, workers=workers)
cfg_path.write_text(yaml_text, encoding="utf-8")
_progress("Подготовка конфигурации", 12)
_log("Файл конфигурации кластера сохранён в каталог данных.")
if _cancelled():
_rollback_after_cancel(cluster_name=name, out_dir=out_dir)
raise KindClusterError("Операция отменена")
logger.info(
"Создание кластера «%s», образ %s, workers=%s, existing_cfg=%s",
name,
node_image,
workers,
use_existing_config,
)
_progress("Скачивание образа при необходимости", 18)
if _cancelled():
_rollback_after_cancel(cluster_name=name, out_dir=out_dir)
raise KindClusterError("Операция отменена")
try:
_pull_node_image_streaming(image=node_image, on_line=_log, job_id=job_id)
except KindClusterError as e:
if _cancelled():
_rollback_after_cancel(cluster_name=name, out_dir=out_dir)
raise KindClusterError("Операция отменена") from e
raise
if _cancelled():
_rollback_after_cancel(cluster_name=name, out_dir=out_dir)
raise KindClusterError("Операция отменена")
_progress("Создание узлов кластера", 32)
_run_checked_stream(
["kind", "create", "cluster", "--name", name, "--config", str(cfg_path)],
on_line=_log,
job_id=job_id,
)
if _cancelled():
_rollback_after_cancel(cluster_name=name, out_dir=out_dir)
raise KindClusterError("Операция отменена")
_progress("Сохранение доступа к кластеру", 58)
kube = _run_capture_checked(["kind", "get", "kubeconfig", "--name", name])
kube_path.write_text(kube, encoding="utf-8")
if _cancelled():
_rollback_after_cancel(cluster_name=name, out_dir=out_dir)
raise KindClusterError("Операция отменена")
host_kube_path = kubeconfig_host_file(kube_path.parent)
patched = False
if should_patch_after_create():
_progress("Настройка доступа с вашего компьютера", 72)
shutil.copyfile(kube_path, host_kube_path, follow_symlinks=True)
patched = patch_kubeconfig_server_for_host(cluster_name=name, kube_path=host_kube_path)
if not patched:
host_kube_path.unlink(missing_ok=True)
if _cancelled():
_rollback_after_cancel(cluster_name=name, out_dir=out_dir)
raise KindClusterError("Операция отменена")
nodes_ready: bool | None = None
nodes_msg: str | None = None
if _wait_nodes_enabled():
_progress("Ожидание готовности узлов", 82)
ok, msg = wait_nodes_ready(cluster_name=name, kubeconfig_path=kube_path)
nodes_ready = ok
nodes_msg = msg
if ok:
logger.info("Ноды готовы: %s", msg)
else:
logger.warning("Ожидание нод не завершилось успешно: %s", msg)
_log(("Узлы готовы" if ok else "Ожидание узлов: ") + (msg or "")[:4000])
worker_nodes_meta = workers
if use_existing_config:
prev_w = prev_meta_for_workers.get("worker_nodes")
if prev_w is not None:
try:
worker_nodes_meta = int(prev_w)
except (TypeError, ValueError):
worker_nodes_meta = workers
meta = {
"cluster_name": name,
"kubernetes_version_tag": ver_tag,
"node_image": node_image,
"worker_nodes": worker_nodes_meta,
"created_at_utc": datetime.now(timezone.utc).isoformat(),
"kind_config_path": str(cfg_path.relative_to(root)),
"kubeconfig_path": str(kube_path.relative_to(root)),
"kubeconfig_patched_for_host": patched,
"created_via_container": _in_container(),
"nodes_ready_after_create": nodes_ready,
"nodes_ready_message": nodes_msg,
"provisioned_from_existing_config": use_existing_config,
}
if patched:
meta["kubeconfig_host_path"] = str(host_kube_path.relative_to(root))
# Сохраняем описание из прежнего meta.json при подъёме по существующему конфигу (в т.ч. reapply).
if use_existing_config:
desc = prev_meta_for_workers.get("description")
if isinstance(desc, str) and desc.strip():
meta["description"] = desc.strip()[:2000]
meta_path.write_text(json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8")
_progress("Завершение", 95)
return CreateClusterResult(
cluster_name=name,
ver_tag=ver_tag,
node_image=node_image,
workers=worker_nodes_meta,
kubeconfig_path=kube_path,
meta_path=meta_path,
kubeconfig_patched_for_host=patched,
nodes_ready=nodes_ready,
nodes_ready_message=nodes_msg,
)
def delete_kind_cluster_and_data(*, name: str, log_to_stdout: bool = False) -> tuple[bool, str]:
"""
``kind delete cluster`` и удаление ``clusters/<имя>/``.
Первый элемент — успешность ``kind delete``; второй — текстовое резюме всего шага.
``log_to_stdout=True`` — не перехватывать stdout/stderr kind (удобно в интерактивном CLI).
"""
if not shutil.which("kind"):
raise KindClusterError("Не найден kind в PATH.", exit_code=127)
cdir = clusters_dir()
parts: list[str] = []
kind_ok = True
if log_to_stdout:
p = subprocess.run(["kind", "delete", "cluster", "--name", name])
if p.returncode != 0:
parts.append(f"kind delete: код {p.returncode}")
logger.warning("kind delete cluster %s: код %s", name, p.returncode)
kind_ok = False
else:
parts.append("kind delete: OK")
else:
p = subprocess.run(
["kind", "delete", "cluster", "--name", name],
capture_output=True,
text=True,
)
if p.returncode != 0:
err = (p.stderr or p.stdout or "").strip()
parts.append(f"kind delete: ошибка ({err or p.returncode})")
logger.warning("kind delete cluster %s: %s", name, err)
kind_ok = False
else:
parts.append("kind delete: OK")
d = cdir / name
if d.is_dir():
shutil.rmtree(d)
parts.append(f"удалена папка {d}")
else:
parts.append("локальная папка отсутствовала")
return kind_ok, "; ".join(parts)
def delete_kind_cluster_keep_data(*, name: str, job_id: str | None = None) -> tuple[bool, str]:
"""
Выполнить ``kind delete cluster`` без удаления каталога ``clusters/<имя>/``.
Используется после изменения ``kind-config.yaml``: затем ``kind create`` поднимает узлы
по новому файлу. Если запись в kind уже отсутствует — считаем шаг успешным и идём к create.
При ``job_id`` вывод и отмена — как у других длительных операций (см. ``_run_checked_stream``).
Автор: Сергей Антропов
Сайт: https://devops.org.ru
"""
from core import job_store as _js
def _log(line: str) -> None:
if job_id:
_js.append_log_sync(job_id, line)
if not shutil.which("kind"):
raise KindClusterError("Не найден kind в PATH.", exit_code=127)
if job_id and _js.is_cancelled_sync(job_id):
raise KindClusterError("Операция отменена", exit_code=130)
_log("Удаление записи кластера в kind (каталог clusters/<имя>/ на диске не удаляется).")
logger.info("kind delete с сохранением каталога данных для «%s»", name)
try:
_run_checked_stream(
["kind", "delete", "cluster", "--name", name],
on_line=_log if job_id else None,
job_id=job_id,
)
return True, "kind delete: OK"
except KindClusterError as e:
if job_id and _js.is_cancelled_sync(job_id):
raise KindClusterError("Операция отменена", exit_code=130) from e
# kind иногда возвращает ошибку, если кластер уже снят с учёта — проверяем фактическое состояние
remaining = list_registered_kind_clusters()
if name not in remaining:
_log("Запись кластера в kind отсутствует — далее создание по сохранённому kind-config.yaml.")
logger.info(
"После сбоя kind delete кластер «%s» не в списке зарегистрированных — продолжаем",
name,
)
return True, "Запись в kind уже отсутствовала"
raise
def _sort_kind_node_containers(names: list[str]) -> list[str]:
"""Сначала control-plane, затем остальные — удобнее для ``docker start``."""
def sort_key(n: str) -> tuple[int, str]:
if n.endswith("-control-plane"):
return (0, n)
return (1, n)
return sorted(names, key=sort_key)
def list_kind_cluster_container_names(*, cluster_name: str) -> list[str]:
"""Имена контейнеров узлов kind (все с префиксом ``<имя>-``)."""
cli = _container_cli_bin()
if not shutil.which(cli):
raise KindClusterError("Программа для контейнеров не найдена в PATH.", exit_code=127)
p = subprocess.run(
[cli, "ps", "-a", "--format", "{{.Names}}"],
capture_output=True,
text=True,
)
if p.returncode != 0:
err = (p.stderr or p.stdout or "").strip()
raise KindClusterError(f"Не удалось получить список контейнеров: {err}", exit_code=p.returncode)
prefix = f"{cluster_name}-"
raw = [n.strip() for n in (p.stdout or "").splitlines() if n.strip()]
matched = [n for n in raw if n.startswith(prefix)]
return _sort_kind_node_containers(matched)
def stop_kind_cluster_containers(*, name: str, job_id: str | None = None) -> tuple[bool, str]:
"""
Остановить контейнеры узлов (CLI контейнеров из ``CONTAINER_CLI``).
Запись kind о кластере сохраняется; позже можно вызвать ``start_kind_cluster_containers``.
При ``job_id`` — потоковый вывод в журнал задания и возможность прервать текущую команду отменой.
"""
from core import job_store as _js
names = list_kind_cluster_container_names(cluster_name=name)
if not names:
return True, "Узлы уже остановлены или отсутствуют"
def _cancelled() -> bool:
return bool(job_id and _js.is_cancelled_sync(job_id))
cli = _container_cli_bin()
n = len(names)
ok_all = True
failed_idx: list[int] = []
for i, ctr in enumerate(names, start=1):
if _cancelled():
raise KindClusterError("Операция отменена", exit_code=130)
if job_id:
pct = 5 + int(88 * (i - 1) / max(n, 1))
_js.set_progress_sync(job_id, f"Остановка узла {i} из {n}", min(pct, 93))
_js.append_log_sync(job_id, f"Узел {i} из {n}")
try:
def _on_line(line: str) -> None:
_js.append_log_sync(job_id, line)
_run_checked_stream([cli, "stop", ctr], on_line=_on_line, job_id=job_id)
except KindClusterError as e:
if _cancelled():
raise KindClusterError("Операция отменена", exit_code=130) from e
ok_all = False
failed_idx.append(i)
logger.warning("%s stop узел %s: %s", cli, i, e)
else:
p = subprocess.run([cli, "stop", ctr], capture_output=True, text=True)
if p.returncode != 0:
ok_all = False
err = (p.stderr or p.stdout or "").strip() or str(p.returncode)
failed_idx.append(i)
logger.warning("%s stop %s: %s", cli, ctr, err)
if job_id:
_js.set_progress_sync(job_id, "Готово", 98)
if ok_all:
return True, f"Остановлено узлов: {n}"
return False, "Не удалось остановить узлы: " + (
", ".join(str(x) for x in failed_idx) if failed_idx else "см. журнал"
)
def start_kind_cluster_containers(*, name: str, job_id: str | None = None) -> tuple[bool, str]:
"""
Запустить контейнеры узлов (после остановки или перезапуска движка контейнеров).
При ``job_id`` — журнал и прогресс в задании, прерывание текущей команды через отмену задания.
"""
from core import job_store as _js
names = list_kind_cluster_container_names(cluster_name=name)
if not names:
return False, (
"Сохранённые узлы не найдены. Если кластер ещё не создавался в этой среде — создайте его "
"или поднимите по сохранённой конфигурации из панели."
)
def _cancelled() -> bool:
return bool(job_id and _js.is_cancelled_sync(job_id))
cli = _container_cli_bin()
n = len(names)
ok_all = True
failed_idx: list[int] = []
for i, ctr in enumerate(names, start=1):
if _cancelled():
raise KindClusterError("Операция отменена", exit_code=130)
if job_id:
pct = 5 + int(88 * (i - 1) / max(n, 1))
_js.set_progress_sync(job_id, f"Запуск узла {i} из {n}", min(pct, 93))
_js.append_log_sync(job_id, f"Узел {i} из {n}")
try:
def _on_line(line: str) -> None:
_js.append_log_sync(job_id, line)
_run_checked_stream([cli, "start", ctr], on_line=_on_line, job_id=job_id)
except KindClusterError as e:
if _cancelled():
raise KindClusterError("Операция отменена", exit_code=130) from e
ok_all = False
failed_idx.append(i)
logger.warning("%s start узел %s: %s", cli, i, e)
else:
p = subprocess.run([cli, "start", ctr], capture_output=True, text=True)
if p.returncode != 0:
ok_all = False
err = (p.stderr or p.stdout or "").strip() or str(p.returncode)
failed_idx.append(i)
logger.warning("%s start %s: %s", cli, ctr, err)
if job_id:
_js.set_progress_sync(job_id, "Готово", 98)
if ok_all:
return True, f"Запущено узлов: {n}"
return False, "Не удалось запустить узлы: " + (
", ".join(str(x) for x in failed_idx) if failed_idx else "см. журнал"
)
def read_meta_json(cluster_name: str) -> dict[str, object] | None:
"""Прочитать ``clusters/<имя>/meta.json`` если есть."""
p = clusters_dir() / cluster_name / "meta.json"
if not p.is_file():
return None
try:
raw = json.loads(p.read_text(encoding="utf-8"))
if isinstance(raw, dict):
return raw
except (OSError, json.JSONDecodeError) as e:
logger.debug("meta.json не прочитан: %s", e)
return None
def _container_cli_bin() -> str:
"""Имя CLI к сокету: см. :func:`kind_k8s_paths.container_cli_name`."""
return container_cli_name()
def configured_container_cli() -> str:
"""Имя CLI, которое реально вызывает процесс (в контейнере веб-UI — всегда ``docker``).
Единый источник для ``docker stats``/``podman stats``, ``kind``, health и поля ``container_cli`` в API.
"""
return _container_cli_bin()
def container_engine_ping(*, timeout_sec: float = 12.0) -> tuple[bool, str, str]:
"""
Проверить доступ к движку контейнеров (``docker info`` / ``podman info``).
Возвращает (успех, краткое сообщение или stderr, имя CLI).
"""
cli = _container_cli_bin()
if not shutil.which(cli):
return False, f"«{cli}» не найден в PATH", cli
try:
p = subprocess.run(
[cli, "info"],
capture_output=True,
text=True,
timeout=timeout_sec,
)
except subprocess.TimeoutExpired:
logger.warning("%s info: таймаут %s с", cli, timeout_sec)
return False, f"таймаут {timeout_sec} с", cli
if p.returncode == 0:
return True, "OK", cli
err = (p.stderr or p.stdout or "").strip() or f"код {p.returncode}"
logger.info("%s info неуспешно: %s", cli, err[:200])
return False, err[:800], cli
def kubectl_nodes_wide(*, cluster_name: str, kubeconfig: str | Path) -> tuple[int, str]:
"""``kubectl get nodes -o wide``; возвращает (код, объединённый вывод)."""
src = Path(kubeconfig)
cfg = kubeconfig_path_for_container_kubectl(cluster_name=cluster_name, kube_source_path=src)
p = subprocess.run(
[
"kubectl",
"--kubeconfig",
str(cfg),
"get",
"nodes",
"-o",
"wide",
"--request-timeout=15s",
],
capture_output=True,
text=True,
)
out = (p.stdout or "").strip()
err = (p.stderr or "").strip()
msg = out if out else err
return p.returncode, msg
def kubectl_pods_all_namespaces(*, cluster_name: str, kubeconfig: str | Path) -> tuple[int, str]:
"""``kubectl get pods -A``; сводка подов по кластеру."""
src = Path(kubeconfig)
cfg = kubeconfig_path_for_container_kubectl(cluster_name=cluster_name, kube_source_path=src)
p = subprocess.run(
[
"kubectl",
"--kubeconfig",
str(cfg),
"get",
"pods",
"-A",
"--request-timeout=20s",
],
capture_output=True,
text=True,
)
out = (p.stdout or "").strip()
err = (p.stderr or "").strip()
msg = out if out else err
return p.returncode, msg
def kubectl_get_all_namespaces_wide(
*,
cluster_name: str,
kubeconfig: str | Path,
resource: str,
timeout: str = "30s",
) -> tuple[int, str]:
"""
``kubectl get <resource> -A -o wide`` — Deployments, StatefulSets, DaemonSets, Service, Ingress и т.д.
``resource`` — аргумент kubectl (``deployments``, ``pods``, ``svc`` …).
"""
src = Path(kubeconfig)
cfg = kubeconfig_path_for_container_kubectl(cluster_name=cluster_name, kube_source_path=src)
p = subprocess.run(
[
"kubectl",
"--kubeconfig",
str(cfg),
"get",
resource,
"-A",
"-o",
"wide",
f"--request-timeout={timeout}",
],
capture_output=True,
text=True,
)
out = (p.stdout or "").strip()
err = (p.stderr or "").strip()
msg = out if out else err
return p.returncode, msg
def _parse_kubectl_list_json(stdout: str) -> tuple[list[dict[str, Any]] | None, str]:
"""Разбор вывода ``kubectl get … -o json`` (Kind List)."""
if not (stdout or "").strip():
return [], ""
try:
data = json.loads(stdout)
except json.JSONDecodeError as e:
return None, f"JSON: {e}"
if not isinstance(data, dict):
return None, "корень ответа не объект"
raw_items = data.get("items")
if raw_items is None:
return [], ""
if not isinstance(raw_items, list):
return None, "поле items не список"
out: list[dict[str, Any]] = [x for x in raw_items if isinstance(x, dict)]
return out, ""
def kubectl_get_json(
*,
cluster_name: str,
kubeconfig: str | Path,
resource_args: list[str],
timeout: str = "30s",
) -> tuple[int, list[dict[str, Any]] | None, str]:
"""
``kubectl get <resource_args…> -o json``.
``resource_args`` — например ``["nodes"]`` или ``["pods", "-A"]``, ``["deployments", "-A"]``.
Возвращает (код, список items или None, сообщение об ошибке).
"""
src = Path(kubeconfig)
cfg = kubeconfig_path_for_container_kubectl(cluster_name=cluster_name, kube_source_path=src)
cmd = [
"kubectl",
"--kubeconfig",
str(cfg),
"get",
*resource_args,
"-o",
"json",
f"--request-timeout={timeout}",
]
p = subprocess.run(cmd, capture_output=True, text=True)
out = (p.stdout or "").strip()
err = (p.stderr or "").strip()
if p.returncode != 0:
return p.returncode, None, err or out or f"код {p.returncode}"
items, perr = _parse_kubectl_list_json(out)
if items is None:
return p.returncode, None, perr
return 0, items, ""
def kubectl_delete_pod(
*,
cluster_name: str,
kubeconfig: str | Path,
namespace: str,
pod_name: str,
grace_period: int = 30,
) -> tuple[int, str]:
"""
Удаление пода (контроллер при необходимости создаст новый) — «мягкий рестарт».
``grace_period`` — секунды до SIGKILL (0 — сразу; для системных подов осторожно).
"""
src = Path(kubeconfig)
cfg = kubeconfig_path_for_container_kubectl(cluster_name=cluster_name, kube_source_path=src)
cmd = [
"kubectl",
"--kubeconfig",
str(cfg),
"delete",
"pod",
pod_name,
"-n",
namespace,
f"--grace-period={grace_period}",
"--request-timeout=60s",
]
p = subprocess.run(cmd, capture_output=True, text=True)
msg = ((p.stdout or "").strip() or (p.stderr or "").strip() or f"код {p.returncode}")[:8000]
return p.returncode, msg
def cluster_summary_for_api(name: str) -> dict[str, object]:
"""Сводка по кластеру для JSON API (без блокирующих долгих вызовов)."""
from core.provision_log import provision_log_file_path
meta = read_meta_json(name) or {}
saved_kc = clusters_dir() / name / "kubeconfig"
in_kind = name in list_registered_kind_clusters()
out: dict[str, object] = {
"name": name,
"registered_in_kind": in_kind,
"has_local_kubeconfig": saved_kc.is_file(),
"kubeconfig_path": str(saved_kc) if saved_kc.is_file() else None,
"meta": meta,
"has_provision_log": provision_log_file_path(name).is_file(),
}
return out