package chain import ( "database/sql" "log" "github.com/tcs-iptv/tcs/internal/model" ) // PersistentChain 在 MemoryChain 之上叠加 PostgreSQL 持久化(最小改动模式)。 // // 设计要点(面向未来平滑替换真实链): // - 业务规则(权限、1:1 强绑定、防换壳重发、状态机)全部复用 MemoryChain,单一真相来源; // - 读路径直接走内存(快);写路径在内存变更成功后「写穿」到 PG 镜像表; // - 启动时从 PG「水合」恢复内存状态,进程重启不丢数据; // - 链上为权威数据源的设计不变:PG 仅作镜像,写穿失败仅记日志、不阻断主流程。 // // 未来接入真实 ChainMaker 时,整体以新的 chain.Client 实现替换即可,业务层零改动。 type PersistentChain struct { *MemoryChain db *sql.DB } var _ Client = (*PersistentChain)(nil) // NewPersistentChain 创建持久化链客户端,并从 PG 水合既有数据。 func NewPersistentChain(db *sql.DB) (*PersistentChain, error) { p := &PersistentChain{MemoryChain: NewMemoryChain(), db: db} if err := p.hydrate(); err != nil { return nil, err } return p, nil } // ---- 写穿(在内存变更成功后镜像到 PG)---- // IssueMA 发码签发:内存绑定成功后镜像内容主记录与全部哈希绑定。 func (p *PersistentChain) IssueMA(role Role, req IssueRequest) (string, error) { tx, err := p.MemoryChain.IssueMA(role, req) if err != nil { return tx, err } c, _ := p.MemoryChain.QueryContent(req.MACode) p.persistContent(c) for _, b := range p.snapshotBindings(req.MACode) { p.persistBinding(b) } p.persistTx(req.ContentTwinID, tx, "issueMA") return tx, nil } // RegisterHashBinding 追加哈希绑定(如转码版):镜像该条绑定。 func (p *PersistentChain) RegisterHashBinding(role Role, b model.HashBinding) (string, error) { tx, err := p.MemoryChain.RegisterHashBinding(role, b) if err != nil { return tx, err } p.persistBinding(b) p.persistTx(b.ContentTwinID, tx, "registerHashBinding") return tx, nil } // RegisterMapping 注册三方映射:镜像该映射(按唯一键幂等)。 func (p *PersistentChain) RegisterMapping(role Role, mp model.Mapping) (string, error) { tx, err := p.MemoryChain.RegisterMapping(role, mp) if err != nil { return tx, err } p.persistMapping(mp) p.persistTx(mp.ContentTwinID, tx, "registerMapping") return tx, nil } // RecordVersionChange 版本变更:镜像到 version_history。 func (p *PersistentChain) RecordVersionChange(vc model.VersionChange) (string, error) { tx, err := p.MemoryChain.RecordVersionChange(vc) if err != nil { return tx, err } p.exec(`INSERT INTO version_history (content_twin_id, version, change_reason, prev_hash, new_hash, reaudit_required, reaudit_status, affected_episode) VALUES ($1,$2,$3,$4,$5,$6,$7,$8)`, vc.ContentTwinID, vc.Version, vc.ChangeReason, vc.PrevHash, vc.NewHash, vc.ReauditRequired, vc.ReauditStatus, vc.AffectedEpisode) return tx, nil } // Revoke 整剧下架:镜像内容状态。 func (p *PersistentChain) Revoke(role Role, maCode, reason string) (MappingsResult, error) { res, err := p.MemoryChain.Revoke(role, maCode, reason) if err != nil { return res, err } p.updateStatus(maCode, model.StatusRevoked) return res, nil } // Restore 整剧恢复上架:镜像内容状态。 func (p *PersistentChain) Restore(role Role, maCode string) error { if err := p.MemoryChain.Restore(role, maCode); err != nil { return err } p.updateStatus(maCode, model.StatusPublished) return nil } // RevokeEpisode 集级下架:镜像该集 revoked 标记。 func (p *PersistentChain) RevokeEpisode(role Role, maCode string, episode int, reason string) error { if err := p.MemoryChain.RevokeEpisode(role, maCode, episode, reason); err != nil { return err } p.updateEpisodeRevoked(maCode, episode, true, reason) return nil } // RestoreEpisode 集级恢复:镜像该集 revoked 标记。 func (p *PersistentChain) RestoreEpisode(role Role, maCode string, episode int) error { if err := p.MemoryChain.RestoreEpisode(role, maCode, episode); err != nil { return err } p.updateEpisodeRevoked(maCode, episode, false, "") return nil } // SetContentStatus 状态流转(入库/发布等):镜像内容状态。 func (p *PersistentChain) SetContentStatus(maCode, status string) error { if err := p.MemoryChain.SetContentStatus(maCode, status); err != nil { return err } p.updateStatus(maCode, status) return nil } // ---- PG 写入小工具(best-effort,失败仅记日志)---- func (p *PersistentChain) exec(q string, args ...any) { if _, err := p.db.Exec(q, args...); err != nil { log.Printf("chain/pg: 写穿失败(忽略,镜像为非权威): %v", err) } } func (p *PersistentChain) persistContent(c model.Content) { var issueDate any if c.IssueDate != "" { issueDate = c.IssueDate } p.exec(`INSERT INTO content_registry (content_twin_id, ma_code, ma_type, title, episode_count, status, issuer, issue_date, created_at) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9) ON CONFLICT (content_twin_id) DO UPDATE SET status=EXCLUDED.status, updated_at=NOW()`, c.ContentTwinID, c.MACode, c.MAType, c.Title, c.EpisodeCount, c.Status, c.Issuer, issueDate, c.CreatedAt) } func (p *PersistentChain) persistBinding(b model.HashBinding) { p.exec(`INSERT INTO hash_binding (content_twin_id, hash_type, hash_value, merkle_root, file_format, resolution, duration, version, parent_hash, episode, revoked, revoked_reason, created_by) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13)`, b.ContentTwinID, string(b.HashType), b.HashValue, b.MerkleRoot, b.FileFormat, b.Resolution, b.Duration, b.Version, b.ParentHash, b.Episode, b.Revoked, b.RevokedReason, b.CreatedBy) } func (p *PersistentChain) persistMapping(mp model.Mapping) { p.exec(`INSERT INTO identity_mapping (content_twin_id, party, party_id, party_name, cdn_endpoint) VALUES ($1,$2,$3,$4,$5) ON CONFLICT (content_twin_id, party, party_id) DO UPDATE SET cdn_endpoint=EXCLUDED.cdn_endpoint`, mp.ContentTwinID, string(mp.Party), mp.PartyID, mp.PartyName, mp.CDNEndpoint) } func (p *PersistentChain) persistTx(ctid, txID, method string) { p.exec(`INSERT INTO chain_tx (content_twin_id, tx_id, method, status) VALUES ($1,$2,$3,'confirmed') ON CONFLICT (tx_id) DO NOTHING`, ctid, txID, method) } func (p *PersistentChain) updateStatus(maCode, status string) { p.exec(`UPDATE content_registry SET status=$1, updated_at=NOW() WHERE ma_code=$2`, status, maCode) } func (p *PersistentChain) updateEpisodeRevoked(maCode string, episode int, revoked bool, reason string) { p.exec(`UPDATE hash_binding hb SET revoked=$1, revoked_reason=$2 FROM content_registry cr WHERE hb.content_twin_id = cr.content_twin_id AND cr.ma_code=$3 AND hb.episode=$4`, revoked, reason, maCode, episode) } // snapshotBindings 复制某 MA 码当前的内存绑定(同包访问,读锁保护)。 func (p *PersistentChain) snapshotBindings(maCode string) []model.HashBinding { p.mu.RLock() defer p.mu.RUnlock() src := p.bindings[maCode] out := make([]model.HashBinding, len(src)) copy(out, src) return out } // ---- 启动水合:从 PG 镜像恢复内存状态 ---- func (p *PersistentChain) hydrate() error { // 1) 内容主表 + 建立 ctid -> maCode 映射 ctidToMA := map[string]string{} rows, err := p.db.Query(`SELECT content_twin_id, ma_code, COALESCE(ma_type,''), title, COALESCE(episode_count,1), status, COALESCE(issuer,''), COALESCE(to_char(issue_date,'YYYY-MM-DD'),''), created_at FROM content_registry`) if err != nil { return err } n := 0 for rows.Next() { var c model.Content if err := rows.Scan(&c.ContentTwinID, &c.MACode, &c.MAType, &c.Title, &c.EpisodeCount, &c.Status, &c.Issuer, &c.IssueDate, &c.CreatedAt); err != nil { rows.Close() return err } p.contents[c.MACode] = c ctidToMA[c.ContentTwinID] = c.MACode n++ } rows.Close() // 2) 哈希绑定(含集级、转码、感知)+ 重建防换壳哈希索引 bRows, err := p.db.Query(`SELECT content_twin_id, hash_type, hash_value, COALESCE(merkle_root,''), COALESCE(file_format,''), COALESCE(resolution,''), COALESCE(duration,0), version, COALESCE(parent_hash,''), episode, revoked, COALESCE(revoked_reason,''), COALESCE(created_by,'') FROM hash_binding ORDER BY id`) if err != nil { return err } for bRows.Next() { var b model.HashBinding var ht string if err := bRows.Scan(&b.ContentTwinID, &ht, &b.HashValue, &b.MerkleRoot, &b.FileFormat, &b.Resolution, &b.Duration, &b.Version, &b.ParentHash, &b.Episode, &b.Revoked, &b.RevokedReason, &b.CreatedBy); err != nil { bRows.Close() return err } b.HashType = model.HashType(ht) ma := ctidToMA[b.ContentTwinID] if ma == "" { continue } p.bindings[ma] = append(p.bindings[ma], b) if (b.HashType == model.HashFile || b.HashType == model.HashTranscoded) && b.HashValue != "" { if _, ok := p.hashIndex[b.HashValue]; !ok { p.hashIndex[b.HashValue] = ma } } } bRows.Close() // 3) 三方映射 mRows, err := p.db.Query(`SELECT content_twin_id, party, party_id, COALESCE(party_name,''), COALESCE(cdn_endpoint,'') FROM identity_mapping ORDER BY id`) if err != nil { return err } for mRows.Next() { var mp model.Mapping var party string if err := mRows.Scan(&mp.ContentTwinID, &party, &mp.PartyID, &mp.PartyName, &mp.CDNEndpoint); err != nil { mRows.Close() return err } mp.Party = model.Party(party) if ma := ctidToMA[mp.ContentTwinID]; ma != "" { p.mappings[ma] = append(p.mappings[ma], mp) } } mRows.Close() // 4) 版本变更 vRows, err := p.db.Query(`SELECT content_twin_id, version, COALESCE(change_reason,''), COALESCE(prev_hash,''), COALESCE(new_hash,''), reaudit_required, COALESCE(reaudit_status,''), COALESCE(affected_episode,0) FROM version_history ORDER BY id`) if err != nil { return err } for vRows.Next() { var vc model.VersionChange if err := vRows.Scan(&vc.ContentTwinID, &vc.Version, &vc.ChangeReason, &vc.PrevHash, &vc.NewHash, &vc.ReauditRequired, &vc.ReauditStatus, &vc.AffectedEpisode); err != nil { vRows.Close() return err } if ma := ctidToMA[vc.ContentTwinID]; ma != "" { p.versions[ma] = append(p.versions[ma], vc) } } vRows.Close() if n > 0 { log.Printf("chain/pg: 已从 PostgreSQL 水合 %d 条内容记录", n) } return nil }