Files
MAcode/tcs-iptv/internal/chain/persistent.go
T
selfrelease 599ccfe1c8 feat(chain): PostgreSQL 持久化链(最小改动·写穿+启动水合)
- internal/chain/persistent.go: PersistentChain 装饰 MemoryChain
  - 复用 MemoryChain 全部业务规则(权限/1:1绑定/防换壳),读走内存
  - 写路径在内存变更成功后写穿 PG 镜像表(content_registry/hash_binding/identity_mapping/version_history/chain_tx)
  - 启动从 PG 水合恢复内存状态,重启不丢数据;PG 为非权威镜像,写穿失败仅记日志
- deploy/migrations/0004_binding_revoked.sql: hash_binding 增加 revoked/revoked_reason 列(集级下架镜像)
- cmd/api-svc/main.go: 共享一个 *sql.DB,PG 可用时启用 PersistentChain+PostgresStore,否则回退内存
- 验证: seed_demo 后内容/映射/哈希落库;重启水合3条内容,resolve/mappings 正常恢复
- 面向未来: 接真实 ChainMaker 时整体替换 chain.Client,业务层零改动
2026-06-14 19:24:38 +08:00

298 lines
10 KiB
Go

package chain
import (
"database/sql"
"log"
"github.com/tcs-iptv/tcs/internal/model"
)
// PersistentChain layers PostgreSQL persistence on top of MemoryChain
// (minimal-change mode).
//
// Design notes (aimed at a smooth future swap to a real chain):
//   - Business rules (permissions, strict 1:1 binding, protection against
//     re-issuing the same content under a new shell, state machine) are all
//     reused from MemoryChain, which stays the single source of truth;
//   - Reads go straight to memory (fast); writes are "written through" to the
//     PG mirror tables after the in-memory change has succeeded;
//   - On startup the in-memory state is "hydrated" from PG, so a process
//     restart does not lose data;
//   - The chain remains the authoritative data source: PG is only a mirror,
//     and a failed write-through is logged without blocking the main flow.
//
// When a real ChainMaker is integrated later, the whole thing is replaced by a
// new chain.Client implementation; the business layer needs no changes.
type PersistentChain struct {
// Embedded in-memory chain; its methods and fields are promoted.
*MemoryChain
// Database handle used for write-through and startup hydration.
db *sql.DB
}
// Compile-time check that *PersistentChain satisfies Client.
var _ Client = (*PersistentChain)(nil)
// NewPersistentChain builds a PersistentChain around a fresh MemoryChain and
// hydrates it from the database behind db. If hydration fails, that error is
// returned and no client is produced.
func NewPersistentChain(db *sql.DB) (*PersistentChain, error) {
	pc := &PersistentChain{
		MemoryChain: NewMemoryChain(),
		db:          db,
	}
	err := pc.hydrate()
	if err != nil {
		return nil, err
	}
	return pc, nil
}
// ---- Write-through (mirror to PG after the in-memory change succeeds) ----

// IssueMA issues an MA code through the embedded MemoryChain. On success it
// mirrors the content record, every binding currently held for that MA code,
// and the transaction to PG. An error from MemoryChain is returned as-is and
// nothing is mirrored.
func (p *PersistentChain) IssueMA(role Role, req IssueRequest) (string, error) {
	txID, err := p.MemoryChain.IssueMA(role, req)
	if err != nil {
		return txID, err
	}

	// The second result of QueryContent is discarded; whatever content value
	// comes back is mirrored.
	content, _ := p.MemoryChain.QueryContent(req.MACode)
	p.persistContent(content)

	bindings := p.snapshotBindings(req.MACode)
	for i := range bindings {
		p.persistBinding(bindings[i])
	}

	p.persistTx(req.ContentTwinID, txID, "issueMA")
	return txID, nil
}
// RegisterHashBinding appends a hash binding (e.g. a transcoded version) via
// the embedded MemoryChain and, on success, mirrors that binding and the
// transaction to PG. On failure the MemoryChain result is returned unchanged.
func (p *PersistentChain) RegisterHashBinding(role Role, b model.HashBinding) (string, error) {
	txID, err := p.MemoryChain.RegisterHashBinding(role, b)
	if err == nil {
		p.persistBinding(b)
		p.persistTx(b.ContentTwinID, txID, "registerHashBinding")
	}
	return txID, err
}
// RegisterMapping registers a party mapping via the embedded MemoryChain and,
// on success, mirrors the mapping (upserted on its unique key) and the
// transaction to PG. On failure the MemoryChain result is returned unchanged.
func (p *PersistentChain) RegisterMapping(role Role, mp model.Mapping) (string, error) {
	txID, err := p.MemoryChain.RegisterMapping(role, mp)
	if err == nil {
		p.persistMapping(mp)
		p.persistTx(mp.ContentTwinID, txID, "registerMapping")
	}
	return txID, err
}
// RecordVersionChange records a version change via the embedded MemoryChain
// and, on success, mirrors it into version_history. Unlike the other
// write-through methods, no chain_tx row is written here.
func (p *PersistentChain) RecordVersionChange(vc model.VersionChange) (string, error) {
	const insertVersion = `INSERT INTO version_history
(content_twin_id, version, change_reason, prev_hash, new_hash, reaudit_required, reaudit_status, affected_episode)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8)`

	txID, err := p.MemoryChain.RecordVersionChange(vc)
	if err != nil {
		return txID, err
	}
	p.exec(insertVersion,
		vc.ContentTwinID,
		vc.Version,
		vc.ChangeReason,
		vc.PrevHash,
		vc.NewHash,
		vc.ReauditRequired,
		vc.ReauditStatus,
		vc.AffectedEpisode,
	)
	return txID, nil
}
// Revoke takes a whole title down via the embedded MemoryChain and, on
// success, mirrors the revoked status onto its content_registry row.
func (p *PersistentChain) Revoke(role Role, maCode, reason string) (MappingsResult, error) {
	result, err := p.MemoryChain.Revoke(role, maCode, reason)
	if err == nil {
		p.updateStatus(maCode, model.StatusRevoked)
	}
	return result, err
}
// Restore puts a whole title back on the shelf via the embedded MemoryChain
// and, on success, mirrors the published status onto its content_registry row.
func (p *PersistentChain) Restore(role Role, maCode string) error {
	err := p.MemoryChain.Restore(role, maCode)
	if err == nil {
		p.updateStatus(maCode, model.StatusPublished)
	}
	return err
}
// RevokeEpisode takes a single episode down via the embedded MemoryChain and,
// on success, sets the revoked flag and reason on that episode's hash_binding
// rows.
func (p *PersistentChain) RevokeEpisode(role Role, maCode string, episode int, reason string) error {
	err := p.MemoryChain.RevokeEpisode(role, maCode, episode, reason)
	if err == nil {
		p.updateEpisodeRevoked(maCode, episode, true, reason)
	}
	return err
}
// RestoreEpisode restores a single episode via the embedded MemoryChain and,
// on success, clears the revoked flag and reason on that episode's
// hash_binding rows.
func (p *PersistentChain) RestoreEpisode(role Role, maCode string, episode int) error {
	err := p.MemoryChain.RestoreEpisode(role, maCode, episode)
	if err == nil {
		p.updateEpisodeRevoked(maCode, episode, false, "")
	}
	return err
}
// SetContentStatus applies a status transition (ingest, publish, ...) via the
// embedded MemoryChain and, on success, mirrors the new status onto the
// content_registry row.
func (p *PersistentChain) SetContentStatus(maCode, status string) error {
	err := p.MemoryChain.SetContentStatus(maCode, status)
	if err == nil {
		p.updateStatus(maCode, status)
	}
	return err
}
// ---- PG write helpers (best-effort: failures are only logged) ----

// exec runs one statement against the mirror database. An error is logged and
// otherwise dropped, so callers never see a write-through failure.
func (p *PersistentChain) exec(q string, args ...any) {
	_, err := p.db.Exec(q, args...)
	if err == nil {
		return
	}
	log.Printf("chain/pg: 写穿失败(忽略,镜像为非权威): %v", err)
}
// persistContent upserts the content record into content_registry. When a row
// with the same content_twin_id already exists, only status and updated_at are
// refreshed.
func (p *PersistentChain) persistContent(c model.Content) {
	const upsertContent = `INSERT INTO content_registry
(content_twin_id, ma_code, ma_type, title, episode_count, status, issuer, issue_date, created_at)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9)
ON CONFLICT (content_twin_id) DO UPDATE SET status=EXCLUDED.status, updated_at=NOW()`

	// An empty issue date is passed as a nil argument instead of "".
	var issued any
	if c.IssueDate != "" {
		issued = c.IssueDate
	}

	p.exec(upsertContent,
		c.ContentTwinID,
		c.MACode,
		c.MAType,
		c.Title,
		c.EpisodeCount,
		c.Status,
		c.Issuer,
		issued,
		c.CreatedAt,
	)
}
// persistBinding inserts one hash binding into hash_binding. The statement has
// no ON CONFLICT clause, so it is a plain insert.
func (p *PersistentChain) persistBinding(b model.HashBinding) {
	const insertBinding = `INSERT INTO hash_binding
(content_twin_id, hash_type, hash_value, merkle_root, file_format, resolution, duration, version, parent_hash, episode, revoked, revoked_reason, created_by)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13)`

	hashType := string(b.HashType)
	p.exec(insertBinding,
		b.ContentTwinID,
		hashType,
		b.HashValue,
		b.MerkleRoot,
		b.FileFormat,
		b.Resolution,
		b.Duration,
		b.Version,
		b.ParentHash,
		b.Episode,
		b.Revoked,
		b.RevokedReason,
		b.CreatedBy,
	)
}
// persistMapping upserts a party mapping into identity_mapping. On a conflict
// over (content_twin_id, party, party_id) only cdn_endpoint is overwritten.
func (p *PersistentChain) persistMapping(mp model.Mapping) {
	const upsertMapping = `INSERT INTO identity_mapping
(content_twin_id, party, party_id, party_name, cdn_endpoint)
VALUES ($1,$2,$3,$4,$5)
ON CONFLICT (content_twin_id, party, party_id) DO UPDATE SET cdn_endpoint=EXCLUDED.cdn_endpoint`

	party := string(mp.Party)
	p.exec(upsertMapping, mp.ContentTwinID, party, mp.PartyID, mp.PartyName, mp.CDNEndpoint)
}
// persistTx records a transaction in chain_tx with status 'confirmed'. A row
// whose tx_id already exists is left untouched.
func (p *PersistentChain) persistTx(ctid, txID, method string) {
	const insertTx = `INSERT INTO chain_tx (content_twin_id, tx_id, method, status)
VALUES ($1,$2,$3,'confirmed') ON CONFLICT (tx_id) DO NOTHING`

	p.exec(insertTx, ctid, txID, method)
}
// updateStatus sets status (and bumps updated_at) on the content_registry row
// identified by maCode.
func (p *PersistentChain) updateStatus(maCode, status string) {
	const setStatus = `UPDATE content_registry SET status=$1, updated_at=NOW() WHERE ma_code=$2`

	p.exec(setStatus, status, maCode)
}
// updateEpisodeRevoked sets revoked and revoked_reason on every hash_binding
// row of the given episode. The target content is resolved by joining
// content_registry on ma_code.
func (p *PersistentChain) updateEpisodeRevoked(maCode string, episode int, revoked bool, reason string) {
	const setEpisodeRevoked = `UPDATE hash_binding hb SET revoked=$1, revoked_reason=$2
FROM content_registry cr
WHERE hb.content_twin_id = cr.content_twin_id AND cr.ma_code=$3 AND hb.episode=$4`

	p.exec(setEpisodeRevoked, revoked, reason, maCode, episode)
}
// snapshotBindings returns a copy of the in-memory bindings currently held for
// maCode. The copy is made under the read lock and reaches the embedded
// chain's state through same-package access. The result is always non-nil.
func (p *PersistentChain) snapshotBindings(maCode string) []model.HashBinding {
	p.mu.RLock()
	defer p.mu.RUnlock()

	current := p.bindings[maCode]
	snapshot := make([]model.HashBinding, 0, len(current))
	return append(snapshot, current...)
}
// ---- Startup hydration: rebuild in-memory state from the PG mirror ----

// hydrate loads the mirror tables into the embedded MemoryChain's maps in
// dependency order: contents first (which also yields the content_twin_id ->
// MA code lookup), then hash bindings, party mappings and version history.
//
// It returns the first query, scan or row-iteration error. Iteration errors
// are checked via rows.Err after each result set, so a result set that is cut
// short is reported as a failure rather than a successful partial hydration.
// No lock is taken here; within this file it is only called from
// NewPersistentChain, before the value is returned to the caller.
func (p *PersistentChain) hydrate() error {
	ctidToMA, n, err := p.hydrateContents()
	if err != nil {
		return err
	}
	if err := p.hydrateBindings(ctidToMA); err != nil {
		return err
	}
	if err := p.hydrateMappings(ctidToMA); err != nil {
		return err
	}
	if err := p.hydrateVersions(ctidToMA); err != nil {
		return err
	}
	if n > 0 {
		log.Printf("chain/pg: 已从 PostgreSQL 水合 %d 条内容记录", n)
	}
	return nil
}

// hydrateContents loads content_registry into p.contents and returns the
// content_twin_id -> MA code lookup together with the number of rows read.
func (p *PersistentChain) hydrateContents() (map[string]string, int, error) {
	rows, err := p.db.Query(`SELECT content_twin_id, ma_code, COALESCE(ma_type,''), title,
COALESCE(episode_count,1), status, COALESCE(issuer,''),
COALESCE(to_char(issue_date,'YYYY-MM-DD'),''), created_at FROM content_registry`)
	if err != nil {
		return nil, 0, err
	}
	defer rows.Close()

	ctidToMA := map[string]string{}
	n := 0
	for rows.Next() {
		var c model.Content
		if err := rows.Scan(&c.ContentTwinID, &c.MACode, &c.MAType, &c.Title,
			&c.EpisodeCount, &c.Status, &c.Issuer, &c.IssueDate, &c.CreatedAt); err != nil {
			return nil, 0, err
		}
		p.contents[c.MACode] = c
		ctidToMA[c.ContentTwinID] = c.MACode
		n++
	}
	if err := rows.Err(); err != nil {
		return nil, 0, err
	}
	return ctidToMA, n, nil
}

// hydrateBindings loads hash_binding (in id order) into p.bindings and
// rebuilds p.hashIndex for file and transcoded hashes. Rows whose
// content_twin_id is not in ctidToMA are skipped.
func (p *PersistentChain) hydrateBindings(ctidToMA map[string]string) error {
	rows, err := p.db.Query(`SELECT content_twin_id, hash_type, hash_value, COALESCE(merkle_root,''),
COALESCE(file_format,''), COALESCE(resolution,''), COALESCE(duration,0), version,
COALESCE(parent_hash,''), episode, revoked, COALESCE(revoked_reason,''), COALESCE(created_by,'')
FROM hash_binding ORDER BY id`)
	if err != nil {
		return err
	}
	defer rows.Close()

	for rows.Next() {
		var b model.HashBinding
		var ht string
		if err := rows.Scan(&b.ContentTwinID, &ht, &b.HashValue, &b.MerkleRoot, &b.FileFormat,
			&b.Resolution, &b.Duration, &b.Version, &b.ParentHash, &b.Episode, &b.Revoked,
			&b.RevokedReason, &b.CreatedBy); err != nil {
			return err
		}
		b.HashType = model.HashType(ht)
		ma := ctidToMA[b.ContentTwinID]
		if ma == "" {
			continue
		}
		p.bindings[ma] = append(p.bindings[ma], b)
		if (b.HashType == model.HashFile || b.HashType == model.HashTranscoded) && b.HashValue != "" {
			// First owner wins: an existing index entry is never overwritten.
			if _, ok := p.hashIndex[b.HashValue]; !ok {
				p.hashIndex[b.HashValue] = ma
			}
		}
	}
	return rows.Err()
}

// hydrateMappings loads identity_mapping (in id order) into p.mappings. Rows
// whose content_twin_id is not in ctidToMA are skipped.
func (p *PersistentChain) hydrateMappings(ctidToMA map[string]string) error {
	rows, err := p.db.Query(`SELECT content_twin_id, party, party_id, COALESCE(party_name,''), COALESCE(cdn_endpoint,'')
FROM identity_mapping ORDER BY id`)
	if err != nil {
		return err
	}
	defer rows.Close()

	for rows.Next() {
		var mp model.Mapping
		var party string
		if err := rows.Scan(&mp.ContentTwinID, &party, &mp.PartyID, &mp.PartyName, &mp.CDNEndpoint); err != nil {
			return err
		}
		mp.Party = model.Party(party)
		if ma := ctidToMA[mp.ContentTwinID]; ma != "" {
			p.mappings[ma] = append(p.mappings[ma], mp)
		}
	}
	return rows.Err()
}

// hydrateVersions loads version_history (in id order) into p.versions. Rows
// whose content_twin_id is not in ctidToMA are skipped.
func (p *PersistentChain) hydrateVersions(ctidToMA map[string]string) error {
	rows, err := p.db.Query(`SELECT content_twin_id, version, COALESCE(change_reason,''), COALESCE(prev_hash,''),
COALESCE(new_hash,''), reaudit_required, COALESCE(reaudit_status,''), COALESCE(affected_episode,0)
FROM version_history ORDER BY id`)
	if err != nil {
		return err
	}
	defer rows.Close()

	for rows.Next() {
		var vc model.VersionChange
		if err := rows.Scan(&vc.ContentTwinID, &vc.Version, &vc.ChangeReason, &vc.PrevHash,
			&vc.NewHash, &vc.ReauditRequired, &vc.ReauditStatus, &vc.AffectedEpisode); err != nil {
			return err
		}
		if ma := ctidToMA[vc.ContentTwinID]; ma != "" {
			p.versions[ma] = append(p.versions[ma], vc)
		}
	}
	return rows.Err()
}