Files
RoleForge/app/services/yamllint_runtime.py
Sergey Antropoff 1d2301fb09 first commit
2026-04-30 08:59:31 +03:00

69 lines
2.2 KiB
Python

"""Cached YamlLintConfig built from app_config (`yamllint` key)."""
from __future__ import annotations
import json
import logging
from yamllint.config import YamlLintConfig
from app.db.pool import get_pool
from app.services.yamllint_rules import default_merged_rules, merge_yamllint_saved, rules_to_yamllint_yaml
_logger = logging.getLogger(__name__)
_cached: YamlLintConfig | None = None
def _saved_from_row(row: object | None) -> dict:
if not row:
return {}
try:
raw = row["value"] # asyncpg.Record / mapping
except (KeyError, TypeError):
return {}
if raw is None:
return {}
if isinstance(raw, dict):
return dict(raw)
if isinstance(raw, str):
try:
parsed = json.loads(raw)
except (json.JSONDecodeError, TypeError, ValueError):
_logger.warning("app_config.yamllint value is invalid JSON string; ignoring")
return {}
return dict(parsed) if isinstance(parsed, dict) else {}
_logger.warning("app_config.yamllint value is not an object; ignoring")
return {}
async def get_yamllint_config_for_lint() -> YamlLintConfig:
"""Return cached config; loads from DB on first use."""
global _cached
if _cached is not None:
return _cached
return await refresh_yamllint_config_cache()
async def refresh_yamllint_config_cache() -> YamlLintConfig:
"""Reload config from `app_config.yamllint` and replace cache."""
global _cached
pool = get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow("select value from app_config where key = $1", "yamllint")
saved = _saved_from_row(row)
merged = merge_yamllint_saved(saved)
yaml_text = rules_to_yamllint_yaml(merged)
try:
_cached = YamlLintConfig(yaml_text)
except Exception as exc: # noqa: BLE001 — yamllint may raise on invalid generated config
_logger.warning("Invalid YamlLint config from DB or generated YAML; using defaults: %s", exc)
yaml_text = rules_to_yamllint_yaml(default_merged_rules())
_cached = YamlLintConfig(yaml_text)
return _cached
def invalidate_yamllint_config_cache() -> None:
global _cached
_cached = None