Files
RoleForge/app/services/jsonlint_rules.py
Sergey Antropoff 1d2301fb09 first commit
2026-04-30 08:59:31 +03:00

238 lines
8.0 KiB
Python

"""JSON lint rule definitions for RoleForge admin JSONLint config UI and runtime."""
from __future__ import annotations
import copy
import json
import re
from typing import Any
from app.schemas.domain import LintRoleFileError
# Identifiers of every JSONLint rule this module knows about, in display order.
JSONLINT_RULE_IDS: tuple[str, ...] = (
    "syntax",
    "duplicate-keys",
    "line-length",
    "trailing-spaces",
    "new-line-at-end-of-file",
    "tab-characters",
    "utf8-bom",
)
# Human-readable metadata (id, title, hint) for each rule, one entry per rule id.
JSONLINT_RULE_META: tuple[dict[str, Any], ...] = (
    {
        "id": "syntax",
        "title": "Syntax",
        "hint": "Valid JSON (Unicode decode + json.loads).",
    },
    {
        "id": "duplicate-keys",
        "title": "Duplicate keys",
        "hint": "Object keys must be unique (RFC 8259).",
    },
    {
        "id": "line-length",
        "title": "Line length",
        "hint": "Maximum characters per line (excluding line ending).",
    },
    {
        "id": "trailing-spaces",
        "title": "Trailing spaces",
        "hint": "Whitespace at end of a line.",
    },
    {
        "id": "new-line-at-end-of-file",
        "title": "Newline at end of file",
        "hint": "POSIX text files end with a final newline.",
    },
    {
        "id": "tab-characters",
        "title": "Tab characters",
        "hint": "Horizontal tab inside a line.",
    },
    {
        "id": "utf8-bom",
        "title": "UTF-8 BOM",
        "hint": "Leading byte-order mark (U+FEFF).",
    },
)
def _default_state_for_rule(rule_id: str) -> dict[str, Any]:
st: dict[str, Any] = {"enabled": True, "blocking": True}
if rule_id == "line-length":
st["max"] = 160
return st
def default_merged_rules() -> dict[str, dict[str, Any]]:
    """Build a fresh mapping of every known rule id to its default state."""
    defaults: dict[str, dict[str, Any]] = {}
    for rule_id in JSONLINT_RULE_IDS:
        defaults[rule_id] = _default_state_for_rule(rule_id)
    return defaults
def merge_jsonlint_saved(saved: dict[str, Any] | None) -> dict[str, dict[str, Any]]:
    """Overlay DB `saved` onto defaults.

    The merge is best-effort: anything in ``saved`` that cannot be used is
    ignored and the default for that field is kept, so a damaged stored
    config never prevents linting.

    Args:
        saved: Stored configuration, expected to look like
            ``{"rules": {rule_id: {"enabled": ..., "blocking": ..., "max": ...}}}``.
            ``None``, a non-dict, or a dict without a dict-valued ``"rules"``
            key yields the plain defaults.

    Returns:
        A new mapping of every known rule id to its effective state.
        ``enabled`` / ``blocking`` are coerced with ``bool()``; the
        ``line-length`` ``max`` is coerced with ``int()`` and clamped to
        the range 40..500.
    """
    base = default_merged_rules()
    if not isinstance(saved, dict):
        return base
    rules_in = saved.get("rules")
    if not isinstance(rules_in, dict):
        return base
    for rid, patch in rules_in.items():
        # Unknown rule ids and malformed entries are skipped, not rejected.
        if rid not in base or not isinstance(patch, dict):
            continue
        if "enabled" in patch:
            base[rid]["enabled"] = bool(patch["enabled"])
        if "blocking" in patch:
            base[rid]["blocking"] = bool(patch["blocking"])
        if rid == "line-length" and "max" in patch:
            try:
                mx = int(patch["max"])
            except (TypeError, ValueError, OverflowError):
                # OverflowError: int(float("inf")) -- Python's json module
                # parses ``Infinity`` / ``1e999`` to an infinite float.
                # Keep the default limit for any unusable value.
                continue
            base[rid]["max"] = max(40, min(500, mx))
    return base
def serialize_rules_for_api(merged: dict[str, dict[str, Any]]) -> dict[str, dict[str, Any]]:
    """Return an independent deep copy of ``merged`` for use in an API response."""
    snapshot = copy.deepcopy(merged)
    return snapshot
def validate_jsonlint_put_payload(payload: dict[str, Any]) -> dict[str, Any]:
    """Validate a JSONLint rules payload and return it in normalized form.

    Starts from the default state of every known rule and applies the
    per-rule patches found under ``payload["rules"]``. Rules that are
    missing from the payload (or mapped to ``None``) keep their defaults.

    Args:
        payload: Object of the form ``{"rules": {rule_id: {...}}}``.

    Returns:
        ``{"rules": merged}`` where ``merged`` maps every known rule id to
        its state. ``enabled`` / ``blocking`` are coerced with ``bool()``;
        the ``line-length`` ``max`` is coerced with ``int()`` and clamped
        to the range 40..500.

    Raises:
        ValueError: If ``payload`` or ``payload["rules"]`` is not a dict,
            a rule entry is not a dict, ``line-length.max`` cannot be
            converted to an integer, or an unknown rule id is present.
    """
    if not isinstance(payload, dict):
        raise ValueError("Payload must be an object")
    raw_rules = payload.get("rules")
    if not isinstance(raw_rules, dict):
        raise ValueError("`rules` must be an object")
    merged = default_merged_rules()
    for rid in JSONLINT_RULE_IDS:
        patch = raw_rules.get(rid)
        if patch is None:
            continue
        if not isinstance(patch, dict):
            raise ValueError(f"Rule {rid} must be an object")
        if "enabled" in patch:
            merged[rid]["enabled"] = bool(patch["enabled"])
        if "blocking" in patch:
            merged[rid]["blocking"] = bool(patch["blocking"])
        if rid == "line-length" and "max" in patch:
            try:
                mx = int(patch["max"])
            except (TypeError, ValueError, OverflowError) as exc:
                # OverflowError: int(float("inf")). An infinite float can
                # arrive from JSON (``Infinity`` / ``1e999``) and must be
                # reported as a validation error like any other bad value.
                raise ValueError("`line-length.max` must be an integer") from exc
            merged[rid]["max"] = max(40, min(500, mx))
    # Checked after the known rules so their more specific errors win.
    for rid in raw_rules:
        if rid not in JSONLINT_RULE_IDS:
            raise ValueError(f"Unknown rule: {rid}")
    return {"rules": merged}
# One or more spaces/tabs anchored at the end of the searched string.
_TRAILING_WS_RE = re.compile(r"[ \t]+$")
def _rule_active(rule_id: str, merged: dict[str, dict[str, Any]]) -> tuple[bool, bool]:
st = merged.get(rule_id) or {}
return bool(st.get("enabled", True)), bool(st.get("blocking", True))
def _emit(
    errors: list[LintRoleFileError],
    rule_id: str,
    merged: dict[str, dict[str, Any]],
    *,
    line: int | None,
    column: int | None,
    message: str,
) -> None:
    """Append a finding for ``rule_id`` to ``errors`` unless the rule is disabled.

    The finding's level is ``"error"`` when the rule is blocking and
    ``"warning"`` otherwise; its message is prefixed with the rule id.
    """
    enabled, blocking = _rule_active(rule_id, merged)
    if enabled:
        finding = LintRoleFileError(
            line=line,
            column=column,
            level="error" if blocking else "warning",
            message=f"{rule_id}: {message}".strip(),
        )
        errors.append(finding)
def run_json_lint(content: str, merged: dict[str, dict[str, Any]]) -> list[LintRoleFileError]:
    """Lint JSON text using merged admin rule states.

    Runs the text-level rules (``utf8-bom``, ``line-length``,
    ``trailing-spaces``, ``tab-characters``, ``new-line-at-end-of-file``)
    and then parses the document for the ``syntax`` and ``duplicate-keys``
    rules. Disabled rules produce no findings.

    Duplicate keys are collected while parsing instead of aborting the
    parse, so a syntax error located after a duplicate key is still
    reported and every object with a repeated key is flagged.

    Args:
        content: The JSON document text. Empty or whitespace-only content
            yields no findings.
        merged: Mapping of rule id to rule state (``enabled``,
            ``blocking`` and, for ``line-length``, ``max``).

    Returns:
        The findings in the order they were detected.
    """
    errors: list[LintRoleFileError] = []
    text = str(content or "")
    if not text.strip():
        return []

    # _emit drops findings of disabled rules, so no separate enabled check here.
    has_bom = text.startswith("\ufeff")
    if has_bom:
        _emit(errors, "utf8-bom", merged, line=1, column=1, message="UTF-8 BOM at start of file")
    parse_base = text[1:] if has_bom else text
    lines = parse_base.split("\n")

    st_ll = merged.get("line-length") or {}
    try:
        max_cols = max(40, min(500, int(st_ll.get("max") or 160)))
    except (TypeError, ValueError, OverflowError):
        # OverflowError covers an infinite float; fall back to the default.
        max_cols = 160

    # Rule switches do not change while scanning, so look them up once.
    check_length = _rule_active("line-length", merged)[0]
    check_trailing = _rule_active("trailing-spaces", merged)[0]
    check_tabs = _rule_active("tab-characters", merged)[0]

    for i, raw_line in enumerate(lines, start=1):
        # Lines are split on "\n"; drop the "\r" of CRLF endings before measuring.
        line = raw_line.rstrip("\r")
        if check_length and len(line) > max_cols:
            _emit(
                errors,
                "line-length",
                merged,
                line=i,
                column=max_cols + 1,
                message=f"Line too long ({len(line)} > {max_cols})",
            )
        if check_trailing:
            m = _TRAILING_WS_RE.search(line)
            if m:
                _emit(
                    errors,
                    "trailing-spaces",
                    merged,
                    line=i,
                    column=m.start() + 1,
                    message="Trailing whitespace",
                )
        if check_tabs and "\t" in line:
            col = line.find("\t") + 1
            _emit(errors, "tab-characters", merged, line=i, column=col, message="Tab character")

    if parse_base.strip() != "" and not parse_base.endswith("\n"):
        _emit(
            errors,
            "new-line-at-end-of-file",
            merged,
            line=len(lines),  # str.split always returns at least one element
            column=None,
            message="No newline at end of file",
        )

    syn_en, _ = _rule_active("syntax", merged)
    dup_en, _ = _rule_active("duplicate-keys", merged)
    if not syn_en and not dup_en:
        return errors

    duplicate_msgs: list[str] = []

    def _dup_hook(pairs: list[tuple[str, Any]]) -> dict[str, Any]:
        # Record duplicates instead of raising so the parse always runs to
        # completion and later syntax errors are not masked.
        out: dict[str, Any] = {}
        reported: set[str] = set()
        for k, v in pairs:
            if k in out and k not in reported:
                reported.add(k)
                duplicate_msgs.append(f"Duplicate key {k!r}")
            out[k] = v
        return out

    def _report_unparseable(msg: str, line: int | None, column: int | None) -> None:
        # With "syntax" disabled we only got here for "duplicate-keys",
        # which cannot be checked on a document that does not parse.
        if syn_en:
            _emit(errors, "syntax", merged, line=line, column=column, message=msg)
        else:
            _emit(
                errors,
                "duplicate-keys",
                merged,
                line=line,
                column=column,
                message=f"Invalid JSON (needed to check duplicate keys): {msg}",
            )

    try:
        if dup_en:
            json.loads(parse_base, object_pairs_hook=_dup_hook)
        else:
            json.loads(parse_base)
    except json.JSONDecodeError as exc:
        _report_unparseable(
            str(exc.msg or "Invalid JSON"),
            getattr(exc, "lineno", None),
            getattr(exc, "colno", None),
        )
    except RecursionError:
        # Extremely deep nesting exhausts the interpreter's recursion limit.
        _report_unparseable("JSON nesting is too deep to parse", 1, 1)
    except ValueError as exc:
        # Non-JSONDecodeError ValueError raised by the parser (no position info).
        _report_unparseable(str(exc), 1, 1)

    # The pairs hook receives no position information, so duplicates are
    # reported at the start of the document.
    for dup_msg in duplicate_msgs:
        _emit(errors, "duplicate-keys", merged, line=1, column=1, message=dup_msg)
    return errors