diff --git a/TCS-IPTV-系统说明文档.md b/TCS-IPTV-系统说明文档.md new file mode 100644 index 0000000..821352a --- /dev/null +++ b/TCS-IPTV-系统说明文档.md @@ -0,0 +1,410 @@ +# TCS-IPTV 内容可信锁定系统 — 完整说明文档 + +> 版本:V1.0(一期 MVP + 二期贯通 + 三期生态 + 四期大小屏融合·后端可代码部分) +> 编制日期:2026 年 6 月 +> 运营主体:陕西 IPTV 运营公司(机构节点 `MA.156.8531.6101`) +> 配套文档:`0-req-IPTV.md`(需求)、`1-prd-IPTV.md`(PRD)、`2-task-IPTV-MVP.md` / `3-task-IPTV-二期.md` / `4-task-IPTV-三期.md` / `5-task-IPTV-四期.md`(任务)、`tcs-iptv/DELIVERY.md`(交付说明) + +本文面向业务专家、技术评审与试点联调三类读者,覆盖:业务背景与价值、核心原理、业务流程、系统架构、功能完成情况、试用说明、接口清单、质量与安全状况、后续路线。 + +--- + +## 一、项目背景与价值 + +### 1.1 要解决的问题 + +IPTV/网络视听内容在"**送审 → 入库 → 分发 → 终端播放**"链路中存在长期痛点: + +- **审与发脱节**:内容审核通过后,缺乏一个贯穿全链路、不可篡改的"身份",无法保证分发出去的就是审过的那一版("换壳重发""偷梁换柱")。 +- **多方编码割裂**:CP、审核/媒资、运营商各有一套编码,同一内容跨系统对不上号,监管难以"一码贯穿"。 +- **维权举证难**:版权归属、首次锁定时间缺乏可信凭证,侵权追责成本高。 +- **数据与分账不可信**:播放数据由各平台自报,结算缺乏可信依据。 +- **跨省/跨屏重复审核**:同一内容跨省、跨大小屏(IPTV/OTT/手机)重复审核,效率低。 + +### 1.2 解决思路与价值 + +在 **内容提供商(CP)、审核和监管部门、运营商** 三方现有系统之上,建立一层"**可信身份映射层**": + +> 以 **MA 码(监管身份锚点)+ 哈希码(技术指纹锚点)** 双锚定, +> 实现内容"**审过即锁定,锁定即通行,通行可追溯**"。 + +| 价值维度 | 解法 | +|----------|------| +| **权利**(确权维权) | 全链路存证 + 确权证据链 + 感知哈希侵权比对,"谁先锁定谁有权" | +| **效率**(少跑路) | 一次锁定跨省/跨屏复用,三重校验快速准入,追更增量赋码 | +| **利益**(可信分账) | 以 MA 码为维度聚合可信播放数据,自动分账 | +| **监管**(看得见管得住) | 一码贯穿三方映射,一键应急下架,全国统计大屏 | + +--- + +## 二、角色与术语 + +### 2.1 角色(三方) + +| 角色 | 链上角色码 | 现有系统 | 在 TCS 中的职责 | +|------|-----------|----------|-----------------| +| 内容提供商 CP | `cp` | 制作/媒资系统 | 送审节目信息 + 哈希包(**不传原片**) | +| 审核和监管部门 | `reviewer`(审核/媒资)、`regulator`(监管发码) | **CSPS 审核系统 + 媒体资源库** | 合规审核、媒资入库、发布;监管发码与应急下架 | +| 运营商 | `operator` | BOSS/CDN | 注入前哈希校验、播放数据回传 | + +> 说明:审核与监管部门内部细分两类链上权限 —— `reviewer`(CSPS/媒资库的审核与入库发布)与 `regulator`(监管主体,唯一可发码/下架)。 + +### 2.2 核心术语 + +- **MA 码**:监管身份锚点。六段式结构 `MA.156.{行业节点}.{机构节点}/{类目}/{年份}{序列}`,例:`MA.156.8531.6101/WD/20260000004`。 + - `156`=国家码(中国)、`8531`=行业节点(IPTV 视听)、`6101`=机构节点(陕西)、`WD/WJ/DY/DH`=微短剧/网络剧/网络电影/网络动画。 +- **集级子标识**:一剧一码下按集寻址,形如 `MA.156.8531.6101/WD/20260000004#E07`(第 7 集)。 +- **哈希码**:技术指纹锚点。文件 SHA-256 / 分段 Merkle 根 / 感知哈希(用于侵权比对)。 +- **CTID(Content Twin ID)**:内容机器主键,链下双锚定主键。 +- **可信数据空间**:联盟链(长安链 ChainMaker,国密),存哈希与映射、不存原片。 +- **模式 B 自行发码**:与 MA 发码机构对接获取"号段 + 备案规则",由 TCS 在本地按规则原子发码。 + +--- + +## 三、核心设计原理 + +1. **双锚定**:MA 码(监管/法律身份)与哈希(技术指纹)在发码时 **1:1 强绑定且不可解绑**,链上同时记录 CTID。 +2. **一剧一码 + 集级哈希**:MA 码按"剧/备案"颁发(对齐网标证),各集独立哈希挂在同一码下,支持集级验真、集级下架/恢复。 +3. **先审后发**:CP 送审 → CSPS 审核通过 → **才发码签发**(审过才发证发码),杜绝"先发码后审核"的空子。 +4. **不传原片**:链上只存哈希,原片仍走审核方既有渠道做内容审核 —— 最小侵入、不替代现有系统。 +5. **防换壳重发**:同一文件哈希再次送审被直接拦截并关联原 MA 码。 +6. **权限分离**:仅监管主体可发码与下架;发布必须携带"MA 码 + 哈希证书"。 + +--- + +## 四、业务流程 + +### 4.1 内容全生命周期(主流程) + +``` +CP 送审 监管发码签发 CSPS 审核 / 媒资 发布给运营商 CDN 注入校验 +(不传原片) ──▶ (审核通过后) ──▶ 入媒资库 ──▶ (携 MA+哈希证书) ──▶ (注入前哈希比对) +节目信息+哈希包 1:1 强绑定哈希 建媒资编码映射 匹配→放行 / 不匹配→拒绝告警 +``` + +详细步骤: + +1. **CP 送审**(`cp`):提交标题、集数、类目、文件哈希、Merkle 根、感知哈希、各集哈希;系统校验哈希包完整性 + 防换壳重发,返回送审流水号 `REV-…` 与 `CTID`。 +2. **CSPS 合规审核**(`reviewer`):审核通过/驳回(原片走既有审核渠道,TCS 记审核结论)。 +3. **发码签发**(`regulator`,审过才发):按类目从号段原子分配 MA 码,与哈希包 1:1 强绑定上链,生成"MA 码 + 哈希证书"。 +4. **媒资库入库**(`reviewer`):建立媒资编码映射,状态 → 已入库。 +5. **发布给运营商**(`reviewer`):校验证书(须含 MA 码)后,状态 → 已发布。 +6. **CDN 注入校验**(`operator`):注入前比对哈希;匹配则放行并注册运营商/CDN 映射,不匹配则拒绝并告警;同时做授权核验(地域/平台/期限)。 +7. **终端抽检**:终端按集抽检片段哈希,不匹配提示断流切备用源。 + +### 4.2 治理与权益流程(二期) + +- **应急下架**:监管主体一键下架,解析出该 MA 码绑定的三方编码与 CDN 端点;支持**集级下架**(只下某集,整剧其余集继续流通)与**恢复上架**。 +- **版本变更重审**:哈希变化判定绑定断裂,触发重审,并可定位被改的具体集。 +- **可信分账**:运营商以 MA 码为维度回传播放数据 → 聚合 → 按比例分账(示例 CP 60% / 平台 34% / 服务费 6%)。 +- **追责取证 / 确权举证**:全链路存证定位首次哈希变化环节与责任方;导出确权证据链;感知哈希侵权比对。 +- **授权链**:登记信息网络传播权(地域/平台/期限),发布与注入前核验。 +- **跨省复用**:B 省凭"MA 码 + 哈希证书"三重校验(码有效 + 哈希一致 + 非黑名单)快速准入。 + +### 4.3 大小屏融合流程(四期) + +- **跨域解析网关**:同一 MA 码在 IPTV/OTT/APP 统一解析(含集级子标识),返回一致的流通状态与跨屏可用性。 +- **扫码验真**:用户扫码返回真伪(链上存在且结构合法)+ 合规(仅 `published` 为合规流通)+ 流通状态,下架码判为"真码但不合规"。 +- **跨屏权益通兑**:以 MA 码为维度记录购买,**任一屏购买即全屏(电视/手机/OTT)通看,重复购买幂等不重复扣费**,权益归一到整剧 MA 码。 + +--- + +## 五、系统架构与工程结构 + +### 5.1 分层架构 + +``` + ┌───────────────── 监管大屏(React + AntD)─────────────────┐ + │ 角色工作台 │ 全流程演示 │ 监管片库(权益与治理) │ + └──────────────────────────┬──────────────────────────────┘ + │ 会话令牌(密钥不下发浏览器) + ┌─────────┴─────────┐ + │ console-bff :8090 │(BFF 安全层) + └─────────┬─────────┘ + │ HMAC-SHA256 鉴权 + ┌────────────────────────── api-svc :8080(业务编排)──────────────────────────┐ + │ service:送审/审核/发码/入库/发布/注入/下架/分账/追责/确权/授权/跨省/解析/权益 │ + │ macode:六段式发码与号段(PG 行锁防重号) │ hash:SHA256/Merkle/感知哈希 │ + └───────────────┬─────────────────────────────────────┬─────────────────────────┘ + │ chain.Client 接口 │ + ┌──────────┴──────────┐ ┌─────────┴─────────┐ + │ MemoryChain(等价实现)│ 平滑替换 ──▶ │ ChainMaker 国密链 │(合约源码已就绪) + └─────────────────────┘ └───────────────────┘ + │ + ┌──────────┴──────────┐ + │ PostgreSQL 16 / Redis │(号段游标、链上数据镜像、缓存) + └─────────────────────┘ +``` + +### 5.2 工程结构 + +``` +tcs-iptv/ +├── cmd/ +│ ├── api-svc/ # 业务后端(:8080) +│ ├── chain-svc/ # 链交互服务(:8081) +│ ├── hash-api/ # 哈希SDK HTTP API(:8082) +│ └── console-bff/ # 监管控制台 BFF(:8090) +├── internal/ +│ ├── hash/ # 哈希核心(SHA256/Merkle/感知哈希) +│ ├── macode/ # MA码生成/解析/号段(含 PG 存储) +│ ├── chain/ # 可信数据空间抽象 + MemoryChain +│ ├── service/ # 业务编排(含 phase4.go 大小屏融合) +│ ├── playback/ # 播放聚合与分账 +│ ├── provenance/ # 全链路存证与追责 +│ ├── bff/ # 控制台 BFF +│ ├── api/ # HTTP 路由与处理器 +│ ├── model/ # 领域模型(含 rights.go 跨屏权益) +│ └── config/ httpx/ # 配置、通用 HTTP / 鉴权 +├── contracts/tcs_registry/ # ChainMaker Go 合约(独立模块) +├── deploy/migrations/ # PostgreSQL 迁移(0001-0003) +├── web-console/ # React 监管大屏 +├── scripts/ # seed_demo.sh / e2e_smoke.sh +└── .gitlab-ci.yml +``` + +--- + +## 六、功能完成情况 + +> 图例:✅ 已完成(代码可交付,含测试);⏸ 需外部环境/流程(非本机代码可完成,已诚实标注)。 + +### 6.1 一期 MVP(核心闭环)✅ + +| 模块 | 状态 | 说明 | +|------|------|------| +| 哈希 SDK | ✅ | 文件 SHA-256 / 分段 Merkle / 感知哈希 | +| MA 码生成 | ✅ | 六段式、号段原子分配、PostgreSQL 行锁防重号 | +| 可信数据空间 | ✅ | 1:1 强绑定不可解绑、防换壳重发、权限控制 | +| 送审→审核→发码→入库→发布→注入→下架 | ✅ | 全闭环 | +| 一剧一码 + 集级哈希 | ✅ | 集级验真、集级下架/恢复、整剧下架/恢复 | +| HTTP API + HMAC 三角色权限 | ✅ | 四角色密钥 | +| 监管大屏 | ✅ | 角色工作台 / 全流程演示 / 监管片库 | + +### 6.2 二期 贯通(权益场景)✅ + +| 能力 | 状态 | 说明 | +|------|------|------| +| 数据回传聚合 + 可信分账 | ✅ | 以 MA 码聚合,CP60/平台34/服务费6 | +| 全链路追责取证 | ✅ | 定位首次哈希变化环节与责任方 | +| 确权证据链 + 感知哈希侵权比对 | ✅ | "谁先锁定谁有权" | +| 追更增量赋码 | ✅ | 不触发存量重审、不重新发码 | +| 跨省复用快速准入 | ✅ | 三重校验(码有效+哈希一致+非黑名单) | +| 授权链登记 + 发布/注入前核验 | ✅ | 地域/平台/期限拦截 | +| 终端片段抽检 | ✅ | 不匹配提示断流切源 | +| 前端"权益与治理"可视化 | ✅ | 分账/追责/确权/授权标签 | +| CI/CD | ✅ | GitLab CI 流水线 | + +### 6.3 三期 生态(代码可交付部分)✅ / ⏸ + +| 能力 | 状态 | 说明 | +|------|------|------| +| 备案对接(网标号/备案号关联) | ✅ | `/content/bind-filing`、`/content/filing` | +| 监管数据日报 | ✅ | `/regulatory/daily-report` | +| 号段管理 | ✅ | `/admin/segments` | +| 全国统计(按省/类目/状态) | ✅ | `/regulatory/national-stats` | +| 监管大屏 BFF 安全化 | ✅ | 密钥仅存后端,浏览器只用会话令牌 | +| 真实链合约源码 | ✅ | `contracts/tcs_registry/registry.go`(ChainMaker Go) | +| 真实链部署 / 等保测评 / 压测 / 行业标准 | ⏸ | 需外部环境与流程 | + +### 6.4 四期 大小屏融合(后端可代码部分)✅ / ⏸ + +| 任务 | 状态 | 说明 | +|------|------|------| +| C.1 MA 跨域解析网关 | ✅ | `GET /content/resolve` | +| C.2 大小屏身份互通 | ✅ | 同一 MA 码跨 iptv/ott/app 一致解析与哈希身份 | +| B.2 扫码验真 | ✅ | `POST /content/scan-verify` | +| D.1 跨屏权益通兑 | ✅ | `POST /rights/purchase`、`/rights/verify`,任一屏购买全屏通看 | +| A.1/A.2 OTT 端 SDK / 播放器抽检 | ⏸ | 复用后端 inject/verify/resolve/terminal 能力;端侧 SDK 待真实终端 | +| B.1 移动端壳接入 | ⏸ | 复用统一鉴权与后端校验;RN/小程序待移动端环境 | +| E.1 移动端 C2PA 内容凭证 | ⏸ | 依赖 C2PA 类水印 SDK,衔接 AVCC 体系 | + +--- + +## 七、试用说明 + +### 7.1 环境准备 + +> 本地直接使用已安装的 PostgreSQL / Redis,**无需 Docker**。 + +- Go 1.23+ +- Node 20+(前端) +- PostgreSQL 16(创建库 `tcs_iptv`,psql 已加入 PATH) +- Redis 7.x + +可选环境变量(缺省即适配本地): + +| 变量 | 默认值 | +|------|--------| +| `TCS_POSTGRES_DSN` | `postgres://postgres@localhost:5432/tcs_iptv?sslmode=disable` | +| `TCS_REDIS_ADDR` | `localhost:6379` | +| `TCS_API_ADDR` | `:8080` | + +### 7.2 启动步骤 + +```bash +cd tcs-iptv + +# 1. 数据库迁移(库 tcs_iptv 需已创建) +make migrate +make db-check # 列出已建表 +make redis-check # 应返回 PONG + +# 2. 运行测试(确认环境就绪) +make test + +# 3. 启动后端 +make run-api # api-svc :8080 +go run ./cmd/console-bff # BFF :8090(监管大屏走 BFF,可选) + +# 4. 启动前端监管大屏 +cd web-console && npm install && npm run dev # :5173/5174 + +# 5. 造演示数据(陕西 IPTV 场景) +bash scripts/seed_demo.sh + +# 6. 全相位端到端冒烟 +bash scripts/e2e_smoke.sh +``` + +监管大屏访问:`http://localhost:5174`(角色工作台 / 全流程演示 / 监管片库)。 +`seed_demo.sh` 会打印生成的 MA 码,可复制到大屏查询全链路三方映射。 + +### 7.3 演示场景(陕西 IPTV) + +| 参与方 | 示例 | +|--------|------| +| 管理方(审核+监管) | 陕西 IPTV 运营公司(机构节点 6101) | +| 内容提供商 CP | 西安曲江丝路文化传播 / 陕文投艺达影视 / 西部电影集团(西影) | +| 运营商 | 中国电信陕西(天翼高清)/ 中国移动陕西(魔百和)/ 中国联通陕西 | +| 示例内容 | 《长安少年行》(微短剧) /《白鹿原·麦客》(网络剧) /《丝路驼铃》(网络电影) | + +### 7.4 API 鉴权与调用 + +所有 `/api/v1/**` 接口需 HMAC-SHA256 鉴权。 + +- **签名串**:`base64( HMAC-SHA256( secret, "{METHOD}\n/api/v1{path不含query}" ) )` +- **请求头**:`Authorization: TCS {apiKey}:{signature}` + +预置四角色示例密钥(生产从 Vault/DB 加载): + +| 角色 | apiKey | secret | +|------|--------|--------| +| 监管主体 | `ak-regulator` | `sk-regulator` | +| 审核/媒资 | `ak-reviewer` | `sk-reviewer` | +| 内容提供商 | `ak-cp` | `sk-cp` | +| 运营商 | `ak-operator` | `sk-operator` | + +通用签名/调用函数(bash): + +```bash +BASE="http://localhost:8080/api/v1" +sign() { printf '%s\n%s' "$2" "$3" | openssl dgst -sha256 -hmac "$1" -binary | base64; } +call() { # key secret method path body + local sig; sig=$(sign "$2" "$3" "/api/v1${4%%\?*}") + if [ "$3" = "GET" ]; then curl -s "$BASE$4" -H "Authorization: TCS $1:$sig"; + else curl -s -X "$3" "$BASE$4" -H "Authorization: TCS $1:$sig" -H "Content-Type: application/json" -d "$5"; fi +} +``` + +### 7.5 四期新接口试用示例 + +```bash +# 先用 seed_demo.sh 生成一个已发布的 MA 码,记为 $MA + +# C.1/C.2 跨域解析(GET,跨屏统一解析;支持集级子标识 #E03) +call ak-regulator sk-regulator GET "/content/resolve?ma_code=$MA" +call ak-regulator sk-regulator GET "/content/resolve?ma_code=$MA#E03" + +# B.2 扫码验真(返回 authentic 真伪 / compliant 合规) +call ak-operator sk-operator POST /content/scan-verify "{\"ma_code\":\"$MA\"}" + +# D.1 跨屏权益:电视端购买 +call ak-operator sk-operator POST /rights/purchase \ + "{\"ma_code\":\"$MA\",\"user_hash\":\"user-1\",\"screen\":\"iptv\"}" + +# D.1 手机端核验权益 → 通看,不重复付费 +call ak-operator sk-operator POST /rights/verify \ + "{\"ma_code\":\"$MA\",\"user_hash\":\"user-1\",\"screen\":\"app\"}" +``` + +--- + +## 八、接口清单(节选,均在 `/api/v1` 下) + +| 分类 | 方法 路径 | 说明 | +|------|-----------|------| +| 送审/发码 | `POST /content/register` | CP 送审(哈希包,不传原片) | +| | `POST /content/csps-result` | CSPS 合规审核结论 | +| | `POST /content/issue` | 审核通过后发码签发(仅监管) | +| 验真 | `POST /content/verify` | 整剧哈希验真 | +| | `POST /content/verify-episode` | 集级验真 | +| 分发 | `POST /content/ingest` / `publish` / `inject` | 入库 / 发布 / CDN 注入校验 | +| 治理 | `POST /content/takedown` / `takedown-episode` | 整剧 / 集级应急下架(仅监管) | +| | `POST /content/restore` / `restore-episode` | 恢复上架 | +| | `GET /content/mappings` | 三方映射与 CDN 端点查询 | +| 权益 | `POST /data/playback` / `GET /data/playback-summary` | 播放回传 / 可信聚合 | +| | `POST /settlement/compute` | 可信分账 | +| 追责确权 | `GET /content/provenance` / `accountability` / `evidence` | 存证 / 追责 / 确权 | +| | `POST /content/infringe-match` | 感知哈希侵权比对 | +| 授权/追更/跨省 | `POST /content/authorize` / `auth-check` / `add-episodes` / `cross-province` | — | +| 终端 | `POST /terminal/verify-segment` | 终端片段抽检 | +| 三期生态 | `POST /content/bind-filing`、`GET /content/filing`、`GET /regulatory/national-stats`、`GET /regulatory/daily-report`、`GET /admin/segments` | 备案/统计/上报/号段 | +| **四期大小屏** | `GET /content/resolve` | **跨域解析网关** | +| | `POST /content/scan-verify` | **扫码验真** | +| | `POST /rights/purchase` | **记录跨屏购买** | +| | `POST /rights/verify` | **跨屏权益核验** | + +--- + +## 九、质量状况 + +| 指标 | 状况 | +|------|------| +| 测试用例 | 100+(含四期 18 个新单测),全部通过 | +| 核心覆盖率 | playback 100% / hash 88% / service 85% / macode 75% | +| `go build ./...` / `go vet` | 通过 | +| 前端构建 | 通过 | +| 端到端冒烟 | 一期 → 四期(后端可代码部分)全相位通过 | + +--- + +## 十、待外部环境/流程的事项(诚实标注,非代码可完成) + +| 项 | 说明 | 就绪度 | +|----|------|--------| +| 真实 ChainMaker 国密测试网 | 需多节点链环境 | 合约源码 + `chain.Client` 接口就绪,平滑替换 MemoryChain | +| 链上数据 PG 镜像接入 | 需真实链 | 镜像表已建(migrations) | +| 性能压测 / 高可用灾备 | 需集群 + 压测工具 | 架构支持,待环境 | +| 等保三级正式测评 | 需第三方机构 + 正式环境 | 安全设计就绪(BFF/HMAC/国密/审计) | +| HSM 密钥托管 | 需硬件 | 接口预留 | +| 行业分账标准发布 | 政策/行业协作 | 分账引擎已实现 | +| OTT / 移动端 SDK 接入 | 需 Android TV/OTT、RN/小程序真实终端 | 后端解析/校验/权益能力就绪可复用 | +| 移动端 C2PA 内容凭证 | 需 C2PA 类水印 SDK,衔接 AVCC | 待端侧环境 | + +--- + +## 十一、安全说明 + +- ✅ 已实现:HMAC-SHA256 鉴权、三角色权限矩阵、MA 码 1:1 不可解绑、哈希本地计算不上链原片、关键操作存证、**监管大屏 BFF 化(密钥不下发浏览器)**。 +- ⚠️ 上生产前需补齐:真实国密链替换、等保三级测评、HSM 密钥托管、生产凭证接入 Vault/SSO。 +- 网络暴露提示:当前示例服务以预置密钥启动,仅用于本机演示/联调;公网部署前必须更换密钥来源并启用 TLS、网关与审计。 + +--- + +## 十二、版本与后续路线 + +| 阶段 | 主题 | 状态 | +|------|------|------| +| 一期 MVP | 内容可信锁定核心闭环 | ✅ 完成 | +| 二期 贯通 | 权利/效率/利益/合规场景 | ✅ 完成 | +| 三期 生态 | 备案对接/全国监管/BFF 安全/真实链合约 | ✅ 代码部分完成;部署待环境 | +| 四期 大小屏融合 | 跨域解析/扫码验真/跨屏权益 | ✅ 后端完成;端侧 SDK 待环境 | + +> 四期完成后,TCS-IPTV 从"IPTV 内容可信锁定"升级为"全场景视听内容可信身份基础设施", +> 可与 AVCC(AIGC 视听内容编码)体系形成大小屏、传统/AIGC 内容的统一身份底座。 + +--- + +> 本系统一期至四期"可本机代码部分"均已实现并通过回归测试,可用于演示、试点联调与功能验收。 +> 剩余为真实链部署、等保测评、压测、HSM、行业标准、端侧 SDK 等需外部环境/流程的事项,已在上文逐项标注。 diff --git a/tcs-iptv/cmd/api-svc/main.go b/tcs-iptv/cmd/api-svc/main.go index 078633e..e811e03 100644 --- a/tcs-iptv/cmd/api-svc/main.go +++ b/tcs-iptv/cmd/api-svc/main.go @@ -15,25 +15,49 @@ import ( "github.com/tcs-iptv/tcs/internal/service" ) -// newAllocationStore 优先使用 PostgreSQL(持久、防重号),不可用时回退内存。 -func newAllocationStore(dsn string) macode.AllocationStore { +// openDB 尝试连接 PostgreSQL,连通则返回 *sql.DB,否则返回 nil(回退内存)。 +func openDB(dsn string) *sql.DB { db, err := sql.Open("postgres", dsn) - if err == nil { - if pingErr := db.Ping(); pingErr == nil { - log.Printf("macode: 使用 PostgreSQL 号段存储") - return macode.NewPostgresStore(db) - } + if err != nil { + return nil + } + if err := db.Ping(); err != nil { + return nil + } + return db +} + +// newAllocationStore 优先使用 PostgreSQL(持久、防重号),不可用时回退内存。 +func newAllocationStore(db *sql.DB) macode.AllocationStore { + if db != nil { + log.Printf("macode: 使用 PostgreSQL 号段存储") + return macode.NewPostgresStore(db) } log.Printf("macode: PostgreSQL 不可用,回退内存号段存储(仅开发用)") return macode.NewMemoryStore() } +// newChain 优先使用 PostgreSQL 持久化链(写穿 + 启动水合),不可用时回退纯内存。 +func newChain(db *sql.DB) chain.Client { + if db != nil { + if pc, err := chain.NewPersistentChain(db); err == nil { + log.Printf("chain: 使用 PostgreSQL 持久化链(写穿+水合,重启不丢数据)") + return pc + } else { + log.Printf("chain: PG 持久化链初始化失败(%v),回退内存链", err) + } + } + log.Printf("chain: 使用内存链(仅开发用,重启丢数据)") + return chain.NewMemoryChain() +} + func main() { cfg := config.Load() - // 装配依赖:链(MVP 用内存 mock)+ MA 码生成器(登记号段)+ 业务服务 - ch := chain.NewMemoryChain() - gen := macode.NewGenerator(newAllocationStore(cfg.PostgresDSN)) + // 装配依赖:共享一个 PG 连接给链持久化与号段存储 + db := openDB(cfg.PostgresDSN) + ch := newChain(db) + gen := macode.NewGenerator(newAllocationStore(db)) // 示例号段(生产由与发码机构对接后配置) // 机构节点 6101 = 陕西(管理方:陕西IPTV运营公司);行业节点 8531 = IPTV视听内容 _ = gen.RegisterSegment(macode.Segment{ diff --git a/tcs-iptv/deploy/migrations/0004_binding_revoked.sql b/tcs-iptv/deploy/migrations/0004_binding_revoked.sql new file mode 100644 index 0000000..138935e --- /dev/null +++ b/tcs-iptv/deploy/migrations/0004_binding_revoked.sql @@ -0,0 +1,12 @@ +-- 集级下架状态镜像:hash_binding 增加 revoked / revoked_reason 列。 +-- 对应需求11(集级应急下架/恢复),使集级下架状态可持久化镜像。 + +BEGIN; + +ALTER TABLE hash_binding + ADD COLUMN IF NOT EXISTS revoked BOOLEAN NOT NULL DEFAULT FALSE, + ADD COLUMN IF NOT EXISTS revoked_reason TEXT; + +COMMENT ON COLUMN hash_binding.revoked IS '集级下架标记:true=该集已下架'; + +COMMIT; diff --git a/tcs-iptv/internal/chain/persistent.go b/tcs-iptv/internal/chain/persistent.go new file mode 100644 index 0000000..11143aa --- /dev/null +++ b/tcs-iptv/internal/chain/persistent.go @@ -0,0 +1,297 @@ +package chain + +import ( + "database/sql" + "log" + + "github.com/tcs-iptv/tcs/internal/model" +) + +// PersistentChain 在 MemoryChain 之上叠加 PostgreSQL 持久化(最小改动模式)。 +// +// 设计要点(面向未来平滑替换真实链): +// - 业务规则(权限、1:1 强绑定、防换壳重发、状态机)全部复用 MemoryChain,单一真相来源; +// - 读路径直接走内存(快);写路径在内存变更成功后「写穿」到 PG 镜像表; +// - 启动时从 PG「水合」恢复内存状态,进程重启不丢数据; +// - 链上为权威数据源的设计不变:PG 仅作镜像,写穿失败仅记日志、不阻断主流程。 +// +// 未来接入真实 ChainMaker 时,整体以新的 chain.Client 实现替换即可,业务层零改动。 +type PersistentChain struct { + *MemoryChain + db *sql.DB +} + +var _ Client = (*PersistentChain)(nil) + +// NewPersistentChain 创建持久化链客户端,并从 PG 水合既有数据。 +func NewPersistentChain(db *sql.DB) (*PersistentChain, error) { + p := &PersistentChain{MemoryChain: NewMemoryChain(), db: db} + if err := p.hydrate(); err != nil { + return nil, err + } + return p, nil +} + +// ---- 写穿(在内存变更成功后镜像到 PG)---- + +// IssueMA 发码签发:内存绑定成功后镜像内容主记录与全部哈希绑定。 +func (p *PersistentChain) IssueMA(role Role, req IssueRequest) (string, error) { + tx, err := p.MemoryChain.IssueMA(role, req) + if err != nil { + return tx, err + } + c, _ := p.MemoryChain.QueryContent(req.MACode) + p.persistContent(c) + for _, b := range p.snapshotBindings(req.MACode) { + p.persistBinding(b) + } + p.persistTx(req.ContentTwinID, tx, "issueMA") + return tx, nil +} + +// RegisterHashBinding 追加哈希绑定(如转码版):镜像该条绑定。 +func (p *PersistentChain) RegisterHashBinding(role Role, b model.HashBinding) (string, error) { + tx, err := p.MemoryChain.RegisterHashBinding(role, b) + if err != nil { + return tx, err + } + p.persistBinding(b) + p.persistTx(b.ContentTwinID, tx, "registerHashBinding") + return tx, nil +} + +// RegisterMapping 注册三方映射:镜像该映射(按唯一键幂等)。 +func (p *PersistentChain) RegisterMapping(role Role, mp model.Mapping) (string, error) { + tx, err := p.MemoryChain.RegisterMapping(role, mp) + if err != nil { + return tx, err + } + p.persistMapping(mp) + p.persistTx(mp.ContentTwinID, tx, "registerMapping") + return tx, nil +} + +// RecordVersionChange 版本变更:镜像到 version_history。 +func (p *PersistentChain) RecordVersionChange(vc model.VersionChange) (string, error) { + tx, err := p.MemoryChain.RecordVersionChange(vc) + if err != nil { + return tx, err + } + p.exec(`INSERT INTO version_history + (content_twin_id, version, change_reason, prev_hash, new_hash, reaudit_required, reaudit_status, affected_episode) + VALUES ($1,$2,$3,$4,$5,$6,$7,$8)`, + vc.ContentTwinID, vc.Version, vc.ChangeReason, vc.PrevHash, vc.NewHash, + vc.ReauditRequired, vc.ReauditStatus, vc.AffectedEpisode) + return tx, nil +} + +// Revoke 整剧下架:镜像内容状态。 +func (p *PersistentChain) Revoke(role Role, maCode, reason string) (MappingsResult, error) { + res, err := p.MemoryChain.Revoke(role, maCode, reason) + if err != nil { + return res, err + } + p.updateStatus(maCode, model.StatusRevoked) + return res, nil +} + +// Restore 整剧恢复上架:镜像内容状态。 +func (p *PersistentChain) Restore(role Role, maCode string) error { + if err := p.MemoryChain.Restore(role, maCode); err != nil { + return err + } + p.updateStatus(maCode, model.StatusPublished) + return nil +} + +// RevokeEpisode 集级下架:镜像该集 revoked 标记。 +func (p *PersistentChain) RevokeEpisode(role Role, maCode string, episode int, reason string) error { + if err := p.MemoryChain.RevokeEpisode(role, maCode, episode, reason); err != nil { + return err + } + p.updateEpisodeRevoked(maCode, episode, true, reason) + return nil +} + +// RestoreEpisode 集级恢复:镜像该集 revoked 标记。 +func (p *PersistentChain) RestoreEpisode(role Role, maCode string, episode int) error { + if err := p.MemoryChain.RestoreEpisode(role, maCode, episode); err != nil { + return err + } + p.updateEpisodeRevoked(maCode, episode, false, "") + return nil +} + +// SetContentStatus 状态流转(入库/发布等):镜像内容状态。 +func (p *PersistentChain) SetContentStatus(maCode, status string) error { + if err := p.MemoryChain.SetContentStatus(maCode, status); err != nil { + return err + } + p.updateStatus(maCode, status) + return nil +} + +// ---- PG 写入小工具(best-effort,失败仅记日志)---- + +func (p *PersistentChain) exec(q string, args ...any) { + if _, err := p.db.Exec(q, args...); err != nil { + log.Printf("chain/pg: 写穿失败(忽略,镜像为非权威): %v", err) + } +} + +func (p *PersistentChain) persistContent(c model.Content) { + var issueDate any + if c.IssueDate != "" { + issueDate = c.IssueDate + } + p.exec(`INSERT INTO content_registry + (content_twin_id, ma_code, ma_type, title, episode_count, status, issuer, issue_date, created_at) + VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9) + ON CONFLICT (content_twin_id) DO UPDATE SET status=EXCLUDED.status, updated_at=NOW()`, + c.ContentTwinID, c.MACode, c.MAType, c.Title, c.EpisodeCount, c.Status, c.Issuer, issueDate, c.CreatedAt) +} + +func (p *PersistentChain) persistBinding(b model.HashBinding) { + p.exec(`INSERT INTO hash_binding + (content_twin_id, hash_type, hash_value, merkle_root, file_format, resolution, duration, version, parent_hash, episode, revoked, revoked_reason, created_by) + VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13)`, + b.ContentTwinID, string(b.HashType), b.HashValue, b.MerkleRoot, b.FileFormat, b.Resolution, + b.Duration, b.Version, b.ParentHash, b.Episode, b.Revoked, b.RevokedReason, b.CreatedBy) +} + +func (p *PersistentChain) persistMapping(mp model.Mapping) { + p.exec(`INSERT INTO identity_mapping + (content_twin_id, party, party_id, party_name, cdn_endpoint) + VALUES ($1,$2,$3,$4,$5) + ON CONFLICT (content_twin_id, party, party_id) DO UPDATE SET cdn_endpoint=EXCLUDED.cdn_endpoint`, + mp.ContentTwinID, string(mp.Party), mp.PartyID, mp.PartyName, mp.CDNEndpoint) +} + +func (p *PersistentChain) persistTx(ctid, txID, method string) { + p.exec(`INSERT INTO chain_tx (content_twin_id, tx_id, method, status) + VALUES ($1,$2,$3,'confirmed') ON CONFLICT (tx_id) DO NOTHING`, + ctid, txID, method) +} + +func (p *PersistentChain) updateStatus(maCode, status string) { + p.exec(`UPDATE content_registry SET status=$1, updated_at=NOW() WHERE ma_code=$2`, status, maCode) +} + +func (p *PersistentChain) updateEpisodeRevoked(maCode string, episode int, revoked bool, reason string) { + p.exec(`UPDATE hash_binding hb SET revoked=$1, revoked_reason=$2 + FROM content_registry cr + WHERE hb.content_twin_id = cr.content_twin_id AND cr.ma_code=$3 AND hb.episode=$4`, + revoked, reason, maCode, episode) +} + +// snapshotBindings 复制某 MA 码当前的内存绑定(同包访问,读锁保护)。 +func (p *PersistentChain) snapshotBindings(maCode string) []model.HashBinding { + p.mu.RLock() + defer p.mu.RUnlock() + src := p.bindings[maCode] + out := make([]model.HashBinding, len(src)) + copy(out, src) + return out +} + +// ---- 启动水合:从 PG 镜像恢复内存状态 ---- + +func (p *PersistentChain) hydrate() error { + // 1) 内容主表 + 建立 ctid -> maCode 映射 + ctidToMA := map[string]string{} + rows, err := p.db.Query(`SELECT content_twin_id, ma_code, COALESCE(ma_type,''), title, + COALESCE(episode_count,1), status, COALESCE(issuer,''), + COALESCE(to_char(issue_date,'YYYY-MM-DD'),''), created_at FROM content_registry`) + if err != nil { + return err + } + n := 0 + for rows.Next() { + var c model.Content + if err := rows.Scan(&c.ContentTwinID, &c.MACode, &c.MAType, &c.Title, + &c.EpisodeCount, &c.Status, &c.Issuer, &c.IssueDate, &c.CreatedAt); err != nil { + rows.Close() + return err + } + p.contents[c.MACode] = c + ctidToMA[c.ContentTwinID] = c.MACode + n++ + } + rows.Close() + + // 2) 哈希绑定(含集级、转码、感知)+ 重建防换壳哈希索引 + bRows, err := p.db.Query(`SELECT content_twin_id, hash_type, hash_value, COALESCE(merkle_root,''), + COALESCE(file_format,''), COALESCE(resolution,''), COALESCE(duration,0), version, + COALESCE(parent_hash,''), episode, revoked, COALESCE(revoked_reason,''), COALESCE(created_by,'') + FROM hash_binding ORDER BY id`) + if err != nil { + return err + } + for bRows.Next() { + var b model.HashBinding + var ht string + if err := bRows.Scan(&b.ContentTwinID, &ht, &b.HashValue, &b.MerkleRoot, &b.FileFormat, + &b.Resolution, &b.Duration, &b.Version, &b.ParentHash, &b.Episode, &b.Revoked, + &b.RevokedReason, &b.CreatedBy); err != nil { + bRows.Close() + return err + } + b.HashType = model.HashType(ht) + ma := ctidToMA[b.ContentTwinID] + if ma == "" { + continue + } + p.bindings[ma] = append(p.bindings[ma], b) + if (b.HashType == model.HashFile || b.HashType == model.HashTranscoded) && b.HashValue != "" { + if _, ok := p.hashIndex[b.HashValue]; !ok { + p.hashIndex[b.HashValue] = ma + } + } + } + bRows.Close() + + // 3) 三方映射 + mRows, err := p.db.Query(`SELECT content_twin_id, party, party_id, COALESCE(party_name,''), COALESCE(cdn_endpoint,'') + FROM identity_mapping ORDER BY id`) + if err != nil { + return err + } + for mRows.Next() { + var mp model.Mapping + var party string + if err := mRows.Scan(&mp.ContentTwinID, &party, &mp.PartyID, &mp.PartyName, &mp.CDNEndpoint); err != nil { + mRows.Close() + return err + } + mp.Party = model.Party(party) + if ma := ctidToMA[mp.ContentTwinID]; ma != "" { + p.mappings[ma] = append(p.mappings[ma], mp) + } + } + mRows.Close() + + // 4) 版本变更 + vRows, err := p.db.Query(`SELECT content_twin_id, version, COALESCE(change_reason,''), COALESCE(prev_hash,''), + COALESCE(new_hash,''), reaudit_required, COALESCE(reaudit_status,''), COALESCE(affected_episode,0) + FROM version_history ORDER BY id`) + if err != nil { + return err + } + for vRows.Next() { + var vc model.VersionChange + if err := vRows.Scan(&vc.ContentTwinID, &vc.Version, &vc.ChangeReason, &vc.PrevHash, + &vc.NewHash, &vc.ReauditRequired, &vc.ReauditStatus, &vc.AffectedEpisode); err != nil { + vRows.Close() + return err + } + if ma := ctidToMA[vc.ContentTwinID]; ma != "" { + p.versions[ma] = append(p.versions[ma], vc) + } + } + vRows.Close() + + if n > 0 { + log.Printf("chain/pg: 已从 PostgreSQL 水合 %d 条内容记录", n) + } + return nil +}