Files
RoleForge/app/services/jsonlint_runtime.py
Sergey Antropoff 1d2301fb09 first commit
2026-04-30 08:59:31 +03:00

66 lines
2.0 KiB
Python

"""Cached merged JSONLint rules from app_config (`jsonlint` key)."""
from __future__ import annotations
import json
import logging
from typing import Any
from app.db.pool import get_pool
from app.services.jsonlint_rules import default_merged_rules, merge_jsonlint_saved
_logger = logging.getLogger(__name__)
_cached: dict[str, dict[str, Any]] | None = None
def _saved_from_row(row: object | None) -> dict:
if not row:
return {}
try:
raw = row["value"]
except (KeyError, TypeError):
return {}
if raw is None:
return {}
if isinstance(raw, dict):
return dict(raw)
if isinstance(raw, str):
try:
parsed = json.loads(raw)
except (json.JSONDecodeError, TypeError, ValueError):
_logger.warning("app_config.jsonlint value is invalid JSON string; ignoring")
return {}
return dict(parsed) if isinstance(parsed, dict) else {}
_logger.warning("app_config.jsonlint value is not an object; ignoring")
return {}
async def get_jsonlint_config_for_lint() -> dict[str, dict[str, Any]]:
"""Return cached merged rules; loads from DB on first use."""
global _cached
if _cached is not None:
return _cached
return await refresh_jsonlint_config_cache()
async def refresh_jsonlint_config_cache() -> dict[str, dict[str, Any]]:
"""Reload merged rules from `app_config.jsonlint` and replace cache."""
global _cached
pool = get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow("select value from app_config where key = $1", "jsonlint")
saved = _saved_from_row(row)
try:
merged = merge_jsonlint_saved(saved)
except Exception as exc: # noqa: BLE001
_logger.warning("Invalid JSONLint rules in DB; using defaults: %s", exc)
merged = default_merged_rules()
_cached = merged
return _cached
def invalidate_jsonlint_config_cache() -> None:
global _cached
_cached = None