Files
KindClustersDashboard/app/core/container_resource_stats.py
Sergey Antropoff eb063aec20 Веб-интерфейс: страница /clusters, навигация и крошки для кластеров
- Выделена страница списка кластеров, панель упрощена; nav_active и крошки
  ведут в раздел Кластеры; theme.js синхронизирует активную пилюлю по URL.
- Доработки дашборда, аддонов, журнала, стилей и API-документации.
- Поддержка Podman: docker-compose.podman.yml, скрипты сокета; Makefile и env.
2026-04-04 13:42:21 +03:00

364 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Метрики CPU, памяти и I/O узлов kind через ``docker stats`` / ``podman stats``.
Имена контейнеров совпадают с префиксом кластера (см. ``list_kind_cluster_container_names``).
Автор: Сергей Антропов
Сайт: https://devops.org.ru
"""
from __future__ import annotations
import logging
import re
import shutil
import subprocess
from core.cluster_lifecycle import _sort_kind_node_containers, list_registered_kind_clusters
from kind_k8s_paths import container_cli_name
from models.schemas import AggregateResourcesSummary, KindClusterResources, KindNodeResourceStat
logger = logging.getLogger("kind_k8s.container_resource_stats")
# Одна строка на контейнер (совместимо с Docker и Podman).
_STAT_FORMAT = "{{.Name}}\t{{.CPUPerc}}\t{{.MemUsage}}\t{{.MemPerc}}\t{{.NetIO}}\t{{.BlockIO}}\t{{.PIDs}}"
def _container_cli_bin() -> str:
return container_cli_name()
def _list_running_container_names(cli: str) -> set[str]:
"""Имена **работающих** контейнеров (``ps`` без ``-a``)."""
if not shutil.which(cli):
return set()
p = subprocess.run(
[cli, "ps", "--format", "{{.Names}}"],
capture_output=True,
text=True,
timeout=30,
)
if p.returncode != 0:
logger.debug("%s ps: код %s", cli, p.returncode)
return set()
return {n.strip() for n in (p.stdout or "").splitlines() if n.strip()}
def running_kind_clusters_mask(cluster_names: list[str]) -> dict[str, bool]:
"""
Один вызов ``<cli> ps``: для каждого имени кластера — есть ли запущенный контейнер-узел
с префиксом ``{имя}-`` (как в ``collect_kind_clusters_resource_stats``).
"""
out: dict[str, bool] = {n: False for n in cluster_names}
if not cluster_names:
return out
cli = _container_cli_bin()
if not shutil.which(cli):
logger.debug("running_kind_clusters_mask: CLI «%s» не в PATH", cli)
return out
running = _list_running_container_names(cli)
for name in cluster_names:
pfx = f"{name}-"
out[name] = any(c.startswith(pfx) for c in running)
return out
def _all_container_names_ps_a(cli: str) -> tuple[list[str], str | None]:
"""Все имена контейнеров (``ps -a``). При ошибке — ([], сообщение)."""
p = subprocess.run(
[cli, "ps", "-a", "--format", "{{.Names}}"],
capture_output=True,
text=True,
timeout=30,
)
if p.returncode != 0:
err = (p.stderr or p.stdout or "").strip() or str(p.returncode)
return [], f"{cli} ps -a: {err}"
lines = [n.strip() for n in (p.stdout or "").splitlines() if n.strip()]
return lines, None
def _parse_pids(raw: str) -> int | None:
s = raw.strip()
if not s:
return None
try:
return int(re.sub(r"\D", "", s) or "0")
except ValueError:
return None
# Множители для суффиксов Docker/Podman (kB/MB — десятичные, KiB/MiB — двоичные).
_SUFFIX_TO_BYTES: dict[str, float] = {
"b": 1.0,
"kb": 1e3,
"kib": 1024.0,
"mb": 1e6,
"mib": 1024.0**2,
"gb": 1e9,
"gib": 1024.0**3,
"tb": 1e12,
"tib": 1024.0**4,
"pb": 1e15,
"pib": 1024.0**5,
}
def _parse_docker_size_bytes(chunk: str) -> float | None:
"""Разбор одного значения вроде ``512MiB``, ``7.7GiB``, ``800kB`` в байты."""
t = chunk.strip().lower().replace(" ", "")
if not t:
return None
m = re.match(r"^([\d.]+)([a-z]+)$", t)
if not m:
return None
val = float(m.group(1))
suf = m.group(2)
mult = _SUFFIX_TO_BYTES.get(suf)
if mult is None:
return None
return val * mult
def _parse_percent_value(raw: str | None) -> float | None:
"""Число из строки вида ``12.34%``."""
if not raw:
return None
m = re.match(r"^\s*([\d.]+)\s*%", raw.strip())
if not m:
return None
return float(m.group(1))
def _memory_used_ratio_percent(memory_usage: str | None) -> float | None:
"""Процент занятой памяти: левая часть MemUsage / правая (лимит контейнера)."""
if not memory_usage or "/" not in memory_usage:
return None
left, right = memory_usage.split("/", 1)
used = _parse_docker_size_bytes(left.strip())
limit = _parse_docker_size_bytes(right.strip())
if used is None or limit is None or limit <= 0:
return None
return min(100.0, 100.0 * used / limit)
def _parse_io_pair_sum_bytes(raw: str | None) -> float | None:
"""Сумма байт по обеим частям NetIO/BlockIO до и после ``/`` (чтение + запись)."""
if not raw or "/" not in raw:
return None
a, b = raw.split("/", 1)
ba = _parse_docker_size_bytes(a.strip())
bb = _parse_docker_size_bytes(b.strip())
if ba is None and bb is None:
return None
return (ba or 0.0) + (bb or 0.0)
def _fmt_bytes_ru(n: float) -> str:
"""Краткая подпись объёма для UI."""
if n <= 0:
return "0 B"
if n < 1024:
return f"{n:.0f} B"
for thresh, suffix in (
(1024.0**4, "TiB"),
(1024.0**3, "GiB"),
(1024.0**2, "MiB"),
(1024.0, "KiB"),
):
if n >= thresh:
v = n / thresh
s = f"{v:.2f}".rstrip("0").rstrip(".")
return f"{s} {suffix}"
return f"{n:.0f} B"
def _io_ring_percent(total_bytes: float, ref_bytes: float) -> float:
"""Нормализация суммарного I/O в 0100 для кольца (ref ≈ «полное кольцо»)."""
if total_bytes <= 0 or ref_bytes <= 0:
return 0.0
return min(100.0, 100.0 * total_bytes / ref_bytes)
def aggregate_kind_cluster_resources(blocks: list[KindClusterResources]) -> AggregateResourcesSummary:
"""
Средние и суммы по всем узлам из ``cluster_resources`` для донатов на главной.
Кольца сети/диска — условная шкала относительно порога (8 GiB суммарного I/O ≈ 100%).
"""
nodes: list[KindNodeResourceStat] = []
for b in blocks:
nodes.extend(b.nodes)
n = len(nodes)
if n == 0:
logger.debug("aggregate_kind_cluster_resources: узлов нет")
return AggregateResourcesSummary()
cpus = [_parse_percent_value(x.cpu_percent) for x in nodes]
mem_pcts = [_parse_percent_value(x.memory_percent) for x in nodes]
mem_ratios = [_memory_used_ratio_percent(x.memory_usage) for x in nodes]
nets = [_parse_io_pair_sum_bytes(x.net_io) for x in nodes]
blks = [_parse_io_pair_sum_bytes(x.block_io) for x in nodes]
def avg(vals: list[float | None]) -> float | None:
xs = [v for v in vals if v is not None]
if not xs:
return None
return sum(xs) / len(xs)
av_cpu = avg(cpus)
av_mem_pct = avg(mem_pcts)
av_mem_ratio = avg(mem_ratios)
sum_net = sum(v for v in nets if v is not None)
sum_blk = sum(v for v in blks if v is not None)
ref_io = 8.0 * 1024.0**3
cpu_ring = min(100.0, av_cpu) if av_cpu is not None else 0.0
mem_ring = min(100.0, av_mem_pct) if av_mem_pct is not None else 0.0
ratio_ring = min(100.0, av_mem_ratio) if av_mem_ratio is not None else 0.0
net_ring = _io_ring_percent(sum_net, ref_io)
disk_ring = _io_ring_percent(sum_blk, ref_io)
cpu_l = f"ср. {av_cpu:.1f}%" if av_cpu is not None else ""
mem_l = f"ср. {av_mem_pct:.1f}%" if av_mem_pct is not None else ""
ratio_l = f"ср. {av_mem_ratio:.1f}%" if av_mem_ratio is not None else ""
net_l = f"всего {_fmt_bytes_ru(sum_net)}" if sum_net > 0 else ""
disk_l = f"всего {_fmt_bytes_ru(sum_blk)}" if sum_blk > 0 else ""
logger.debug(
"aggregate_kind_cluster_resources: узлов=%s cpu_ring=%.1f net_ring=%.1f",
n,
cpu_ring,
net_ring,
)
return AggregateResourcesSummary(
nodes_count=n,
cpu_ring=cpu_ring,
cpu_label=cpu_l,
memory_percent_ring=mem_ring,
memory_percent_label=mem_l,
memory_used_ratio_ring=ratio_ring,
memory_used_ratio_label=ratio_l,
network_ring=net_ring,
network_label=net_l,
disk_ring=disk_ring,
disk_label=disk_l,
)
def _stats_for_container_names(cli: str, names: list[str]) -> list[KindNodeResourceStat]:
"""Вызов ``<cli> stats --no-stream`` для списка имён (только запущенные)."""
if not names:
return []
cmd = [cli, "stats", "--no-stream", "--format", _STAT_FORMAT, *names]
p = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
if p.returncode != 0:
err = (p.stderr or p.stdout or "").strip() or str(p.returncode)
logger.warning("%s stats: %s", cli, err[:500])
return []
out: list[KindNodeResourceStat] = []
for line in (p.stdout or "").splitlines():
line = line.strip()
if not line:
continue
parts = line.split("\t")
while len(parts) < 7:
parts.append("")
name, cpu, mem_use, mem_pct, net_io, blk_io, pids_raw = parts[:7]
out.append(
KindNodeResourceStat(
container_name=name.strip(),
cpu_percent=cpu.strip() or None,
memory_usage=mem_use.strip() or None,
memory_percent=mem_pct.strip() or None,
net_io=net_io.strip() or None,
block_io=blk_io.strip() or None,
pids=_parse_pids(pids_raw),
)
)
return out
def collect_kind_clusters_resource_stats() -> tuple[list[KindClusterResources], str | None]:
"""
Для каждого кластера из ``kind get clusters`` — метрики **запущенных** узлов.
Один вызов ``ps -a`` на все имена; префикс узлов совпадает с ``list_kind_cluster_container_names``.
Возвращает (список блоков, сообщение об ошибке уровня движка или ``None``).
"""
cli = _container_cli_bin()
if not shutil.which(cli):
return [], f"CLI «{cli}» не найден в PATH — метрики узлов недоступны"
all_lines, ps_err = _all_container_names_ps_a(cli)
if ps_err:
return [], ps_err
running = _list_running_container_names(cli)
if not running:
logger.debug("Нет запущенных контейнеров по %s ps", cli)
kind_names = list_registered_kind_clusters()
blocks: list[KindClusterResources] = []
for cluster_name in kind_names:
prefix = f"{cluster_name}-"
node_names = _sort_kind_node_containers([n for n in all_lines if n.startswith(prefix)])
active = [n for n in node_names if n in running]
if not active:
blocks.append(
KindClusterResources(
cluster_name=cluster_name,
nodes=[],
note="узлы остановлены или контейнеры отсутствуют",
)
)
continue
stats = _stats_for_container_names(cli, active)
# Сохраняем порядок control-plane первым, как в kind
order = {n: i for i, n in enumerate(node_names)}
stats.sort(key=lambda s: order.get(s.container_name, 99))
blocks.append(KindClusterResources(cluster_name=cluster_name, nodes=stats, note=None))
return blocks, None
def collect_single_cluster_resource_stats(cluster_name: str) -> tuple[KindClusterResources, str | None]:
"""
Метрики узлов **одного** кластера (без полного прохода по всем kind-кластерам).
Если кластера нет в ``kind get clusters`` — пустой список узлов и пояснение в ``note``.
"""
name = cluster_name.strip()
cli = _container_cli_bin()
if not shutil.which(cli):
return KindClusterResources(cluster_name=name, nodes=[], note=None), f"CLI «{cli}» не найден в PATH"
if name not in list_registered_kind_clusters():
return KindClusterResources(
cluster_name=name,
nodes=[],
note="Кластер не в списке kind — метрики контейнеров-узлов недоступны",
), None
all_lines, ps_err = _all_container_names_ps_a(cli)
if ps_err:
return KindClusterResources(cluster_name=name, nodes=[], note=None), ps_err
running = _list_running_container_names(cli)
prefix = f"{name}-"
node_names = _sort_kind_node_containers([n for n in all_lines if n.startswith(prefix)])
active = [n for n in node_names if n in running]
if not active:
return KindClusterResources(
cluster_name=name,
nodes=[],
note="узлы остановлены или контейнеры отсутствуют",
), None
stats = _stats_for_container_names(cli, active)
order = {n: i for i, n in enumerate(node_names)}
stats.sort(key=lambda s: order.get(s.container_name, 99))
return KindClusterResources(cluster_name=name, nodes=stats, note=None), None