"""系统自审计服务:写入哈希链审计日志、校验完整性(R19)。""" from __future__ import annotations import hashlib import json from sqlalchemy import select from sqlalchemy.orm import Session from app.audit.models import AuditLog def _compute_hash(prev_hash: str | None, payload: dict) -> str: body = json.dumps(payload, sort_keys=True, ensure_ascii=False, default=str) raw = f"{prev_hash or ''}|{body}" return hashlib.sha256(raw.encode("utf-8")).hexdigest() def record( session: Session, actor: str, action: str, *, role: str | None = None, target_type: str | None = None, target_id: str | None = None, detail: dict | None = None, ) -> AuditLog: """追加一条审计日志,自动接续哈希链。""" last = session.execute( select(AuditLog).order_by(AuditLog.seq.desc()).limit(1) ).scalar_one_or_none() prev_hash = last.entry_hash if last else None payload = { "actor": actor, "role": role, "action": action, "target_type": target_type, "target_id": target_id, "detail": detail or {}, } entry_hash = _compute_hash(prev_hash, payload) log = AuditLog( actor=actor, role=role, action=action, target_type=target_type, target_id=target_id, detail=detail or {}, prev_hash=prev_hash, entry_hash=entry_hash, ) session.add(log) session.flush() return log def verify_chain(session: Session) -> tuple[bool, int | None]: """校验审计日志哈希链完整性。 返回 (是否完整, 首个断链的 seq 或 None)。 """ rows = session.execute(select(AuditLog).order_by(AuditLog.seq.asc())).scalars().all() prev_hash: str | None = None for row in rows: payload = { "actor": row.actor, "role": row.role, "action": row.action, "target_type": row.target_type, "target_id": row.target_id, "detail": row.detail or {}, } expected = _compute_hash(prev_hash, payload) if expected != row.entry_hash or row.prev_hash != prev_hash: return False, row.seq prev_hash = row.entry_hash return True, None