Files
KindClustersDashboard/app/core/job_store.py
Sergey Antropoff c1e867a01f Веб-UI: логи kind create, старт/стоп кластеров, документация README
- Потоковые логи в job_store и UI; kind create через Popen с построчным выводом
- POST /clusters/{name}/start|stop; create по сохранённому kind-config.yaml
- Страница /documentation: GET /api/v1/docs/readme, marked+DOMPurify из static/vendor
- Иконки действий, плавающие подсказки, модалка подтверждения вместо confirm
- Makefile: make docker|podman rebuild; compose: монтирование README.md
- Dockerfile: COPY README.md; readme_doc: несколько путей к README

Автор: Сергей Антропов — https://devops.org.ru
2026-04-04 06:21:00 +03:00

219 lines
8.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Хранилище фоновых заданий (создание кластера) в памяти процесса.
При перезапуске контейнера история заданий обнуляется — это ожидаемо для dev-среды.
Потокобезопасные флаги отмены и прогресс (для worker-thread) — через ``threading.Lock``.
Автор: Сергей Антропов
Сайт: https://devops.org.ru
"""
from __future__ import annotations
import asyncio
import logging
import os
import threading
import uuid
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Literal
logger = logging.getLogger("kind_k8s.job_store")
# Лимит записей в памяти (dev-инструмент; старые задания вытесняются)
_MAX_JOBS = 200
JobStatus = Literal["queued", "running", "success", "failed", "cancelled"]
# --- Синхронное сопровождение задания (worker-thread и HTTP отмена) ---
_thread_lock = threading.Lock()
_cancel_events: dict[str, threading.Event] = {}
_progress: dict[str, tuple[str, int]] = {}
# Хвост логов для активных заданий (kind create и т.д.); после завершения копируется в JobRecord.log_lines
_job_log_deques: dict[str, deque[str]] = {}
def _max_job_log_lines() -> int:
raw = (os.environ.get("KIND_K8S_JOB_LOG_MAX_LINES") or "500").strip()
try:
return max(50, min(int(raw), 5000))
except ValueError:
return 500
def append_log_sync(job_id: str, line: str) -> None:
"""Добавить строку в журнал задания (вызывается из worker-thread во время долгих команд)."""
text = (line or "").rstrip()
if not text:
return
cap = _max_job_log_lines()
with _thread_lock:
if job_id not in _job_log_deques:
_job_log_deques[job_id] = deque(maxlen=cap)
_job_log_deques[job_id].append(text)
def get_logs_snapshot_sync(job_id: str) -> list[str]:
"""Снимок текущего журнала (для API во время running/queued)."""
with _thread_lock:
d = _job_log_deques.get(job_id)
return list(d) if d else []
def take_logs_finalize_sync(job_id: str) -> list[str]:
"""
Забрать журнал в список и удалить deque (после успеха/ошибки/отмены).
Вызывать перед или внутри обновления JobRecord.
"""
with _thread_lock:
d = _job_log_deques.pop(job_id, None)
return list(d) if d else []
def begin_job_tracking(job_id: str) -> None:
"""Зарегистрировать отмену/прогресс для нового job_id (вызывать при создании задания)."""
with _thread_lock:
_cancel_events[job_id] = threading.Event()
_progress[job_id] = ("В очереди", 0)
def end_job_tracking(job_id: str) -> None:
"""Очистить служебные структуры после завершения задания."""
with _thread_lock:
_cancel_events.pop(job_id, None)
_progress.pop(job_id, None)
_job_log_deques.pop(job_id, None)
def set_progress_sync(job_id: str, stage: str, percent: int) -> None:
"""Обновить текст этапа и процент (0100) из worker-thread."""
pct = max(0, min(100, int(percent)))
with _thread_lock:
if job_id in _progress:
_progress[job_id] = (stage, pct)
def get_progress_sync(job_id: str) -> tuple[str, int] | None:
"""Снимок прогресса для ответа API."""
with _thread_lock:
return _progress.get(job_id)
def request_cancel_sync(job_id: str) -> bool:
"""Запросить отмену. Вернуть False, если job_id не отслеживается."""
with _thread_lock:
ev = _cancel_events.get(job_id)
if ev is None:
return False
ev.set()
logger.info("Запрошена отмена задания %s", job_id)
return True
def is_cancelled_sync(job_id: str) -> bool:
"""Проверка из worker-thread между этапами создания кластера."""
with _thread_lock:
ev = _cancel_events.get(job_id)
return bool(ev and ev.is_set())
@dataclass
class JobRecord:
"""Описание одного задания."""
job_id: str
kind: str
status: JobStatus
cluster_name: str | None
created_at_utc: str
message: str | None = None
result: dict[str, Any] | None = None
# Журнал после завершения (stdout/stderr kind create и этапы); пока задание активно — см. deque
log_lines: list[str] = field(default_factory=list)
class JobStore:
"""Потокобезопасное (asyncio) хранилище заданий."""
def __init__(self) -> None:
self._jobs: dict[str, JobRecord] = {}
self._lock = asyncio.Lock()
async def create_job(self, kind: str, *, cluster_name: str | None) -> JobRecord:
"""Зарегистрировать задание в статусе ``queued``."""
jid = uuid.uuid4().hex
now = datetime.now(timezone.utc).isoformat()
rec = JobRecord(
job_id=jid,
kind=kind,
status="queued",
cluster_name=cluster_name,
created_at_utc=now,
)
async with self._lock:
self._jobs[jid] = rec
while len(self._jobs) > _MAX_JOBS:
oldest_id = min(self._jobs, key=lambda k: self._jobs[k].created_at_utc)
end_job_tracking(oldest_id)
del self._jobs[oldest_id]
logger.debug("Вытеснено старое задание из хранилища: %s", oldest_id)
begin_job_tracking(jid)
logger.info("Создано задание %s kind=%s cluster=%s", jid, kind, cluster_name)
return rec
async def set_running(self, job_id: str) -> None:
async with self._lock:
if job_id in self._jobs:
self._jobs[job_id].status = "running"
self._jobs[job_id].message = None
set_progress_sync(job_id, "Запуск создания кластера…", 5)
async def set_success(self, job_id: str, *, result: dict[str, Any] | None = None, message: str | None = None) -> None:
logs = take_logs_finalize_sync(job_id)
async with self._lock:
if job_id in self._jobs:
self._jobs[job_id].status = "success"
self._jobs[job_id].result = result
self._jobs[job_id].message = message
self._jobs[job_id].log_lines = logs
set_progress_sync(job_id, "Готово", 100)
async def set_failed(self, job_id: str, message: str) -> None:
logs = take_logs_finalize_sync(job_id)
async with self._lock:
if job_id in self._jobs:
self._jobs[job_id].status = "failed"
self._jobs[job_id].message = message
self._jobs[job_id].log_lines = logs
logger.warning("Задание %s завершилось ошибкой: %s", job_id, message)
async def set_cancelled(self, job_id: str, message: str = "Создание отменено пользователем") -> None:
logs = take_logs_finalize_sync(job_id)
async with self._lock:
if job_id in self._jobs:
self._jobs[job_id].status = "cancelled"
self._jobs[job_id].message = message
self._jobs[job_id].log_lines = logs
logger.info("Задание %s отменено: %s", job_id, message)
async def get(self, job_id: str) -> JobRecord | None:
async with self._lock:
return self._jobs.get(job_id)
def snapshot_all(self) -> list[JobRecord]:
"""Снимок всех заданий (для отладки; без блокировки — eventual consistency)."""
return list(self._jobs.values())
def snapshot_recent_sorted(self, *, limit: int) -> list[JobRecord]:
"""Задания от новых к старым, не более ``limit``."""
items = self.snapshot_all()
items.sort(key=lambda r: r.created_at_utc, reverse=True)
return items[: max(1, limit)]
# Синглтон на процесс uvicorn
job_store = JobStore()