- Выделена страница списка кластеров, панель упрощена; nav_active и крошки ведут в раздел Кластеры; theme.js синхронизирует активную пилюлю по URL. - Доработки дашборда, аддонов, журнала, стилей и API-документации. - Поддержка Podman: docker-compose.podman.yml, скрипты сокета; Makefile и env.
1140 lines
43 KiB
Python
1140 lines
43 KiB
Python
"""Синхронные операции с kind: конфиг, создание, удаление, ожидание готовности нод.
|
||
|
||
Используются интерактивными CLI-скриптами и веб-слоем (через asyncio.to_thread / executor).
|
||
|
||
Автор: Сергей Антропов
|
||
Сайт: https://devops.org.ru
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import codecs
|
||
import errno
|
||
import json
|
||
import logging
|
||
import os
|
||
import re
|
||
import shutil
|
||
import subprocess
|
||
from collections.abc import Callable
|
||
from typing import Any
|
||
from dataclasses import dataclass
|
||
from datetime import datetime, timezone
|
||
from pathlib import Path
|
||
|
||
from kind_k8s_paths import clusters_dir, container_cli_name, data_root
|
||
from kindest_node_tags import normalize_tag_v_prefix
|
||
from kubeconfig_patch import (
|
||
kubeconfig_host_file,
|
||
kubeconfig_path_for_container_kubectl,
|
||
patch_kubeconfig_server_for_host,
|
||
should_patch_after_create,
|
||
)
|
||
|
||
logger = logging.getLogger("kind_k8s.cluster_lifecycle")
|
||
|
||
# Удаление ANSI/OSC из потока (docker TTY: курсор [1A, очистка [2K и т.д.) — в журнале остаётся читаемый текст.
|
||
_ANSI_CSI_RE = re.compile(r"\x1b\[[\d;?]*[ -/]*[@-~]")
|
||
_ANSI_OSC_RE = re.compile(r"\x1b\][^\x07]*(?:\x07|\x1b\\)")
|
||
|
||
|
||
def _strip_ansi_stream_text(s: str) -> str:
|
||
"""Убрать CSI/OSC-последовательности терминала из строки лога (docker TTY: [1A, [2K, …)."""
|
||
if not s:
|
||
return ""
|
||
t = _ANSI_CSI_RE.sub("", s)
|
||
return _ANSI_OSC_RE.sub("", t)
|
||
|
||
|
||
def _rollback_after_cancel(*, cluster_name: str, out_dir: Path) -> None:
|
||
"""Удалить кластер kind и каталог данных после запроса отмены (best-effort)."""
|
||
logger.info("Откат после отмены: kind delete «%s»", cluster_name)
|
||
subprocess.run(
|
||
["kind", "delete", "cluster", "--name", cluster_name],
|
||
capture_output=True,
|
||
text=True,
|
||
)
|
||
if out_dir.is_dir():
|
||
try:
|
||
shutil.rmtree(out_dir)
|
||
logger.info("Удалён каталог %s", out_dir)
|
||
except OSError as e:
|
||
logger.warning("Не удалось удалить %s: %s", out_dir, e)
|
||
|
||
# Имя кластера: поддомен DNS (RFC 1123)
|
||
_NAME_RE = re.compile(r"^[a-z0-9]([-a-z0-9]*[a-z0-9])?$")
|
||
|
||
|
||
class KindClusterError(Exception):
|
||
"""Ошибка операции kind (создание, удаление и т.д.)."""
|
||
|
||
def __init__(self, message: str, *, exit_code: int = 1) -> None:
|
||
super().__init__(message)
|
||
self.exit_code = exit_code
|
||
|
||
|
||
def validate_cluster_name(name: str) -> bool:
|
||
"""Проверить имя кластера (DNS-подмножество, длина ≤ 63)."""
|
||
if not name or len(name) > 63:
|
||
return False
|
||
return bool(_NAME_RE.match(name))
|
||
|
||
|
||
def normalize_k8s_version(raw: str) -> str:
|
||
"""Превратить ввод в тег образа kindest/node (1.29.4 → v1.29.4) или ``latest``."""
|
||
s = raw.strip().lower()
|
||
if not s:
|
||
return "v1.29.4"
|
||
if s in ("latest", "vlatest"):
|
||
return "latest"
|
||
s = s.removeprefix("v")
|
||
return f"v{s}"
|
||
|
||
|
||
def build_kind_config_yaml(*, node_image: str, workers: int) -> str:
|
||
"""YAML для kind: один control-plane + workers."""
|
||
lines = [
|
||
"kind: Cluster",
|
||
"apiVersion: kind.x-k8s.io/v1alpha4",
|
||
"nodes:",
|
||
" - role: control-plane",
|
||
f" image: {node_image}",
|
||
]
|
||
for _ in range(workers):
|
||
lines.append(" - role: worker")
|
||
lines.append(f" image: {node_image}")
|
||
return "\n".join(lines) + "\n"
|
||
|
||
|
||
def list_registered_kind_clusters() -> list[str]:
|
||
"""Имена кластеров kind; при ошибке — пустой список."""
|
||
p = subprocess.run(["kind", "get", "clusters"], capture_output=True, text=True)
|
||
if p.returncode != 0:
|
||
logger.info("kind get clusters завершился с кодом %s", p.returncode)
|
||
return []
|
||
lines = [x.strip() for x in (p.stdout or "").splitlines() if x.strip()]
|
||
return [x for x in lines if "no kind" not in x.lower()]
|
||
|
||
|
||
def _in_container() -> bool:
|
||
return os.environ.get("KIND_K8S_IN_CONTAINER", "").strip() == "1"
|
||
|
||
|
||
def _run_checked(cmd: list[str], *, cwd: Path | None = None) -> None:
|
||
"""Выполнить команду; при ошибке — KindClusterError с текстом stderr."""
|
||
logger.info("Выполнение: %s", " ".join(cmd))
|
||
p = subprocess.run(cmd, cwd=cwd, capture_output=True, text=True)
|
||
if p.returncode != 0:
|
||
err = (p.stderr or p.stdout or "").strip()
|
||
raise KindClusterError(f"Команда завершилась с кодом {p.returncode}: {err}", exit_code=p.returncode)
|
||
|
||
|
||
def _stream_pty_enabled() -> bool:
|
||
"""
|
||
Псевдо-TTY: docker/kind при выводе в pipe часто полностью буферизуют stdout — в журнале «тишина»
|
||
до конца команды. С TTY строки прогресса (в т.ч. pull) уходят порциями.
|
||
|
||
Отключение: ``KIND_K8S_STREAM_PTY=0``.
|
||
"""
|
||
if os.name != "posix":
|
||
return False
|
||
v = (os.environ.get("KIND_K8S_STREAM_PTY") or "1").strip().lower()
|
||
return v not in ("0", "false", "no", "off", "нет")
|
||
|
||
|
||
def _emit_stream_lines(
|
||
buffer: str,
|
||
on_line: Callable[[str], None] | None,
|
||
*,
|
||
flush_tail: bool,
|
||
) -> str:
|
||
"""
|
||
Разобрать буфер по ``\\n`` и ``\\r`` (прогресс docker/containerd часто идёт с ``\\r`` без ``\\n``).
|
||
|
||
При ``flush_tail=False`` возвращает неполный хвост после последнего ``\\n``.
|
||
"""
|
||
if not buffer:
|
||
return ""
|
||
normalized = buffer.replace("\r\n", "\n").replace("\r", "\n")
|
||
parts = normalized.split("\n")
|
||
if flush_tail:
|
||
for part in parts:
|
||
s = _strip_ansi_stream_text(part).strip()
|
||
if s and on_line:
|
||
on_line(s)
|
||
if s:
|
||
logger.debug("stream: %s", s[:800])
|
||
return ""
|
||
if len(parts) == 1:
|
||
return parts[0]
|
||
*body, tail = parts
|
||
for part in body:
|
||
s = _strip_ansi_stream_text(part).strip()
|
||
if s and on_line:
|
||
on_line(s)
|
||
if s:
|
||
logger.debug("stream: %s", s[:800])
|
||
return tail
|
||
|
||
|
||
def _run_checked_stream(
|
||
cmd: list[str],
|
||
*,
|
||
cwd: Path | None = None,
|
||
on_line: Callable[[str], None] | None = None,
|
||
job_id: str | None = None,
|
||
use_pty: bool | None = None,
|
||
) -> None:
|
||
"""
|
||
Выполнить команду с потоковым выводом в колбэк (stdout+stderr объединены).
|
||
|
||
``use_pty=None`` — как в ``KIND_K8S_STREAM_PTY``. ``use_pty=False`` — только pipe (удобно вместе
|
||
с ``docker pull --progress plain``). ``use_pty=True`` — принудительно PTY на POSIX.
|
||
|
||
``job_id`` — регистрация ``Popen`` для принудительной отмены (``SIGKILL`` группы процессов).
|
||
"""
|
||
from core import job_store as _js
|
||
|
||
logger.info("Выполнение (поток): %s", " ".join(cmd))
|
||
|
||
master_fd: int | None = None
|
||
if use_pty is None:
|
||
use_pty = _stream_pty_enabled()
|
||
slave_fd: int | None = None
|
||
|
||
if use_pty:
|
||
try:
|
||
import pty
|
||
|
||
master_fd, slave_fd = pty.openpty()
|
||
except OSError as e:
|
||
logger.warning("PTY недоступен (%s), поток через pipe — вывод может обновляться с задержкой", e)
|
||
use_pty = False
|
||
master_fd = None
|
||
slave_fd = None
|
||
|
||
if use_pty and slave_fd is not None and master_fd is not None:
|
||
try:
|
||
p = subprocess.Popen(
|
||
cmd,
|
||
cwd=cwd,
|
||
stdin=slave_fd,
|
||
stdout=slave_fd,
|
||
stderr=subprocess.STDOUT,
|
||
close_fds=False,
|
||
start_new_session=True,
|
||
)
|
||
except Exception:
|
||
try:
|
||
os.close(slave_fd)
|
||
except OSError:
|
||
pass
|
||
try:
|
||
os.close(master_fd)
|
||
except OSError:
|
||
pass
|
||
raise
|
||
try:
|
||
os.close(slave_fd)
|
||
except OSError:
|
||
pass
|
||
slave_fd = None
|
||
else:
|
||
p = subprocess.Popen(
|
||
cmd,
|
||
cwd=cwd,
|
||
stdout=subprocess.PIPE,
|
||
stderr=subprocess.STDOUT,
|
||
bufsize=0,
|
||
start_new_session=True,
|
||
)
|
||
master_fd = None
|
||
if p.stdout is None:
|
||
raise KindClusterError("Не удалось открыть stdout процесса", exit_code=1)
|
||
|
||
if job_id:
|
||
_js.register_subprocess_sync(job_id, p)
|
||
decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
|
||
carry = ""
|
||
rc = -1
|
||
try:
|
||
while True:
|
||
try:
|
||
if master_fd is not None:
|
||
raw = os.read(master_fd, 4096)
|
||
else:
|
||
raw = p.stdout.read(4096) if p.stdout else b""
|
||
except OSError as e:
|
||
# EINTR — повторить read; EIO — slave PTY закрыт (часто после SIGKILL при отмене).
|
||
_eio = getattr(errno, "EIO", 5)
|
||
if e.errno == errno.EINTR:
|
||
continue
|
||
if e.errno == _eio:
|
||
logger.debug("Чтение потока: EIO (вероятно процесс завершён), выходим из цикла")
|
||
break
|
||
raise
|
||
if not raw:
|
||
break
|
||
carry += decoder.decode(raw)
|
||
carry = _emit_stream_lines(carry, on_line, flush_tail=False)
|
||
carry += decoder.decode(b"", final=True)
|
||
carry = _emit_stream_lines(carry, on_line, flush_tail=True)
|
||
rc = p.wait()
|
||
finally:
|
||
if job_id:
|
||
_js.unregister_subprocess_sync(job_id)
|
||
if master_fd is not None:
|
||
try:
|
||
os.close(master_fd)
|
||
except OSError:
|
||
pass
|
||
else:
|
||
try:
|
||
if p.stdout:
|
||
p.stdout.close()
|
||
except OSError:
|
||
pass
|
||
|
||
if job_id and _js.is_cancelled_sync(job_id):
|
||
raise KindClusterError("Операция отменена", exit_code=130)
|
||
if rc != 0:
|
||
raise KindClusterError(f"Команда завершилась с кодом {rc} (см. журнал выше)", exit_code=rc)
|
||
|
||
|
||
def _docker_pull_plain_progress_enabled() -> bool:
|
||
"""
|
||
Разрешить попытку ``docker pull --progress=plain`` (если CLI это поддерживает).
|
||
|
||
Полностью отключить: ``KIND_K8S_DOCKER_PULL_PLAIN=0``.
|
||
"""
|
||
v = (os.environ.get("KIND_K8S_DOCKER_PULL_PLAIN") or "1").strip().lower()
|
||
return v not in ("0", "false", "no", "off", "нет")
|
||
|
||
|
||
# Кэш: есть ли в выводе ``docker pull --help`` опция ``--progress`` (в старых CLI её нет).
|
||
_docker_pull_help_has_progress: bool | None = None
|
||
|
||
|
||
def _docker_pull_cli_supports_progress_flag() -> bool:
|
||
"""
|
||
Узнать по справке, объявляет ли данный ``docker`` флаг ``--progress`` для ``pull``.
|
||
|
||
Без этого на старых клиентах в журнал попадало «unknown flag: --progress», а код выхода часто **1**,
|
||
а не 125 — повтор без флага не срабатывал.
|
||
"""
|
||
global _docker_pull_help_has_progress
|
||
if _docker_pull_help_has_progress is not None:
|
||
return _docker_pull_help_has_progress
|
||
exe = shutil.which("docker")
|
||
if not exe:
|
||
_docker_pull_help_has_progress = False
|
||
return False
|
||
try:
|
||
p = subprocess.run(
|
||
[exe, "pull", "--help"],
|
||
capture_output=True,
|
||
text=True,
|
||
timeout=12,
|
||
)
|
||
blob = (p.stdout or "") + (p.stderr or "")
|
||
_docker_pull_help_has_progress = "--progress" in blob
|
||
if not _docker_pull_help_has_progress:
|
||
logger.info("docker pull: в справке нет --progress — используем обычный pull (PTY по KIND_K8S_STREAM_PTY)")
|
||
except (OSError, subprocess.TimeoutExpired) as e:
|
||
logger.warning("Не удалось выполнить docker pull --help (%s), без флага --progress", e)
|
||
_docker_pull_help_has_progress = False
|
||
return _docker_pull_help_has_progress
|
||
|
||
|
||
def _pull_node_image_streaming(*, image: str, on_line: Callable[[str], None], job_id: str | None) -> None:
|
||
"""
|
||
Скачать образ узлов через ``docker pull`` / ``podman pull`` с выводом в журнал задания.
|
||
|
||
Для **docker** по умолчанию: ``--progress plain`` и **без PTY** — идут строки Downloading/Extracting
|
||
с процентами. Для **podman** — обычный pull и PTY по ``KIND_K8S_STREAM_PTY``.
|
||
"""
|
||
cli = _container_cli_bin()
|
||
if not shutil.which(cli):
|
||
raise KindClusterError(
|
||
"Скачивание образа недоступно: не найдена программа для работы с контейнерами (docker/podman).",
|
||
exit_code=127,
|
||
)
|
||
use_docker_plain = (
|
||
cli == "docker"
|
||
and _docker_pull_plain_progress_enabled()
|
||
and _docker_pull_cli_supports_progress_flag()
|
||
)
|
||
if use_docker_plain:
|
||
try:
|
||
_run_checked_stream(
|
||
[cli, "pull", "--progress=plain", image],
|
||
on_line=on_line,
|
||
job_id=job_id,
|
||
use_pty=False,
|
||
)
|
||
return
|
||
except KindClusterError as e:
|
||
if e.exit_code == 130:
|
||
raise
|
||
# На всякий случай, если справка устарела или другой код ошибки.
|
||
logger.warning("Повтор docker pull без --progress=plain (код %s): %s", e.exit_code, e)
|
||
_run_checked_stream([cli, "pull", image], on_line=on_line, job_id=job_id, use_pty=None)
|
||
return
|
||
_run_checked_stream([cli, "pull", image], on_line=on_line, job_id=job_id, use_pty=None)
|
||
|
||
|
||
def _run_capture_checked(cmd: list[str]) -> str:
|
||
p = subprocess.run(cmd, capture_output=True, text=True)
|
||
if p.returncode != 0:
|
||
err = (p.stderr or p.stdout or "").strip()
|
||
raise KindClusterError(err or "команда не удалась", exit_code=p.returncode)
|
||
return (p.stdout or "").strip()
|
||
|
||
|
||
def _wait_nodes_enabled() -> bool:
|
||
raw = (os.environ.get("KIND_K8S_WAIT_NODES") or "1").strip().lower()
|
||
return raw in ("1", "true", "yes", "да")
|
||
|
||
|
||
def _wait_nodes_timeout_sec() -> int:
|
||
raw = (os.environ.get("KIND_K8S_WAIT_NODES_TIMEOUT_SEC") or "300").strip()
|
||
try:
|
||
return max(30, min(int(raw), 3600))
|
||
except ValueError:
|
||
return 300
|
||
|
||
|
||
def wait_nodes_ready(
|
||
*,
|
||
cluster_name: str,
|
||
kubeconfig_path: Path,
|
||
timeout_sec: int | None = None,
|
||
) -> tuple[bool, str]:
|
||
"""
|
||
Дождаться condition=Ready для всех нод через kubectl wait.
|
||
|
||
В контейнере веб-приложения путь к kubeconfig проходит через
|
||
``kubeconfig_path_for_container_kubectl`` (шлюз хоста + проброшенный порт или имя узла kind).
|
||
|
||
Возвращает (успех, сообщение для лога/UI).
|
||
"""
|
||
if timeout_sec is None:
|
||
timeout_sec = _wait_nodes_timeout_sec()
|
||
cfg = kubeconfig_path_for_container_kubectl(
|
||
cluster_name=cluster_name,
|
||
kube_source_path=kubeconfig_path,
|
||
)
|
||
t = f"{timeout_sec}s"
|
||
cmd = [
|
||
"kubectl",
|
||
"--kubeconfig",
|
||
str(cfg),
|
||
"wait",
|
||
"--for=condition=Ready",
|
||
"nodes",
|
||
"--all",
|
||
f"--timeout={t}",
|
||
]
|
||
logger.info("Ожидание готовности нод: timeout=%s", t)
|
||
p = subprocess.run(cmd, capture_output=True, text=True)
|
||
out = (p.stdout or "").strip()
|
||
err = (p.stderr or "").strip()
|
||
if p.returncode == 0:
|
||
return True, out or "ноды в состоянии Ready"
|
||
msg = err or out or f"код выхода {p.returncode}"
|
||
return False, msg
|
||
|
||
|
||
@dataclass(frozen=True)
|
||
class CreateClusterResult:
|
||
"""Результат успешного создания кластера."""
|
||
|
||
cluster_name: str
|
||
ver_tag: str
|
||
node_image: str
|
||
workers: int
|
||
kubeconfig_path: Path
|
||
meta_path: Path
|
||
kubeconfig_patched_for_host: bool
|
||
nodes_ready: bool | None
|
||
nodes_ready_message: str | None
|
||
|
||
|
||
def create_cluster_non_interactive(
|
||
*,
|
||
name: str,
|
||
kubernetes_version_tag: str,
|
||
workers: int,
|
||
job_id: str | None = None,
|
||
use_existing_config: bool = False,
|
||
) -> CreateClusterResult:
|
||
"""
|
||
Создать кластер kind без диалогов.
|
||
|
||
``kubernetes_version_tag`` — тег kindest/node: ``latest``, ``v1.29.4`` и т.д. (см. ``normalize_tag_v_prefix``).
|
||
|
||
``job_id`` — если задан, обновляется прогресс и проверяется отмена (см. ``job_store``).
|
||
|
||
``use_existing_config=True`` — не перезаписывать ``kind-config.yaml``, поднять кластер по уже
|
||
сохранённому файлу (каталог ``clusters/<имя>/`` должен существовать).
|
||
"""
|
||
from core import job_store as _job_store
|
||
|
||
def _progress(stage: str, pct: int) -> None:
|
||
if job_id:
|
||
_job_store.set_progress_sync(job_id, stage, pct)
|
||
|
||
def _log(line: str) -> None:
|
||
if job_id:
|
||
_job_store.append_log_sync(job_id, line)
|
||
|
||
def _cancelled() -> bool:
|
||
return bool(job_id and _job_store.is_cancelled_sync(job_id))
|
||
|
||
if not shutil.which("kind"):
|
||
raise KindClusterError("Не найден бинарник kind в PATH.", exit_code=127)
|
||
|
||
if not validate_cluster_name(name):
|
||
raise KindClusterError("Некорректное имя кластера (a-z0-9-, не длиннее 63).")
|
||
|
||
existing = list_registered_kind_clusters()
|
||
if name in existing:
|
||
raise KindClusterError(f"Кластер «{name}» уже существует в kind.")
|
||
|
||
if not use_existing_config and (workers < 0 or workers > 20):
|
||
raise KindClusterError("Количество worker-нод должно быть от 0 до 20.")
|
||
|
||
# Каталог данных кластера до долгих шагов — чтобы при сбое журнал в journal/jobs_history.json
|
||
# и provision_log могли быть привязаны к существующему clusters/<имя>/.
|
||
root = data_root()
|
||
cdir = clusters_dir()
|
||
out_dir = cdir / name
|
||
out_dir.mkdir(parents=True, exist_ok=True)
|
||
|
||
ver_tag = normalize_tag_v_prefix(kubernetes_version_tag)
|
||
node_image = f"kindest/node:{ver_tag}"
|
||
cfg_path = out_dir / "kind-config.yaml"
|
||
kube_path = out_dir / "kubeconfig"
|
||
meta_path = out_dir / "meta.json"
|
||
|
||
prev_meta_for_workers: dict[str, object] = {}
|
||
if use_existing_config:
|
||
if not cfg_path.is_file():
|
||
raise KindClusterError(f"Нет сохранённого kind-config.yaml: {cfg_path}")
|
||
prev = read_meta_json(name) or {}
|
||
prev_meta_for_workers = prev
|
||
if prev.get("node_image"):
|
||
node_image = str(prev["node_image"])
|
||
if prev.get("kubernetes_version_tag"):
|
||
ver_tag = str(prev["kubernetes_version_tag"])
|
||
_progress("Чтение сохранённого конфига", 10)
|
||
_log("Используется сохранённый файл конфигурации кластера.")
|
||
else:
|
||
yaml_text = build_kind_config_yaml(node_image=node_image, workers=workers)
|
||
cfg_path.write_text(yaml_text, encoding="utf-8")
|
||
_progress("Подготовка конфигурации", 12)
|
||
_log("Файл конфигурации кластера сохранён в каталог данных.")
|
||
|
||
if _cancelled():
|
||
_rollback_after_cancel(cluster_name=name, out_dir=out_dir)
|
||
raise KindClusterError("Операция отменена")
|
||
|
||
logger.info(
|
||
"Создание кластера «%s», образ %s, workers=%s, existing_cfg=%s",
|
||
name,
|
||
node_image,
|
||
workers,
|
||
use_existing_config,
|
||
)
|
||
_progress("Скачивание образа при необходимости", 18)
|
||
if _cancelled():
|
||
_rollback_after_cancel(cluster_name=name, out_dir=out_dir)
|
||
raise KindClusterError("Операция отменена")
|
||
try:
|
||
_pull_node_image_streaming(image=node_image, on_line=_log, job_id=job_id)
|
||
except KindClusterError as e:
|
||
if _cancelled():
|
||
_rollback_after_cancel(cluster_name=name, out_dir=out_dir)
|
||
raise KindClusterError("Операция отменена") from e
|
||
raise
|
||
|
||
if _cancelled():
|
||
_rollback_after_cancel(cluster_name=name, out_dir=out_dir)
|
||
raise KindClusterError("Операция отменена")
|
||
|
||
_progress("Создание узлов кластера", 32)
|
||
_run_checked_stream(
|
||
["kind", "create", "cluster", "--name", name, "--config", str(cfg_path)],
|
||
on_line=_log,
|
||
job_id=job_id,
|
||
)
|
||
|
||
if _cancelled():
|
||
_rollback_after_cancel(cluster_name=name, out_dir=out_dir)
|
||
raise KindClusterError("Операция отменена")
|
||
|
||
_progress("Сохранение доступа к кластеру", 58)
|
||
kube = _run_capture_checked(["kind", "get", "kubeconfig", "--name", name])
|
||
kube_path.write_text(kube, encoding="utf-8")
|
||
|
||
if _cancelled():
|
||
_rollback_after_cancel(cluster_name=name, out_dir=out_dir)
|
||
raise KindClusterError("Операция отменена")
|
||
|
||
host_kube_path = kubeconfig_host_file(kube_path.parent)
|
||
patched = False
|
||
if should_patch_after_create():
|
||
_progress("Настройка доступа с вашего компьютера", 72)
|
||
shutil.copyfile(kube_path, host_kube_path, follow_symlinks=True)
|
||
patched = patch_kubeconfig_server_for_host(cluster_name=name, kube_path=host_kube_path)
|
||
if not patched:
|
||
host_kube_path.unlink(missing_ok=True)
|
||
|
||
if _cancelled():
|
||
_rollback_after_cancel(cluster_name=name, out_dir=out_dir)
|
||
raise KindClusterError("Операция отменена")
|
||
|
||
nodes_ready: bool | None = None
|
||
nodes_msg: str | None = None
|
||
if _wait_nodes_enabled():
|
||
_progress("Ожидание готовности узлов", 82)
|
||
ok, msg = wait_nodes_ready(cluster_name=name, kubeconfig_path=kube_path)
|
||
nodes_ready = ok
|
||
nodes_msg = msg
|
||
if ok:
|
||
logger.info("Ноды готовы: %s", msg)
|
||
else:
|
||
logger.warning("Ожидание нод не завершилось успешно: %s", msg)
|
||
_log(("Узлы готовы" if ok else "Ожидание узлов: ") + (msg or "")[:4000])
|
||
|
||
worker_nodes_meta = workers
|
||
if use_existing_config:
|
||
prev_w = prev_meta_for_workers.get("worker_nodes")
|
||
if prev_w is not None:
|
||
try:
|
||
worker_nodes_meta = int(prev_w)
|
||
except (TypeError, ValueError):
|
||
worker_nodes_meta = workers
|
||
|
||
meta = {
|
||
"cluster_name": name,
|
||
"kubernetes_version_tag": ver_tag,
|
||
"node_image": node_image,
|
||
"worker_nodes": worker_nodes_meta,
|
||
"created_at_utc": datetime.now(timezone.utc).isoformat(),
|
||
"kind_config_path": str(cfg_path.relative_to(root)),
|
||
"kubeconfig_path": str(kube_path.relative_to(root)),
|
||
"kubeconfig_patched_for_host": patched,
|
||
"created_via_container": _in_container(),
|
||
"nodes_ready_after_create": nodes_ready,
|
||
"nodes_ready_message": nodes_msg,
|
||
"provisioned_from_existing_config": use_existing_config,
|
||
}
|
||
if patched:
|
||
meta["kubeconfig_host_path"] = str(host_kube_path.relative_to(root))
|
||
# Сохраняем описание из прежнего meta.json при подъёме по существующему конфигу (в т.ч. reapply).
|
||
if use_existing_config:
|
||
desc = prev_meta_for_workers.get("description")
|
||
if isinstance(desc, str) and desc.strip():
|
||
meta["description"] = desc.strip()[:2000]
|
||
meta_path.write_text(json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8")
|
||
|
||
_progress("Завершение", 95)
|
||
|
||
return CreateClusterResult(
|
||
cluster_name=name,
|
||
ver_tag=ver_tag,
|
||
node_image=node_image,
|
||
workers=worker_nodes_meta,
|
||
kubeconfig_path=kube_path,
|
||
meta_path=meta_path,
|
||
kubeconfig_patched_for_host=patched,
|
||
nodes_ready=nodes_ready,
|
||
nodes_ready_message=nodes_msg,
|
||
)
|
||
|
||
|
||
def delete_kind_cluster_and_data(*, name: str, log_to_stdout: bool = False) -> tuple[bool, str]:
|
||
"""
|
||
``kind delete cluster`` и удаление ``clusters/<имя>/``.
|
||
|
||
Первый элемент — успешность ``kind delete``; второй — текстовое резюме всего шага.
|
||
|
||
``log_to_stdout=True`` — не перехватывать stdout/stderr kind (удобно в интерактивном CLI).
|
||
"""
|
||
if not shutil.which("kind"):
|
||
raise KindClusterError("Не найден kind в PATH.", exit_code=127)
|
||
|
||
cdir = clusters_dir()
|
||
parts: list[str] = []
|
||
kind_ok = True
|
||
|
||
if log_to_stdout:
|
||
p = subprocess.run(["kind", "delete", "cluster", "--name", name])
|
||
if p.returncode != 0:
|
||
parts.append(f"kind delete: код {p.returncode}")
|
||
logger.warning("kind delete cluster %s: код %s", name, p.returncode)
|
||
kind_ok = False
|
||
else:
|
||
parts.append("kind delete: OK")
|
||
else:
|
||
p = subprocess.run(
|
||
["kind", "delete", "cluster", "--name", name],
|
||
capture_output=True,
|
||
text=True,
|
||
)
|
||
if p.returncode != 0:
|
||
err = (p.stderr or p.stdout or "").strip()
|
||
parts.append(f"kind delete: ошибка ({err or p.returncode})")
|
||
logger.warning("kind delete cluster %s: %s", name, err)
|
||
kind_ok = False
|
||
else:
|
||
parts.append("kind delete: OK")
|
||
|
||
d = cdir / name
|
||
if d.is_dir():
|
||
shutil.rmtree(d)
|
||
parts.append(f"удалена папка {d}")
|
||
else:
|
||
parts.append("локальная папка отсутствовала")
|
||
|
||
return kind_ok, "; ".join(parts)
|
||
|
||
|
||
def delete_kind_cluster_keep_data(*, name: str, job_id: str | None = None) -> tuple[bool, str]:
|
||
"""
|
||
Выполнить ``kind delete cluster`` без удаления каталога ``clusters/<имя>/``.
|
||
|
||
Используется после изменения ``kind-config.yaml``: затем ``kind create`` поднимает узлы
|
||
по новому файлу. Если запись в kind уже отсутствует — считаем шаг успешным и идём к create.
|
||
|
||
При ``job_id`` вывод и отмена — как у других длительных операций (см. ``_run_checked_stream``).
|
||
|
||
Автор: Сергей Антропов
|
||
Сайт: https://devops.org.ru
|
||
"""
|
||
from core import job_store as _js
|
||
|
||
def _log(line: str) -> None:
|
||
if job_id:
|
||
_js.append_log_sync(job_id, line)
|
||
|
||
if not shutil.which("kind"):
|
||
raise KindClusterError("Не найден kind в PATH.", exit_code=127)
|
||
|
||
if job_id and _js.is_cancelled_sync(job_id):
|
||
raise KindClusterError("Операция отменена", exit_code=130)
|
||
|
||
_log("Удаление записи кластера в kind (каталог clusters/<имя>/ на диске не удаляется).")
|
||
logger.info("kind delete с сохранением каталога данных для «%s»", name)
|
||
|
||
try:
|
||
_run_checked_stream(
|
||
["kind", "delete", "cluster", "--name", name],
|
||
on_line=_log if job_id else None,
|
||
job_id=job_id,
|
||
)
|
||
return True, "kind delete: OK"
|
||
except KindClusterError as e:
|
||
if job_id and _js.is_cancelled_sync(job_id):
|
||
raise KindClusterError("Операция отменена", exit_code=130) from e
|
||
# kind иногда возвращает ошибку, если кластер уже снят с учёта — проверяем фактическое состояние
|
||
remaining = list_registered_kind_clusters()
|
||
if name not in remaining:
|
||
_log("Запись кластера в kind отсутствует — далее создание по сохранённому kind-config.yaml.")
|
||
logger.info(
|
||
"После сбоя kind delete кластер «%s» не в списке зарегистрированных — продолжаем",
|
||
name,
|
||
)
|
||
return True, "Запись в kind уже отсутствовала"
|
||
raise
|
||
|
||
|
||
def _sort_kind_node_containers(names: list[str]) -> list[str]:
|
||
"""Сначала control-plane, затем остальные — удобнее для ``docker start``."""
|
||
|
||
def sort_key(n: str) -> tuple[int, str]:
|
||
if n.endswith("-control-plane"):
|
||
return (0, n)
|
||
return (1, n)
|
||
|
||
return sorted(names, key=sort_key)
|
||
|
||
|
||
def list_kind_cluster_container_names(*, cluster_name: str) -> list[str]:
|
||
"""Имена контейнеров узлов kind (все с префиксом ``<имя>-``)."""
|
||
cli = _container_cli_bin()
|
||
if not shutil.which(cli):
|
||
raise KindClusterError("Программа для контейнеров не найдена в PATH.", exit_code=127)
|
||
p = subprocess.run(
|
||
[cli, "ps", "-a", "--format", "{{.Names}}"],
|
||
capture_output=True,
|
||
text=True,
|
||
)
|
||
if p.returncode != 0:
|
||
err = (p.stderr or p.stdout or "").strip()
|
||
raise KindClusterError(f"Не удалось получить список контейнеров: {err}", exit_code=p.returncode)
|
||
prefix = f"{cluster_name}-"
|
||
raw = [n.strip() for n in (p.stdout or "").splitlines() if n.strip()]
|
||
matched = [n for n in raw if n.startswith(prefix)]
|
||
return _sort_kind_node_containers(matched)
|
||
|
||
|
||
def stop_kind_cluster_containers(*, name: str, job_id: str | None = None) -> tuple[bool, str]:
|
||
"""
|
||
Остановить контейнеры узлов (CLI контейнеров из ``CONTAINER_CLI``).
|
||
|
||
Запись kind о кластере сохраняется; позже можно вызвать ``start_kind_cluster_containers``.
|
||
|
||
При ``job_id`` — потоковый вывод в журнал задания и возможность прервать текущую команду отменой.
|
||
"""
|
||
from core import job_store as _js
|
||
|
||
names = list_kind_cluster_container_names(cluster_name=name)
|
||
if not names:
|
||
return True, "Узлы уже остановлены или отсутствуют"
|
||
|
||
def _cancelled() -> bool:
|
||
return bool(job_id and _js.is_cancelled_sync(job_id))
|
||
|
||
cli = _container_cli_bin()
|
||
n = len(names)
|
||
ok_all = True
|
||
failed_idx: list[int] = []
|
||
|
||
for i, ctr in enumerate(names, start=1):
|
||
if _cancelled():
|
||
raise KindClusterError("Операция отменена", exit_code=130)
|
||
if job_id:
|
||
pct = 5 + int(88 * (i - 1) / max(n, 1))
|
||
_js.set_progress_sync(job_id, f"Остановка узла {i} из {n}", min(pct, 93))
|
||
_js.append_log_sync(job_id, f"Узел {i} из {n}")
|
||
try:
|
||
|
||
def _on_line(line: str) -> None:
|
||
_js.append_log_sync(job_id, line)
|
||
|
||
_run_checked_stream([cli, "stop", ctr], on_line=_on_line, job_id=job_id)
|
||
except KindClusterError as e:
|
||
if _cancelled():
|
||
raise KindClusterError("Операция отменена", exit_code=130) from e
|
||
ok_all = False
|
||
failed_idx.append(i)
|
||
logger.warning("%s stop узел %s: %s", cli, i, e)
|
||
else:
|
||
p = subprocess.run([cli, "stop", ctr], capture_output=True, text=True)
|
||
if p.returncode != 0:
|
||
ok_all = False
|
||
err = (p.stderr or p.stdout or "").strip() or str(p.returncode)
|
||
failed_idx.append(i)
|
||
logger.warning("%s stop %s: %s", cli, ctr, err)
|
||
|
||
if job_id:
|
||
_js.set_progress_sync(job_id, "Готово", 98)
|
||
|
||
if ok_all:
|
||
return True, f"Остановлено узлов: {n}"
|
||
return False, "Не удалось остановить узлы: " + (
|
||
", ".join(str(x) for x in failed_idx) if failed_idx else "см. журнал"
|
||
)
|
||
|
||
|
||
def start_kind_cluster_containers(*, name: str, job_id: str | None = None) -> tuple[bool, str]:
|
||
"""
|
||
Запустить контейнеры узлов (после остановки или перезапуска движка контейнеров).
|
||
|
||
При ``job_id`` — журнал и прогресс в задании, прерывание текущей команды через отмену задания.
|
||
"""
|
||
from core import job_store as _js
|
||
|
||
names = list_kind_cluster_container_names(cluster_name=name)
|
||
if not names:
|
||
return False, (
|
||
"Сохранённые узлы не найдены. Если кластер ещё не создавался в этой среде — создайте его "
|
||
"или поднимите по сохранённой конфигурации из панели."
|
||
)
|
||
|
||
def _cancelled() -> bool:
|
||
return bool(job_id and _js.is_cancelled_sync(job_id))
|
||
|
||
cli = _container_cli_bin()
|
||
n = len(names)
|
||
ok_all = True
|
||
failed_idx: list[int] = []
|
||
|
||
for i, ctr in enumerate(names, start=1):
|
||
if _cancelled():
|
||
raise KindClusterError("Операция отменена", exit_code=130)
|
||
if job_id:
|
||
pct = 5 + int(88 * (i - 1) / max(n, 1))
|
||
_js.set_progress_sync(job_id, f"Запуск узла {i} из {n}", min(pct, 93))
|
||
_js.append_log_sync(job_id, f"Узел {i} из {n}")
|
||
try:
|
||
|
||
def _on_line(line: str) -> None:
|
||
_js.append_log_sync(job_id, line)
|
||
|
||
_run_checked_stream([cli, "start", ctr], on_line=_on_line, job_id=job_id)
|
||
except KindClusterError as e:
|
||
if _cancelled():
|
||
raise KindClusterError("Операция отменена", exit_code=130) from e
|
||
ok_all = False
|
||
failed_idx.append(i)
|
||
logger.warning("%s start узел %s: %s", cli, i, e)
|
||
else:
|
||
p = subprocess.run([cli, "start", ctr], capture_output=True, text=True)
|
||
if p.returncode != 0:
|
||
ok_all = False
|
||
err = (p.stderr or p.stdout or "").strip() or str(p.returncode)
|
||
failed_idx.append(i)
|
||
logger.warning("%s start %s: %s", cli, ctr, err)
|
||
|
||
if job_id:
|
||
_js.set_progress_sync(job_id, "Готово", 98)
|
||
|
||
if ok_all:
|
||
return True, f"Запущено узлов: {n}"
|
||
return False, "Не удалось запустить узлы: " + (
|
||
", ".join(str(x) for x in failed_idx) if failed_idx else "см. журнал"
|
||
)
|
||
|
||
|
||
def read_meta_json(cluster_name: str) -> dict[str, object] | None:
|
||
"""Прочитать ``clusters/<имя>/meta.json`` если есть."""
|
||
p = clusters_dir() / cluster_name / "meta.json"
|
||
if not p.is_file():
|
||
return None
|
||
try:
|
||
raw = json.loads(p.read_text(encoding="utf-8"))
|
||
if isinstance(raw, dict):
|
||
return raw
|
||
except (OSError, json.JSONDecodeError) as e:
|
||
logger.debug("meta.json не прочитан: %s", e)
|
||
return None
|
||
|
||
|
||
def _container_cli_bin() -> str:
|
||
"""Имя CLI к сокету: см. :func:`kind_k8s_paths.container_cli_name`."""
|
||
return container_cli_name()
|
||
|
||
|
||
def configured_container_cli() -> str:
|
||
"""Имя CLI, которое реально вызывает процесс (в контейнере веб-UI — всегда ``docker``).
|
||
|
||
Единый источник для ``docker stats``/``podman stats``, ``kind``, health и поля ``container_cli`` в API.
|
||
"""
|
||
return _container_cli_bin()
|
||
|
||
|
||
def container_engine_ping(*, timeout_sec: float = 12.0) -> tuple[bool, str, str]:
|
||
"""
|
||
Проверить доступ к движку контейнеров (``docker info`` / ``podman info``).
|
||
|
||
Возвращает (успех, краткое сообщение или stderr, имя CLI).
|
||
"""
|
||
cli = _container_cli_bin()
|
||
if not shutil.which(cli):
|
||
return False, f"«{cli}» не найден в PATH", cli
|
||
try:
|
||
p = subprocess.run(
|
||
[cli, "info"],
|
||
capture_output=True,
|
||
text=True,
|
||
timeout=timeout_sec,
|
||
)
|
||
except subprocess.TimeoutExpired:
|
||
logger.warning("%s info: таймаут %s с", cli, timeout_sec)
|
||
return False, f"таймаут {timeout_sec} с", cli
|
||
if p.returncode == 0:
|
||
return True, "OK", cli
|
||
err = (p.stderr or p.stdout or "").strip() or f"код {p.returncode}"
|
||
logger.info("%s info неуспешно: %s", cli, err[:200])
|
||
return False, err[:800], cli
|
||
|
||
|
||
def kubectl_nodes_wide(*, cluster_name: str, kubeconfig: str | Path) -> tuple[int, str]:
|
||
"""``kubectl get nodes -o wide``; возвращает (код, объединённый вывод)."""
|
||
src = Path(kubeconfig)
|
||
cfg = kubeconfig_path_for_container_kubectl(cluster_name=cluster_name, kube_source_path=src)
|
||
p = subprocess.run(
|
||
[
|
||
"kubectl",
|
||
"--kubeconfig",
|
||
str(cfg),
|
||
"get",
|
||
"nodes",
|
||
"-o",
|
||
"wide",
|
||
"--request-timeout=15s",
|
||
],
|
||
capture_output=True,
|
||
text=True,
|
||
)
|
||
out = (p.stdout or "").strip()
|
||
err = (p.stderr or "").strip()
|
||
msg = out if out else err
|
||
return p.returncode, msg
|
||
|
||
|
||
def kubectl_pods_all_namespaces(*, cluster_name: str, kubeconfig: str | Path) -> tuple[int, str]:
|
||
"""``kubectl get pods -A``; сводка подов по кластеру."""
|
||
src = Path(kubeconfig)
|
||
cfg = kubeconfig_path_for_container_kubectl(cluster_name=cluster_name, kube_source_path=src)
|
||
p = subprocess.run(
|
||
[
|
||
"kubectl",
|
||
"--kubeconfig",
|
||
str(cfg),
|
||
"get",
|
||
"pods",
|
||
"-A",
|
||
"--request-timeout=20s",
|
||
],
|
||
capture_output=True,
|
||
text=True,
|
||
)
|
||
out = (p.stdout or "").strip()
|
||
err = (p.stderr or "").strip()
|
||
msg = out if out else err
|
||
return p.returncode, msg
|
||
|
||
|
||
def kubectl_get_all_namespaces_wide(
|
||
*,
|
||
cluster_name: str,
|
||
kubeconfig: str | Path,
|
||
resource: str,
|
||
timeout: str = "30s",
|
||
) -> tuple[int, str]:
|
||
"""
|
||
``kubectl get <resource> -A -o wide`` — Deployments, StatefulSets, DaemonSets, Service, Ingress и т.д.
|
||
|
||
``resource`` — аргумент kubectl (``deployments``, ``pods``, ``svc`` …).
|
||
"""
|
||
src = Path(kubeconfig)
|
||
cfg = kubeconfig_path_for_container_kubectl(cluster_name=cluster_name, kube_source_path=src)
|
||
p = subprocess.run(
|
||
[
|
||
"kubectl",
|
||
"--kubeconfig",
|
||
str(cfg),
|
||
"get",
|
||
resource,
|
||
"-A",
|
||
"-o",
|
||
"wide",
|
||
f"--request-timeout={timeout}",
|
||
],
|
||
capture_output=True,
|
||
text=True,
|
||
)
|
||
out = (p.stdout or "").strip()
|
||
err = (p.stderr or "").strip()
|
||
msg = out if out else err
|
||
return p.returncode, msg
|
||
|
||
|
||
def _parse_kubectl_list_json(stdout: str) -> tuple[list[dict[str, Any]] | None, str]:
|
||
"""Разбор вывода ``kubectl get … -o json`` (Kind List)."""
|
||
if not (stdout or "").strip():
|
||
return [], ""
|
||
try:
|
||
data = json.loads(stdout)
|
||
except json.JSONDecodeError as e:
|
||
return None, f"JSON: {e}"
|
||
if not isinstance(data, dict):
|
||
return None, "корень ответа не объект"
|
||
raw_items = data.get("items")
|
||
if raw_items is None:
|
||
return [], ""
|
||
if not isinstance(raw_items, list):
|
||
return None, "поле items не список"
|
||
out: list[dict[str, Any]] = [x for x in raw_items if isinstance(x, dict)]
|
||
return out, ""
|
||
|
||
|
||
def kubectl_get_json(
|
||
*,
|
||
cluster_name: str,
|
||
kubeconfig: str | Path,
|
||
resource_args: list[str],
|
||
timeout: str = "30s",
|
||
) -> tuple[int, list[dict[str, Any]] | None, str]:
|
||
"""
|
||
``kubectl get <resource_args…> -o json``.
|
||
|
||
``resource_args`` — например ``["nodes"]`` или ``["pods", "-A"]``, ``["deployments", "-A"]``.
|
||
Возвращает (код, список items или None, сообщение об ошибке).
|
||
"""
|
||
src = Path(kubeconfig)
|
||
cfg = kubeconfig_path_for_container_kubectl(cluster_name=cluster_name, kube_source_path=src)
|
||
cmd = [
|
||
"kubectl",
|
||
"--kubeconfig",
|
||
str(cfg),
|
||
"get",
|
||
*resource_args,
|
||
"-o",
|
||
"json",
|
||
f"--request-timeout={timeout}",
|
||
]
|
||
p = subprocess.run(cmd, capture_output=True, text=True)
|
||
out = (p.stdout or "").strip()
|
||
err = (p.stderr or "").strip()
|
||
if p.returncode != 0:
|
||
return p.returncode, None, err or out or f"код {p.returncode}"
|
||
items, perr = _parse_kubectl_list_json(out)
|
||
if items is None:
|
||
return p.returncode, None, perr
|
||
return 0, items, ""
|
||
|
||
|
||
def kubectl_delete_pod(
|
||
*,
|
||
cluster_name: str,
|
||
kubeconfig: str | Path,
|
||
namespace: str,
|
||
pod_name: str,
|
||
grace_period: int = 30,
|
||
) -> tuple[int, str]:
|
||
"""
|
||
Удаление пода (контроллер при необходимости создаст новый) — «мягкий рестарт».
|
||
|
||
``grace_period`` — секунды до SIGKILL (0 — сразу; для системных подов осторожно).
|
||
"""
|
||
src = Path(kubeconfig)
|
||
cfg = kubeconfig_path_for_container_kubectl(cluster_name=cluster_name, kube_source_path=src)
|
||
cmd = [
|
||
"kubectl",
|
||
"--kubeconfig",
|
||
str(cfg),
|
||
"delete",
|
||
"pod",
|
||
pod_name,
|
||
"-n",
|
||
namespace,
|
||
f"--grace-period={grace_period}",
|
||
"--request-timeout=60s",
|
||
]
|
||
p = subprocess.run(cmd, capture_output=True, text=True)
|
||
msg = ((p.stdout or "").strip() or (p.stderr or "").strip() or f"код {p.returncode}")[:8000]
|
||
return p.returncode, msg
|
||
|
||
|
||
def cluster_summary_for_api(name: str) -> dict[str, object]:
|
||
"""Сводка по кластеру для JSON API (без блокирующих долгих вызовов)."""
|
||
from core.provision_log import provision_log_file_path
|
||
|
||
meta = read_meta_json(name) or {}
|
||
saved_kc = clusters_dir() / name / "kubeconfig"
|
||
in_kind = name in list_registered_kind_clusters()
|
||
out: dict[str, object] = {
|
||
"name": name,
|
||
"registered_in_kind": in_kind,
|
||
"has_local_kubeconfig": saved_kc.is_file(),
|
||
"kubeconfig_path": str(saved_kc) if saved_kc.is_file() else None,
|
||
"meta": meta,
|
||
"has_provision_log": provision_log_file_path(name).is_file(),
|
||
}
|
||
return out
|