"""审计数据中台 ORM 模型。 涵盖:数据版本、本体实体、知识图谱关系边、双时态属性、时序事件。 对应需求 R2 / R3,建模决策见 ADR-0002。 """ from __future__ import annotations import datetime as dt import uuid from sqlalchemy import ( DateTime, Float, ForeignKey, Index, Integer, String, Text, UniqueConstraint, ) from sqlalchemy.dialects.postgresql import JSONB, UUID from sqlalchemy.orm import Mapped, mapped_column, relationship from app.db import Base def _uuid() -> uuid.UUID: return uuid.uuid4() class DataVersion(Base): """数据版本登记:每批接入数据的来源/批次/时间/行数,支撑结论可追溯(R3)。""" __tablename__ = "data_version" id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=_uuid) source_system: Mapped[str] = mapped_column(String(64), nullable=False) batch_label: Mapped[str] = mapped_column(String(128), nullable=False) row_count: Mapped[int] = mapped_column(Integer, default=0) ingested_at: Mapped[dt.datetime] = mapped_column( DateTime(timezone=True), default=lambda: dt.datetime.now(dt.UTC) ) note: Mapped[str | None] = mapped_column(Text, nullable=True) class Entity(Base): """本体实体节点(知识图谱顶点)。 business_key 是源系统中的业务主键,用于主数据对齐(同一实体跨系统归一)。 """ __tablename__ = "entity" __table_args__ = ( UniqueConstraint("entity_type", "business_key", name="uq_entity_type_bizkey"), Index("ix_entity_type", "entity_type"), ) id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=_uuid) entity_type: Mapped[str] = mapped_column(String(32), nullable=False) business_key: Mapped[str] = mapped_column(String(128), nullable=False) display_name: Mapped[str | None] = mapped_column(String(256), nullable=True) attributes: Mapped[dict] = mapped_column(JSONB, default=dict) # 主数据对齐:被归并到的"金主"实体(同一实控人/同一主体)。NULL 表示自身即主实体。 canonical_id: Mapped[uuid.UUID | None] = mapped_column( UUID(as_uuid=True), ForeignKey("entity.id"), nullable=True ) data_version_id: Mapped[uuid.UUID | None] = mapped_column( UUID(as_uuid=True), ForeignKey("data_version.id"), nullable=True ) class EntityRelationship(Base): """知识图谱关系边(有向)。多跳穿透用递归 CTE 遍历本表。""" __tablename__ = "entity_relationship" __table_args__ = ( Index("ix_rel_source", "source_id"), Index("ix_rel_target", "target_id"), Index("ix_rel_type", "rel_type"), ) id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=_uuid) rel_type: Mapped[str] = mapped_column(String(32), nullable=False) source_id: Mapped[uuid.UUID] = mapped_column( UUID(as_uuid=True), ForeignKey("entity.id"), nullable=False ) target_id: Mapped[uuid.UUID] = mapped_column( UUID(as_uuid=True), ForeignKey("entity.id"), nullable=False ) attributes: Mapped[dict] = mapped_column(JSONB, default=dict) data_version_id: Mapped[uuid.UUID | None] = mapped_column( UUID(as_uuid=True), ForeignKey("data_version.id"), nullable=True ) source: Mapped[Entity] = relationship(foreign_keys=[source_id]) target: Mapped[Entity] = relationship(foreign_keys=[target_id]) class BitemporalFact(Base): """双时态事实:实体的某个属性/状态随时间变化的记录。 - 业务有效期 valid_from/valid_to(应用时间) - 系统记录期 system_from/system_to(事务时间) 回放历史 = 给定 (as_of_valid, as_of_system) 过滤两条时间线(见 repository)。 """ __tablename__ = "bitemporal_fact" __table_args__ = ( Index("ix_btf_entity_attr", "entity_id", "attr_name"), ) id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=_uuid) entity_id: Mapped[uuid.UUID] = mapped_column( UUID(as_uuid=True), ForeignKey("entity.id"), nullable=False ) attr_name: Mapped[str] = mapped_column(String(64), nullable=False) attr_value: Mapped[dict] = mapped_column(JSONB, default=dict) valid_from: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), nullable=False) valid_to: Mapped[dt.datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) system_from: Mapped[dt.datetime] = mapped_column( DateTime(timezone=True), default=lambda: dt.datetime.now(dt.UTC) ) system_to: Mapped[dt.datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) data_version_id: Mapped[uuid.UUID | None] = mapped_column( UUID(as_uuid=True), ForeignKey("data_version.id"), nullable=True ) class MetricEvent(Base): """时序事件:行为/指标类数据(用户生命周期、回款、话务、佣金、资源使用)。 部署后通过 TimescaleDB create_hypertable('metric_event', 'event_time') 转为超表。 """ __tablename__ = "metric_event" __table_args__ = ( Index("ix_metric_subject_time", "subject_type", "subject_key", "event_time"), Index("ix_metric_name_time", "metric_name", "event_time"), ) id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=_uuid) event_time: Mapped[dt.datetime] = mapped_column(DateTime(timezone=True), nullable=False) subject_type: Mapped[str] = mapped_column(String(32), nullable=False) # 如 msisdn/channel subject_key: Mapped[str] = mapped_column(String(128), nullable=False) metric_name: Mapped[str] = mapped_column(String(64), nullable=False) # 如 traffic_mb/commission metric_value: Mapped[float] = mapped_column(Float, default=0.0) attributes: Mapped[dict] = mapped_column(JSONB, default=dict) data_version_id: Mapped[uuid.UUID | None] = mapped_column( UUID(as_uuid=True), ForeignKey("data_version.id"), nullable=True )