#!/usr/bin/env python3 """Интерактивное или пакетное создание локального кластера Kubernetes через kind. Сохраняет kind-config.yaml, kubeconfig и meta.json в подпапку clusters/<имя>/. Автор: Сергей Антропов Сайт: https://devops.org.ru Требования: kind, клиент контейнеров (``docker`` к сокету Docker/Podman) и kubectl в PATH. Рекомендуется: веб-интерфейс (``make docker up``) — всё внутри Docker, на хосте только Docker и make. Пакетный режим: ``--non-interactive --name X --kubernetes-version 1.29.4 [--workers N]``. """ from __future__ import annotations import argparse import json import logging import os import shutil import subprocess import sys from core.cluster_lifecycle import ( CreateClusterResult, KindClusterError, create_cluster_non_interactive, list_registered_kind_clusters, normalize_k8s_version, validate_cluster_name, ) from kindest_node_tags import fetch_kindest_node_tags, normalize_tag_v_prefix from kind_k8s_paths import clusters_dir, container_cli_name # Имя кластера: поддомен DNS (RFC 1123) — дублируем только для CLI-подсказок; # основная проверка в core.cluster_lifecycle. def _which(cmd: str) -> str | None: return shutil.which(cmd) def _container_cli_bin() -> str: return container_cli_name() def _in_container() -> bool: return os.environ.get("KIND_K8S_IN_CONTAINER", "").strip() == "1" def _ask(prompt: str, default: str | None = None) -> str: if default is not None: line = input(f"{prompt} [{default}]: ").strip() return line if line else default line = input(f"{prompt}: ").strip() return line def _ask_int(prompt: str, default: int, *, min_v: int, max_v: int) -> int: while True: raw = _ask(prompt, str(default)) try: n = int(raw, 10) except ValueError: print("Введите целое число.") continue if n < min_v or n > max_v: print(f"Допустимый диапазон: {min_v}…{max_v}.") continue return n def _configure_logging() -> None: """Базовая настройка логов для вспомогательных модулей (kindest_node_tags и т.д.).""" if logging.root.handlers: return level = logging.DEBUG if os.environ.get("KIND_K8S_DEBUG", "").strip() in ("1", "true", "yes") else logging.INFO logging.basicConfig(level=level, format="%(levelname)s %(name)s: %(message)s") def _display_limit_for_version_list() -> int: raw = (os.environ.get("KIND_K8S_VERSION_LIST_DISPLAY") or "50").strip() try: return max(5, min(int(raw), 500)) except ValueError: return 50 def _interactive_k8s_version_tag() -> str: """ Спросить версию Kubernetes: загрузить теги kindest/node с Docker Hub (1.19+), показать список, выбор по номеру или ввод тега вручную. """ skip = os.environ.get("KIND_K8S_SKIP_VERSION_LIST", "").strip().lower() in ("1", "true", "yes", "да") tags: list[str] = [] if not skip: print( "Загрузка списка стабильных тегов kindest/node с Docker Hub (Kubernetes 1.19+, нужна сеть)…", flush=True, ) tags = fetch_kindest_node_tags() if not tags: print( "Предупреждение: список тегов недоступен (сеть, лимит Docker Hub или пустой ответ). " "Введите версию вручную.", flush=True, ) else: print("Загрузка списка пропущена (переменная KIND_K8S_SKIP_VERSION_LIST).", flush=True) if not tags: raw = _ask("Версия Kubernetes / тег образа kindest/node (например 1.29.4)", "1.29.4") return normalize_k8s_version(raw) display_n = _display_limit_for_version_list() print( f"\nДоступные теги (всего {len(tags)}): сначала latest, затем стабильные семверы от новых к старым:", flush=True, ) for i, t in enumerate(tags[:display_n], start=1): print(f" {i:3}) {t}", flush=True) if len(tags) > display_n: print( f" … показаны первые {display_n} из {len(tags)}; можно ввести номер от 1 до {len(tags)} " "или тег вручную (например 1.25.11).", flush=True, ) default_choice = "1" while True: raw = _ask(f"Номер строки (1–{len(tags)}) или версия вручную", default_choice) choice = raw if raw else default_choice if choice.isdigit(): idx = int(choice, 10) if 1 <= idx <= len(tags): picked = tags[idx - 1] print(f"Выбран образ kindest/node:{picked}", flush=True) return picked print(f"Введите число от 1 до {len(tags)} или тег версии (например 1.28.0).", flush=True) continue ver = normalize_k8s_version(choice) canon = normalize_tag_v_prefix(ver) if canon not in tags: print( f"Примечание: «{ver}» нет среди загруженных тегов; при отсутствии образа в реестре kind сообщит об ошибке.", flush=True, ) return ver def _run_interactive() -> None: print("=== Создание кластера kind ===\n") if not _which("kind"): print("Не найден бинарник kind.", file=sys.stderr) print(" Установка kind на хост: https://kind.sigs.k8s.io/docs/user/quick-start/#installation", file=sys.stderr) print(" Через Docker: из корня репозитория выполните «make docker up» и откройте веб-интерфейс.", file=sys.stderr) sys.exit(127) cli = _container_cli_bin() if not _which(cli): print(f"Не найден «{cli}» (CLI к API контейнеров).", file=sys.stderr) sys.exit(127) existing = list_registered_kind_clusters() default_name = "dev" if default_name in existing: default_name = "dev2" while True: name = _ask("Имя кластера (DNS-имя, a-z0-9-)", default_name) if not validate_cluster_name(name): print("Некорректное имя: только строчные буквы, цифры, дефис; не длиннее 63 символов.") continue if name in existing: print(f"Кластер «{name}» уже существует в kind. Удалите его в веб-интерфейсе или другое имя.") continue break ver_tag = _interactive_k8s_version_tag() workers = _ask_int( "Количество worker-нод (0 = только control-plane, он же может принимать поды)", 2, min_v=0, max_v=20, ) try: result = create_cluster_non_interactive(name=name, kubernetes_version_tag=ver_tag, workers=workers) except KindClusterError as e: print(str(e), file=sys.stderr) raise SystemExit(getattr(e, "exit_code", 1)) from e print("\nГотово.") print(f" kubeconfig (в среде запуска): {result.kubeconfig_path}") if _in_container(): print(f" Том на хосте: clusters/{result.cluster_name}/ в корне репозитория (рядом с Makefile)") if result.kubeconfig_patched_for_host: print( f" Kubectl **с хоста**: clusters/{result.cluster_name}/kubeconfig.host " f"(или скачивание из веб-UI) — server=:<порт>.", ) print( " Файл kubeconfig (без .host) — как выдал kind; статистика в UI и kubectl внутри приложения " "используют apiserver через шлюз хоста (host.docker.internal:<порт> из docker port; см. compose extra_hosts).", ) else: print( f" Проверка в этом контейнере: kubectl --kubeconfig=/work/clusters/{result.cluster_name}/kubeconfig get nodes", ) else: print(" Проверка без kubectl на хосте: веб-интерфейс (кластер → узлы/поды) или из корня репозитория:") print(f" make docker kubectl CLUSTER={result.cluster_name} # или: make podman kubectl …") print(" (сервис kind-k8s-web должен быть запущен: make docker up).") kc_for_shell = ( result.kubeconfig_path.parent / "kubeconfig.host" if result.kubeconfig_patched_for_host else result.kubeconfig_path ) print(f" Локально при установленном kubectl: kubectl --kubeconfig={kc_for_shell} get nodes") if result.kubeconfig_patched_for_host: print(" (файл kubeconfig.host — apiserver: localhost или KIND_K8S_KUBECONFIG_CLIENT_HOST и порт Docker.)") if result.nodes_ready is False and result.nodes_ready_message: print(f" Предупреждение (ожидание нод): {result.nodes_ready_message}", file=sys.stderr) def _parse_args() -> argparse.Namespace: p = argparse.ArgumentParser(description="Создание кластера kind (интерактивно или --non-interactive).") p.add_argument( "--non-interactive", action="store_true", help="Без диалогов; обязательны --name и --kubernetes-version", ) p.add_argument("--name", help="Имя кластера (DNS, a-z0-9-)") p.add_argument( "--kubernetes-version", dest="kubernetes_version", help="Версия / тег kindest/node, например 1.29.4 или v1.29.4", ) p.add_argument("--workers", type=int, default=2, help="Число worker-нод (0–20), по умолчанию 2") return p.parse_args() def main() -> None: _configure_logging() args = _parse_args() if args.non_interactive: if not args.name or not args.kubernetes_version: print("В режиме --non-interactive нужны --name и --kubernetes-version.", file=sys.stderr) raise SystemExit(2) try: result = create_cluster_non_interactive( name=args.name.strip(), kubernetes_version_tag=args.kubernetes_version.strip(), workers=args.workers, ) except KindClusterError as e: print(str(e), file=sys.stderr) raise SystemExit(getattr(e, "exit_code", 1)) from e print(_json_result_summary(result)) return _run_interactive() def _json_result_summary(result: CreateClusterResult) -> str: """JSON для stdout в пакетном режиме ``--non-interactive``.""" payload = { "cluster_name": result.cluster_name, "kubernetes_version_tag": result.ver_tag, "node_image": result.node_image, "workers": result.workers, "kubeconfig_path": str(result.kubeconfig_path), "kubeconfig_patched_for_host": result.kubeconfig_patched_for_host, "nodes_ready": result.nodes_ready, "nodes_ready_message": result.nodes_ready_message, } return json.dumps(payload, ensure_ascii=False, indent=2) if __name__ == "__main__": main()