Files
InternalAuditInterprise/backend/app/audit/service.py
T
2026-06-16 00:38:57 +08:00

82 lines
2.2 KiB
Python

"""系统自审计服务:写入哈希链审计日志、校验完整性(R19)。"""
from __future__ import annotations
import hashlib
import json
from sqlalchemy import select
from sqlalchemy.orm import Session
from app.audit.models import AuditLog
def _compute_hash(prev_hash: str | None, payload: dict) -> str:
body = json.dumps(payload, sort_keys=True, ensure_ascii=False, default=str)
raw = f"{prev_hash or ''}|{body}"
return hashlib.sha256(raw.encode("utf-8")).hexdigest()
def record(
session: Session,
actor: str,
action: str,
*,
role: str | None = None,
target_type: str | None = None,
target_id: str | None = None,
detail: dict | None = None,
) -> AuditLog:
"""追加一条审计日志,自动接续哈希链。"""
last = session.execute(
select(AuditLog).order_by(AuditLog.seq.desc()).limit(1)
).scalar_one_or_none()
prev_hash = last.entry_hash if last else None
payload = {
"actor": actor,
"role": role,
"action": action,
"target_type": target_type,
"target_id": target_id,
"detail": detail or {},
}
entry_hash = _compute_hash(prev_hash, payload)
log = AuditLog(
actor=actor,
role=role,
action=action,
target_type=target_type,
target_id=target_id,
detail=detail or {},
prev_hash=prev_hash,
entry_hash=entry_hash,
)
session.add(log)
session.flush()
return log
def verify_chain(session: Session) -> tuple[bool, int | None]:
"""校验审计日志哈希链完整性。
返回 (是否完整, 首个断链的 seq 或 None)。
"""
rows = session.execute(select(AuditLog).order_by(AuditLog.seq.asc())).scalars().all()
prev_hash: str | None = None
for row in rows:
payload = {
"actor": row.actor,
"role": row.role,
"action": row.action,
"target_type": row.target_type,
"target_id": row.target_id,
"detail": row.detail or {},
}
expected = _compute_hash(prev_hash, payload)
if expected != row.entry_hash or row.prev_hash != prev_hash:
return False, row.seq
prev_hash = row.entry_hash
return True, None