diff --git a/tcs-iptv/.git-commit-msg.tmp b/tcs-iptv/.git-commit-msg.tmp new file mode 100644 index 0000000..f247d90 --- /dev/null +++ b/tcs-iptv/.git-commit-msg.tmp @@ -0,0 +1,15 @@ +feat(chain): ChainMaker 真实链接入脚手架(build tag 隔离)+ 契约测试 + +- internal/chain/chainmaker.go [//go:build chainmaker]: ChainMakerClient 适配器骨架, + 实现 chain.Client 全部方法到合约 Invoke/Query,按角色证书做链上鉴权,错误映射回标准错误 +- internal/chain/chainmaker_stub.go [//go:build !chainmaker]: 占位构造函数, + 保证默认构建不依赖 SDK、主工程始终可编译 +- contracts/tcs_registry/registry.go: 补齐合约方法 + RegisterHashBinding/VerifyEpisodeHash/ListEpisodes/HashExists/RecordVersionChange/ + RevokeEpisode/Restore/RestoreEpisode/SetContentStatus/QueryMappings/ListContents + 并增加集级哈希/映射/版本计数索引 KV 设计 +- config: TCS_CHAIN_BACKEND=memory|pg|chainmaker + TCS_CHAINMAKER_SDK_CONF 开关 +- cmd/api-svc: newChain 按 backend 选择,chainmaker 失败逐级降级 pg 到内存 +- internal/chain/conformance_test.go: chain.Client 契约测试套件,双实现共用 + MemoryChain 默认跑;PersistentChain 经 TCS_TEST_PG_DSN;ChainMaker 经 -tags 与 env +- 验证: 默认 build/vet/test 全绿;MemoryChain 与 PersistentChain 契约套件均通过,行为一致 diff --git a/tcs-iptv/cmd/api-svc/main.go b/tcs-iptv/cmd/api-svc/main.go index e811e03..24b647a 100644 --- a/tcs-iptv/cmd/api-svc/main.go +++ b/tcs-iptv/cmd/api-svc/main.go @@ -37,18 +37,36 @@ func newAllocationStore(db *sql.DB) macode.AllocationStore { return macode.NewMemoryStore() } -// newChain 优先使用 PostgreSQL 持久化链(写穿 + 启动水合),不可用时回退纯内存。 -func newChain(db *sql.DB) chain.Client { - if db != nil { - if pc, err := chain.NewPersistentChain(db); err == nil { - log.Printf("chain: 使用 PostgreSQL 持久化链(写穿+水合,重启不丢数据)") - return pc - } else { - log.Printf("chain: PG 持久化链初始化失败(%v),回退内存链", err) +// newChain 按配置选择链后端: +// - chainmaker:真实链(需 -tags chainmaker 构建并配置 SDK),失败回退 pg/内存 +// - pg:PG 持久化链(写穿+水合,重启不丢数据) +// - memory:纯内存(仅开发) +// +// PG 不可用时自动降级到内存。 +func newChain(backend, sdkConf string, db *sql.DB) chain.Client { + switch backend { + case "chainmaker": + cm, err := chain.NewChainMakerClient(sdkConf, db) + if err == nil { + log.Printf("chain: 使用 ChainMaker 真实链后端") + return cm } + log.Printf("chain: ChainMaker 后端不可用(%v),降级", err) + fallthrough + case "pg": + if db != nil { + if pc, err := chain.NewPersistentChain(db); err == nil { + log.Printf("chain: 使用 PostgreSQL 持久化链(写穿+水合,重启不丢数据)") + return pc + } else { + log.Printf("chain: PG 持久化链初始化失败(%v),回退内存链", err) + } + } + fallthrough + default: + log.Printf("chain: 使用内存链(仅开发用,重启丢数据)") + return chain.NewMemoryChain() } - log.Printf("chain: 使用内存链(仅开发用,重启丢数据)") - return chain.NewMemoryChain() } func main() { @@ -56,7 +74,7 @@ func main() { // 装配依赖:共享一个 PG 连接给链持久化与号段存储 db := openDB(cfg.PostgresDSN) - ch := newChain(db) + ch := newChain(cfg.ChainBackend, cfg.ChainMakerSDKConf, db) gen := macode.NewGenerator(newAllocationStore(db)) // 示例号段(生产由与发码机构对接后配置) // 机构节点 6101 = 陕西(管理方:陕西IPTV运营公司);行业节点 8531 = IPTV视听内容 diff --git a/tcs-iptv/contracts/tcs_registry/registry.go b/tcs-iptv/contracts/tcs_registry/registry.go index e110845..2482640 100644 --- a/tcs-iptv/contracts/tcs_registry/registry.go +++ b/tcs-iptv/contracts/tcs_registry/registry.go @@ -1,23 +1,31 @@ -// Package main 是 TCS-IPTV 可信数据空间的 ChainMaker 智能合约(Go,三期 A.2)。 +// Package main 是 TCS-IPTV 可信数据空间的 ChainMaker 智能合约(Go)。 // -// 本合约为独立合约模块(独立 go.mod),按 ChainMaker docker-go/wasm 合约规范部署。 +// 本合约为独立合约模块(独立 go.mod),按 ChainMaker docker-go 合约规范部署。 // 与 internal/chain.Client 接口语义一一对应;MVP/二期用 MemoryChain 等价实现, -// 具备链环境后部署本合约,由 chain-svc 通过 ChainMaker Go SDK 调用替换 MemoryChain。 +// 具备链环境后部署本合约,由 chain-svc / ChainMakerClient 通过 SDK 调用替换内存实现。 // // 状态键设计(KV): // -// content:{maCode} -> Content JSON -// binding:{maCode}:{idx} -> HashBinding JSON +// content:{maCode} -> Content JSON(含 status) +// binding:{maCode}:0 -> 整剧 file 哈希绑定 JSON +// binding:{maCode}:p -> 感知哈希绑定 JSON +// ep:{maCode}:{n} -> 集级绑定 JSON {episode,hash,revoked,reason} +// epcount:{maCode} -> 集数 N // hashidx:{fileHash} -> maCode(防换壳重发) // mapping:{maCode}:{idx} -> Mapping JSON +// mapcount:{maCode} -> 映射数 N +// version:{maCode}:{idx} -> VersionChange JSON +// vercount:{maCode} -> 版本变更数 N // ctid2ma:{ctid} -> maCode +// allmacodes -> []maCode(供 ListContents 遍历) // -// 权限:通过 sender 组织/角色证书判断(仅监管组织可 IssueMA/Revoke)。 +// 权限:通过 sender 组织证书判断(仅监管组织可 IssueMA/Revoke/RevokeEpisode/Restore)。 package main import ( "encoding/json" "errors" + "strconv" "chainmaker.org/chainmaker/contract-sdk-go/v2/pb/protogo" "chainmaker.org/chainmaker/contract-sdk-go/v2/sandbox" @@ -28,7 +36,8 @@ import ( type TCSRegistry struct{} const ( - orgRegulator = "regulator" // 监管组织(仅其可签发/下架) + orgRegulator = "regulator" // 监管组织:仅其可签发/下架 + orgReviewer = "reviewer" // 审核组织:哈希绑定/版本变更/状态流转 ) // InitContract 合约初始化。 @@ -41,13 +50,46 @@ func (t *TCSRegistry) UpgradeContract() protogo.Response { return sdk.Success([]byte("tcs_registry upgraded")) } -// senderOrg 取调用方组织标识(基于证书 OU/OrgId)。 func senderOrg() string { org, _ := sdk.Instance.GetSenderOrgId() return org } -// IssueMA 签发 MA 码并 1:1 强绑定哈希(仅监管组织)。 +func getInt(key, field string) int { + v, _ := sdk.Instance.GetStateByte(key, field) + if len(v) == 0 { + return 0 + } + n, _ := strconv.Atoi(string(v)) + return n +} + +func putInt(key, field string, n int) { + _ = sdk.Instance.PutStateByte(key, field, []byte(strconv.Itoa(n))) +} + +// epBinding 集级绑定的链上结构。 +type epBinding struct { + Episode int `json:"episode"` + HashValue string `json:"hash_value"` + Revoked bool `json:"revoked"` + Reason string `json:"revoked_reason,omitempty"` +} + +// appendMACode 把新发码加入全局列表(供 ListContents 遍历)。 +func appendMACode(maCode string) { + var all []string + if v, _ := sdk.Instance.GetStateByte("allmacodes", ""); len(v) > 0 { + _ = json.Unmarshal(v, &all) + } + all = append(all, maCode) + b, _ := json.Marshal(all) + _ = sdk.Instance.PutStateByte("allmacodes", "", b) +} + +// ---- 写方法 ---- + +// IssueMA 签发 MA 码并 1:1 强绑定哈希;同时登记集级哈希(仅监管组织)。 func (t *TCSRegistry) IssueMA() protogo.Response { if senderOrg() != orgRegulator { return sdk.Error("permission denied: only regulator can issue MA") @@ -56,85 +98,292 @@ func (t *TCSRegistry) IssueMA() protogo.Response { maCode := string(args["ma_code"]) ctid := string(args["ctid"]) fileHash := string(args["file_hash"]) - contentJSON := args["content"] - // MA 不可重复签发 if existing, _ := sdk.Instance.GetStateByte("content", maCode); len(existing) > 0 { return sdk.Error("MA already issued (1:1 binding immutable)") } - // 防换壳重发:同哈希不可绑定到不同 MA if bound, _ := sdk.Instance.GetStateByte("hashidx", fileHash); len(bound) > 0 { return sdk.Error("content hash already exists") } - _ = sdk.Instance.PutStateByte("content", maCode, contentJSON) - binding := map[string]string{"hash_type": "file_sha256", "hash_value": fileHash, "version": "v1.0"} - bj, _ := json.Marshal(binding) - _ = sdk.Instance.PutStateByte("binding", maCode+":0", bj) + // 内容主记录(强制 status=approved,与 MemoryChain 一致) + var content map[string]interface{} + _ = json.Unmarshal(args["content"], &content) + if content == nil { + content = map[string]interface{}{} + } + content["ma_code"] = maCode + content["content_twin_id"] = ctid + content["status"] = "approved" + cj, _ := json.Marshal(content) + _ = sdk.Instance.PutStateByte("content", maCode, cj) + + // 整剧 file 绑定 + 感知哈希绑定 + fb, _ := json.Marshal(epBinding{Episode: 0, HashValue: fileHash}) + _ = sdk.Instance.PutStateByte("binding", maCode+":0", fb) + if ph := string(args["perceptual_hash"]); ph != "" { + pb, _ := json.Marshal(map[string]string{"hash_type": "perceptual", "hash_value": ph}) + _ = sdk.Instance.PutStateByte("binding", maCode+":p", pb) + } _ = sdk.Instance.PutStateByte("hashidx", fileHash, []byte(maCode)) _ = sdk.Instance.PutStateByte("ctid2ma", ctid, []byte(maCode)) + // 集级哈希 + var eps []map[string]interface{} + _ = json.Unmarshal(args["episodes"], &eps) + n := 0 + for _, e := range eps { + ep := int(toFloat(e["episode"])) + hv, _ := e["file_sha256"].(string) + if ep <= 0 || hv == "" { + continue + } + eb, _ := json.Marshal(epBinding{Episode: ep, HashValue: hv}) + _ = sdk.Instance.PutStateByte("ep", maCode+":"+strconv.Itoa(ep), eb) + if exist, _ := sdk.Instance.GetStateByte("hashidx", hv); len(exist) == 0 { + _ = sdk.Instance.PutStateByte("hashidx", hv, []byte(maCode)) + } + if ep > n { + n = ep + } + } + putInt("epcount", maCode, n) + appendMACode(maCode) + sdk.Instance.EmitEvent("RegisterSuccess", []string{maCode, fileHash}) return sdk.Success([]byte(maCode)) } -// RegisterMapping 注册三方编码映射(MA 必须已签发)。 -func (t *TCSRegistry) RegisterMapping() protogo.Response { +func toFloat(v interface{}) float64 { + if f, ok := v.(float64); ok { + return f + } + return 0 +} + +// RegisterHashBinding 追加哈希绑定(如转码版)。MA 必须已签发(审核/监管)。 +func (t *TCSRegistry) RegisterHashBinding() protogo.Response { + if o := senderOrg(); o != orgReviewer && o != orgRegulator { + return sdk.Error("permission denied") + } args := sdk.Instance.GetArgs() - maCode := string(args["ma_code"]) - if v, _ := sdk.Instance.GetStateByte("content", maCode); len(v) == 0 { + ctid := string(args["ctid"]) + ma, _ := sdk.Instance.GetStateByte("ctid2ma", ctid) + if len(ma) == 0 { return sdk.Error("MA not issued") } - idx := string(args["idx"]) - _ = sdk.Instance.PutStateByte("mapping", maCode+":"+idx, args["mapping"]) + idx := getInt("bindextra", string(ma)) + 1 + _ = sdk.Instance.PutStateByte("bindextra", string(ma)+":"+strconv.Itoa(idx), args["binding"]) + putInt("bindextra", string(ma), idx) return sdk.Success([]byte("ok")) } -// VerifyHash 校验提交哈希是否与绑定哈希一致。 -func (t *TCSRegistry) VerifyHash() protogo.Response { +// RegisterMapping 注册三方编码映射;MA 必须已签发(任意角色注册本方)。 +func (t *TCSRegistry) RegisterMapping() protogo.Response { args := sdk.Instance.GetArgs() - maCode := string(args["ma_code"]) - fileHash := string(args["file_hash"]) - bound, _ := sdk.Instance.GetStateByte("hashidx", fileHash) - if string(bound) == maCode { - return sdk.Success([]byte("true")) + ctid := string(args["ctid"]) + ma, _ := sdk.Instance.GetStateByte("ctid2ma", ctid) + if len(ma) == 0 { + return sdk.Error("MA not issued") } - return sdk.Success([]byte("false")) + idx := getInt("mapcount", string(ma)) + 1 + _ = sdk.Instance.PutStateByte("mapping", string(ma)+":"+strconv.Itoa(idx), args["mapping"]) + putInt("mapcount", string(ma), idx) + return sdk.Success([]byte("ok")) } -// Revoke 下架(仅监管组织)。 +// RecordVersionChange 记录版本变更(审核/监管)。 +func (t *TCSRegistry) RecordVersionChange() protogo.Response { + if o := senderOrg(); o != orgReviewer && o != orgRegulator { + return sdk.Error("permission denied") + } + args := sdk.Instance.GetArgs() + ctid := string(args["ctid"]) + ma, _ := sdk.Instance.GetStateByte("ctid2ma", ctid) + if len(ma) == 0 { + return sdk.Error("MA not issued") + } + idx := getInt("vercount", string(ma)) + 1 + _ = sdk.Instance.PutStateByte("version", string(ma)+":"+strconv.Itoa(idx), args["vc"]) + putInt("vercount", string(ma), idx) + return sdk.Success([]byte("ok")) +} + +// Revoke 整剧下架(仅监管组织)。 func (t *TCSRegistry) Revoke() protogo.Response { if senderOrg() != orgRegulator { return sdk.Error("permission denied: only regulator can revoke") } + return setStatus(string(sdk.Instance.GetArgs()["ma_code"]), "revoked", "Revoked") +} + +// SetContentStatus 状态流转(审核/监管):入库/发布等。 +func (t *TCSRegistry) SetContentStatus() protogo.Response { + if o := senderOrg(); o != orgReviewer && o != orgRegulator { + return sdk.Error("permission denied") + } args := sdk.Instance.GetArgs() - maCode := string(args["ma_code"]) + return setStatus(string(args["ma_code"]), string(args["status"]), "StatusChanged") +} + +func setStatus(maCode, status, event string) protogo.Response { cj, _ := sdk.Instance.GetStateByte("content", maCode) if len(cj) == 0 { return sdk.Error("not found") } var content map[string]interface{} _ = json.Unmarshal(cj, &content) - content["status"] = "revoked" + content["status"] = status nj, _ := json.Marshal(content) _ = sdk.Instance.PutStateByte("content", maCode, nj) - sdk.Instance.EmitEvent("Revoked", []string{maCode}) + sdk.Instance.EmitEvent(event, []string{maCode, status}) return sdk.Success([]byte("ok")) } +// RevokeEpisode 集级下架(仅监管组织)。 +func (t *TCSRegistry) RevokeEpisode() protogo.Response { + if senderOrg() != orgRegulator { + return sdk.Error("permission denied") + } + args := sdk.Instance.GetArgs() + return setEpisodeRevoked(string(args["ma_code"]), string(args["episode"]), true, string(args["reason"])) +} + +// RestoreEpisode 集级恢复(仅监管组织)。 +func (t *TCSRegistry) RestoreEpisode() protogo.Response { + if senderOrg() != orgRegulator { + return sdk.Error("permission denied") + } + args := sdk.Instance.GetArgs() + return setEpisodeRevoked(string(args["ma_code"]), string(args["episode"]), false, "") +} + +// Restore 整剧恢复(仅监管组织)。 +func (t *TCSRegistry) Restore() protogo.Response { + if senderOrg() != orgRegulator { + return sdk.Error("permission denied") + } + return setStatus(string(sdk.Instance.GetArgs()["ma_code"]), "published", "Restored") +} + +func setEpisodeRevoked(maCode, epStr string, revoked bool, reason string) protogo.Response { + key := maCode + ":" + epStr + v, _ := sdk.Instance.GetStateByte("ep", key) + if len(v) == 0 { + return sdk.Error("not found") + } + var eb epBinding + _ = json.Unmarshal(v, &eb) + eb.Revoked = revoked + eb.Reason = reason + nb, _ := json.Marshal(eb) + _ = sdk.Instance.PutStateByte("ep", key, nb) + return sdk.Success([]byte("ok")) +} + +// ---- 读方法 ---- + +// VerifyHash 校验整剧/转码哈希是否绑定到该 MA。 +func (t *TCSRegistry) VerifyHash() protogo.Response { + args := sdk.Instance.GetArgs() + bound, _ := sdk.Instance.GetStateByte("hashidx", string(args["file_hash"])) + if string(bound) == string(args["ma_code"]) { + return sdk.Success([]byte("true")) + } + return sdk.Success([]byte("false")) +} + +// VerifyEpisodeHash 校验某集哈希。 +func (t *TCSRegistry) VerifyEpisodeHash() protogo.Response { + args := sdk.Instance.GetArgs() + v, _ := sdk.Instance.GetStateByte("ep", string(args["ma_code"])+":"+string(args["episode"])) + if len(v) == 0 { + return sdk.Success([]byte("false")) + } + var eb epBinding + _ = json.Unmarshal(v, &eb) + if eb.HashValue == string(args["file_hash"]) { + return sdk.Success([]byte("true")) + } + return sdk.Success([]byte("false")) +} + +// HashExists 返回绑定的 MA(不存在返回空)。 +func (t *TCSRegistry) HashExists() protogo.Response { + bound, _ := sdk.Instance.GetStateByte("hashidx", string(sdk.Instance.GetArgs()["file_hash"])) + return sdk.Success(bound) +} + +// ListEpisodes 返回某 MA 的集级绑定数组(JSON)。 +func (t *TCSRegistry) ListEpisodes() protogo.Response { + maCode := string(sdk.Instance.GetArgs()["ma_code"]) + n := getInt("epcount", maCode) + out := make([]epBinding, 0, n) + for i := 1; i <= n; i++ { + if v, _ := sdk.Instance.GetStateByte("ep", maCode+":"+strconv.Itoa(i)); len(v) > 0 { + var eb epBinding + _ = json.Unmarshal(v, &eb) + out = append(out, eb) + } + } + b, _ := json.Marshal(out) + return sdk.Success(b) +} + // QueryContent 查询内容主记录。 func (t *TCSRegistry) QueryContent() protogo.Response { - maCode := string(sdk.Instance.GetArgs()["ma_code"]) - v, _ := sdk.Instance.GetStateByte("content", maCode) + v, _ := sdk.Instance.GetStateByte("content", string(sdk.Instance.GetArgs()["ma_code"])) if len(v) == 0 { return sdk.Error("not found") } return sdk.Success(v) } +// QueryMappings 返回某 MA 的全部映射(JSON {mappings:[],cdn_endpoints:[]})。 +func (t *TCSRegistry) QueryMappings() protogo.Response { + maCode := string(sdk.Instance.GetArgs()["ma_code"]) + n := getInt("mapcount", maCode) + maps := make([]map[string]interface{}, 0, n) + cdns := []string{} + for i := 1; i <= n; i++ { + if v, _ := sdk.Instance.GetStateByte("mapping", maCode+":"+strconv.Itoa(i)); len(v) > 0 { + var m map[string]interface{} + _ = json.Unmarshal(v, &m) + maps = append(maps, m) + if ep, ok := m["cdn_endpoint"].(string); ok && ep != "" { + cdns = append(cdns, ep) + } + } + } + b, _ := json.Marshal(map[string]interface{}{"ma_code": maCode, "mappings": maps, "cdn_endpoints": cdns}) + return sdk.Success(b) +} + +// ListContents 按状态返回内容数组(空状态返回全部)。 +func (t *TCSRegistry) ListContents() protogo.Response { + status := string(sdk.Instance.GetArgs()["status"]) + var all []string + if v, _ := sdk.Instance.GetStateByte("allmacodes", ""); len(v) > 0 { + _ = json.Unmarshal(v, &all) + } + out := make([]map[string]interface{}, 0, len(all)) + for _, ma := range all { + v, _ := sdk.Instance.GetStateByte("content", ma) + if len(v) == 0 { + continue + } + var c map[string]interface{} + _ = json.Unmarshal(v, &c) + if status == "" || c["status"] == status { + out = append(out, c) + } + } + b, _ := json.Marshal(out) + return sdk.Success(b) +} + func main() { - err := sandbox.Start(new(TCSRegistry)) - if err != nil { + if err := sandbox.Start(new(TCSRegistry)); err != nil { _ = errors.New(err.Error()) } } diff --git a/tcs-iptv/internal/chain/chainmaker.go b/tcs-iptv/internal/chain/chainmaker.go new file mode 100644 index 0000000..1f43eaa --- /dev/null +++ b/tcs-iptv/internal/chain/chainmaker.go @@ -0,0 +1,294 @@ +//go:build chainmaker + +// Package chain — 真实链后端(长安链 ChainMaker)适配器骨架。 +// +// 仅在 `go build -tags chainmaker` 时编译;默认构建由 chainmaker_stub.go 提供占位, +// 因此主工程在没有 ChainMaker Go SDK 依赖时也始终可编译。 +// +// 接入步骤(需真实环境): +// 1. 引入 SDK 依赖: +// go get chainmaker.org/chainmaker/sdk-go/v2 +// 2. 准备 sdk_config.yml(节点地址、TLS、四角色组织证书),路径由 TCS_CHAINMAKER_SDK_CONF 指定。 +// 3. 部署 contracts/tcs_registry 合约,合约名见 contractName 常量。 +// 4. 启动:TCS_CHAIN_BACKEND=chainmaker go run -tags chainmaker ./cmd/api-svc +// +// 设计:每个业务角色(监管/审核/CP/运营商)使用各自组织证书的 ChainClient, +// 合约内 senderOrg() 据此做链上权限判定(IssueMA/Revoke 仅监管组织)。 +// 写操作走 InvokeContract(同步等待上链确认),读操作走 QueryContract(不产生交易)。 +package chain + +import ( + "database/sql" + "encoding/json" + "fmt" + "strings" + + "chainmaker.org/chainmaker/pb-go/v2/common" + sdk "chainmaker.org/chainmaker/sdk-go/v2" + + "github.com/tcs-iptv/tcs/internal/model" +) + +const contractName = "tcs_registry" + +// ChainMakerClient 是 chain.Client 的真实链实现。 +type ChainMakerClient struct { + // clients 为四角色各自证书初始化的链客户端(组织/用户证书不同)。 + clients map[Role]*sdk.ChainClient + // mirror 可选 PG 镜像(链为权威,镜像供高效查询)。为 nil 时仅用链。 + mirror *sql.DB +} + +var _ Client = (*ChainMakerClient)(nil) + +// NewChainMakerClient 按 sdk_config.yml 初始化四角色链客户端。 +// +// 真实实现需为每个角色加载其组织/用户证书(可在 sdk_config.yml 用多 user 段, +// 或为每个角色单独一个 config 文件)。此处给出装配骨架,证书细节随部署而定。 +func NewChainMakerClient(sdkConfPath string, mirror *sql.DB) (Client, error) { + roles := []Role{RoleRegulator, RoleReviewer, RoleCP, RoleOperator} + clients := make(map[Role]*sdk.ChainClient, len(roles)) + for _, r := range roles { + // TODO(deploy): 为每个角色加载其证书。示例:约定每角色一个配置文件 + // conf := fmt.Sprintf("%s.%s.yml", strings.TrimSuffix(sdkConfPath, ".yml"), r) + cli, err := sdk.NewChainClient(sdk.WithConfPath(sdkConfPath)) + if err != nil { + return nil, fmt.Errorf("chainmaker: 初始化角色 %s 客户端失败: %w", r, err) + } + clients[r] = cli + } + return &ChainMakerClient{clients: clients, mirror: mirror}, nil +} + +// kv 构造合约入参键值对。 +func kv(m map[string][]byte) []*common.KeyValuePair { + out := make([]*common.KeyValuePair, 0, len(m)) + for k, v := range m { + out = append(out, &common.KeyValuePair{Key: k, Value: v}) + } + return out +} + +// invoke 以指定角色身份提交合约写交易(同步等待上链)。 +func (c *ChainMakerClient) invoke(role Role, method string, args map[string][]byte) (*common.TxResponse, error) { + cli, ok := c.clients[role] + if !ok { + return nil, fmt.Errorf("chainmaker: 未配置角色 %s 的链客户端", role) + } + // withSyncResult=true:等待交易上链并返回合约执行结果 + resp, err := cli.InvokeContract(contractName, method, "", kv(args), -1, true) + if err != nil { + return nil, err + } + if resp.Code != common.TxStatusCode_SUCCESS { + return nil, fmt.Errorf("chainmaker: tx 失败: %s", resp.Message) + } + if resp.ContractResult != nil && resp.ContractResult.Code != 0 { + return nil, mapContractError(string(resp.ContractResult.Message)) + } + return resp, nil +} + +// query 以指定角色身份发起合约查询(不产生交易)。 +func (c *ChainMakerClient) query(role Role, method string, args map[string][]byte) ([]byte, error) { + cli := c.clients[role] + resp, err := cli.QueryContract(contractName, method, "", kv(args), -1) + if err != nil { + return nil, err + } + if resp.ContractResult != nil && resp.ContractResult.Code != 0 { + return nil, mapContractError(string(resp.ContractResult.Message)) + } + if resp.ContractResult == nil { + return nil, ErrNotFound + } + return resp.ContractResult.Result, nil +} + +// mapContractError 把合约返回的错误消息映射回 chain 包标准错误,保证与 MemoryChain 行为一致。 +func mapContractError(msg string) error { + switch { + case strings.Contains(msg, "permission denied"): + return ErrPermissionDenied + case strings.Contains(msg, "already issued"): + return ErrMAAlreadyIssued + case strings.Contains(msg, "hash already exists"): + return ErrHashExists + case strings.Contains(msg, "not issued"): + return ErrMANotIssued + case strings.Contains(msg, "not found"): + return ErrNotFound + default: + return fmt.Errorf("chainmaker: %s", msg) + } +} + +// ---- chain.Client 实现(写操作)---- + +func (c *ChainMakerClient) IssueMA(role Role, req IssueRequest) (string, error) { + contentJSON, _ := json.Marshal(req.Content) + epJSON, _ := json.Marshal(req.Episodes) + resp, err := c.invoke(role, "IssueMA", map[string][]byte{ + "ma_code": []byte(req.MACode), + "ctid": []byte(req.ContentTwinID), + "merkle_root": []byte(req.MerkleRoot), + "file_hash": []byte(req.FileHash), + "perceptual_hash": []byte(req.PerceptualHash), + "episodes": epJSON, + "content": contentJSON, + }) + if err != nil { + return "", err + } + // TODO(mirror): 成功后写 PG 镜像(可复用 PersistentChain 的 persist* 逻辑) + return resp.TxId, nil +} + +func (c *ChainMakerClient) RegisterHashBinding(role Role, b model.HashBinding) (string, error) { + bj, _ := json.Marshal(b) + resp, err := c.invoke(role, "RegisterHashBinding", map[string][]byte{ + "ctid": []byte(b.ContentTwinID), "binding": bj, + }) + if err != nil { + return "", err + } + return resp.TxId, nil +} + +func (c *ChainMakerClient) RegisterMapping(role Role, m model.Mapping) (string, error) { + mj, _ := json.Marshal(m) + resp, err := c.invoke(role, "RegisterMapping", map[string][]byte{ + "ctid": []byte(m.ContentTwinID), "mapping": mj, + }) + if err != nil { + return "", err + } + return resp.TxId, nil +} + +func (c *ChainMakerClient) RecordVersionChange(vc model.VersionChange) (string, error) { + vj, _ := json.Marshal(vc) + resp, err := c.invoke(RoleReviewer, "RecordVersionChange", map[string][]byte{ + "ctid": []byte(vc.ContentTwinID), "vc": vj, + }) + if err != nil { + return "", err + } + return resp.TxId, nil +} + +func (c *ChainMakerClient) Revoke(role Role, maCode, reason string) (MappingsResult, error) { + if _, err := c.invoke(role, "Revoke", map[string][]byte{ + "ma_code": []byte(maCode), "reason": []byte(reason), + }); err != nil { + return MappingsResult{}, err + } + return c.QueryMappings(maCode) +} + +func (c *ChainMakerClient) RevokeEpisode(role Role, maCode string, episode int, reason string) error { + _, err := c.invoke(role, "RevokeEpisode", map[string][]byte{ + "ma_code": []byte(maCode), "episode": []byte(fmt.Sprint(episode)), "reason": []byte(reason), + }) + return err +} + +func (c *ChainMakerClient) Restore(role Role, maCode string) error { + _, err := c.invoke(role, "Restore", map[string][]byte{"ma_code": []byte(maCode)}) + return err +} + +func (c *ChainMakerClient) RestoreEpisode(role Role, maCode string, episode int) error { + _, err := c.invoke(role, "RestoreEpisode", map[string][]byte{ + "ma_code": []byte(maCode), "episode": []byte(fmt.Sprint(episode)), + }) + return err +} + +func (c *ChainMakerClient) SetContentStatus(maCode, status string) error { + _, err := c.invoke(RoleReviewer, "SetContentStatus", map[string][]byte{ + "ma_code": []byte(maCode), "status": []byte(status), + }) + return err +} + +// ---- chain.Client 实现(读操作)---- + +func (c *ChainMakerClient) VerifyHash(maCode, fileHash string) (VerifyResult, error) { + res, err := c.query(RoleOperator, "VerifyHash", map[string][]byte{ + "ma_code": []byte(maCode), "file_hash": []byte(fileHash), + }) + if err != nil { + return VerifyResult{MACode: maCode, SubmittedHash: fileHash}, err + } + match := string(res) == "true" + return VerifyResult{Valid: true, MACode: maCode, SubmittedHash: fileHash, Match: match}, nil +} + +func (c *ChainMakerClient) VerifyEpisodeHash(maCode string, episode int, fileHash string) (VerifyResult, error) { + res, err := c.query(RoleOperator, "VerifyEpisodeHash", map[string][]byte{ + "ma_code": []byte(maCode), "episode": []byte(fmt.Sprint(episode)), "file_hash": []byte(fileHash), + }) + if err != nil { + return VerifyResult{MACode: maCode, SubmittedHash: fileHash}, err + } + return VerifyResult{Valid: true, MACode: maCode, SubmittedHash: fileHash, Match: string(res) == "true"}, nil +} + +func (c *ChainMakerClient) ListEpisodes(maCode string) ([]model.HashBinding, error) { + res, err := c.query(RoleRegulator, "ListEpisodes", map[string][]byte{"ma_code": []byte(maCode)}) + if err != nil { + return nil, err + } + var out []model.HashBinding + if err := json.Unmarshal(res, &out); err != nil { + return nil, err + } + return out, nil +} + +func (c *ChainMakerClient) HashExists(fileHash string) (string, bool) { + res, err := c.query(RoleRegulator, "HashExists", map[string][]byte{"file_hash": []byte(fileHash)}) + if err != nil || len(res) == 0 { + return "", false + } + return string(res), true +} + +func (c *ChainMakerClient) QueryContent(maCode string) (model.Content, error) { + res, err := c.query(RoleRegulator, "QueryContent", map[string][]byte{"ma_code": []byte(maCode)}) + if err != nil { + return model.Content{}, err + } + var content model.Content + if err := json.Unmarshal(res, &content); err != nil { + return model.Content{}, err + } + return content, nil +} + +func (c *ChainMakerClient) ListContents(status string) ([]model.Content, error) { + // 优先走 PG 镜像(链上范围扫描代价高);无镜像时回源合约范围查询。 + res, err := c.query(RoleRegulator, "ListContents", map[string][]byte{"status": []byte(status)}) + if err != nil { + return nil, err + } + var out []model.Content + if err := json.Unmarshal(res, &out); err != nil { + return nil, err + } + return out, nil +} + +func (c *ChainMakerClient) QueryMappings(maCode string) (MappingsResult, error) { + res, err := c.query(RoleRegulator, "QueryMappings", map[string][]byte{"ma_code": []byte(maCode)}) + if err != nil { + return MappingsResult{}, err + } + var out MappingsResult + if err := json.Unmarshal(res, &out); err != nil { + return MappingsResult{}, err + } + out.MACode = maCode + return out, nil +} diff --git a/tcs-iptv/internal/chain/chainmaker_conformance_test.go b/tcs-iptv/internal/chain/chainmaker_conformance_test.go new file mode 100644 index 0000000..c3f1fa3 --- /dev/null +++ b/tcs-iptv/internal/chain/chainmaker_conformance_test.go @@ -0,0 +1,29 @@ +//go:build chainmaker + +package chain + +import ( + "os" + "testing" + + "github.com/stretchr/testify/require" +) + +// TestChainMakerClient_Conformance 让真实链实现跑同一套契约套件。 +// +// 仅在 `go test -tags chainmaker` 且配置了测试链时运行: +// - TCS_TEST_CHAINMAKER_CONF:测试链 sdk_config.yml 路径 +// +// 注意:真实链不易"清空状态",建议每次用全新 maCode/合约实例,或对接专用测试链。 +// 本用例提供接线骨架,实际跑通需真实 ChainMaker 测试网与已部署的 tcs_registry 合约。 +func TestChainMakerClient_Conformance(t *testing.T) { + conf := os.Getenv("TCS_TEST_CHAINMAKER_CONF") + if conf == "" { + t.Skip("未设置 TCS_TEST_CHAINMAKER_CONF,跳过真实链契约测试") + } + RunClientConformance(t, func(t *testing.T) Client { + c, err := NewChainMakerClient(conf, nil) + require.NoError(t, err) + return c + }) +} diff --git a/tcs-iptv/internal/chain/chainmaker_stub.go b/tcs-iptv/internal/chain/chainmaker_stub.go new file mode 100644 index 0000000..00acc64 --- /dev/null +++ b/tcs-iptv/internal/chain/chainmaker_stub.go @@ -0,0 +1,18 @@ +//go:build !chainmaker + +package chain + +import ( + "database/sql" + "errors" +) + +// ErrChainMakerNotBuilt 表示二进制未启用 chainmaker 构建标签,无法使用真实链后端。 +var ErrChainMakerNotBuilt = errors.New("chain: 未启用 chainmaker 构建标签,请使用 `go build -tags chainmaker` 并引入 ChainMaker Go SDK") + +// NewChainMakerClient 是真实链后端的占位实现(默认构建)。 +// 真正的实现位于 chainmaker.go(//go:build chainmaker),需引入 ChainMaker Go SDK。 +// 这样默认构建不依赖链 SDK,主工程始终可编译;装配处可统一引用本构造函数。 +func NewChainMakerClient(sdkConfPath string, mirror *sql.DB) (Client, error) { + return nil, ErrChainMakerNotBuilt +} diff --git a/tcs-iptv/internal/chain/conformance_test.go b/tcs-iptv/internal/chain/conformance_test.go new file mode 100644 index 0000000..7e9b5df --- /dev/null +++ b/tcs-iptv/internal/chain/conformance_test.go @@ -0,0 +1,160 @@ +package chain + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/tcs-iptv/tcs/internal/model" +) + +// RunClientConformance 是 chain.Client 的契约测试套件,校验任意实现都满足同一组 +// 不可变业务规则(权限/1:1强绑定/防换壳/状态机/集级粒度)。 +// +// 各实现(MemoryChain / PersistentChain / ChainMakerClient)复用本套件, +// 保证「换实现不换行为」,这是平滑替换真实链的安全保障。 +// +// newClient 必须返回一个干净(空状态)的 Client 实例。 +func RunClientConformance(t *testing.T, newClient func(t *testing.T) Client) { + // 构造一条标准发码请求(集级 3 集)。 + issueReq := func(ma, ctid, fh string) IssueRequest { + return IssueRequest{ + MACode: ma, ContentTwinID: ctid, MerkleRoot: "mr-" + fh, FileHash: fh, + PerceptualHash: "ph-" + fh, + Episodes: []model.EpisodeHash{ + {Episode: 1, FileSHA256: fh + "-E1"}, + {Episode: 2, FileSHA256: fh + "-E2"}, + {Episode: 3, FileSHA256: fh + "-E3"}, + }, + Content: model.Content{Title: "契约测试剧", EpisodeCount: 3, MAType: "WD", Issuer: "测试局"}, + } + } + const ma = "MA.156.8531.6101/WD/20260000001" + const ctid = "ctid-conf-001" + const fh = "fh-conf-001" + + t.Run("IssueMA_仅监管可发码", func(t *testing.T) { + c := newClient(t) + _, err := c.IssueMA(RoleCP, issueReq(ma, ctid, fh)) + assert.ErrorIs(t, err, ErrPermissionDenied) + }) + + t.Run("IssueMA_成功并可查询", func(t *testing.T) { + c := newClient(t) + tx, err := c.IssueMA(RoleRegulator, issueReq(ma, ctid, fh)) + require.NoError(t, err) + assert.NotEmpty(t, tx) + got, err := c.QueryContent(ma) + require.NoError(t, err) + assert.Equal(t, "契约测试剧", got.Title) + assert.Equal(t, model.StatusApproved, got.Status) + }) + + t.Run("IssueMA_不可重复签发", func(t *testing.T) { + c := newClient(t) + _, err := c.IssueMA(RoleRegulator, issueReq(ma, ctid, fh)) + require.NoError(t, err) + _, err = c.IssueMA(RoleRegulator, issueReq(ma, ctid, fh)) + assert.ErrorIs(t, err, ErrMAAlreadyIssued) + }) + + t.Run("防换壳_同哈希不可绑不同MA", func(t *testing.T) { + c := newClient(t) + _, err := c.IssueMA(RoleRegulator, issueReq(ma, ctid, fh)) + require.NoError(t, err) + _, exists := c.HashExists(fh) + assert.True(t, exists) + _, err = c.IssueMA(RoleRegulator, issueReq("MA.156.8531.6101/WD/20260000002", "ctid-x", fh)) + assert.ErrorIs(t, err, ErrHashExists) + }) + + t.Run("VerifyHash_匹配与不匹配", func(t *testing.T) { + c := newClient(t) + _, err := c.IssueMA(RoleRegulator, issueReq(ma, ctid, fh)) + require.NoError(t, err) + r, err := c.VerifyHash(ma, fh) + require.NoError(t, err) + assert.True(t, r.Match) + r2, _ := c.VerifyHash(ma, "tampered") + assert.False(t, r2.Match) + }) + + t.Run("集级验真与列出", func(t *testing.T) { + c := newClient(t) + _, err := c.IssueMA(RoleRegulator, issueReq(ma, ctid, fh)) + require.NoError(t, err) + r, err := c.VerifyEpisodeHash(ma, 2, fh+"-E2") + require.NoError(t, err) + assert.True(t, r.Match) + eps, err := c.ListEpisodes(ma) + require.NoError(t, err) + assert.Len(t, eps, 3) + }) + + t.Run("映射注册需先发码且可查", func(t *testing.T) { + c := newClient(t) + // 未发码不可注册映射 + _, err := c.RegisterMapping(RoleCP, model.Mapping{ContentTwinID: "ctid-none", Party: model.PartyCP, PartyID: "X"}) + assert.ErrorIs(t, err, ErrMANotIssued) + // 发码后可注册并查询 + _, err = c.IssueMA(RoleRegulator, issueReq(ma, ctid, fh)) + require.NoError(t, err) + _, err = c.RegisterMapping(RoleOperator, model.Mapping{ + ContentTwinID: ctid, Party: model.PartyOperator, PartyID: "OP-1", CDNEndpoint: "cdn://x", + }) + require.NoError(t, err) + mr, err := c.QueryMappings(ma) + require.NoError(t, err) + assert.GreaterOrEqual(t, len(mr.Mappings), 1) + assert.Contains(t, mr.CDNEndpoints, "cdn://x") + }) + + t.Run("下架_仅监管且状态变更", func(t *testing.T) { + c := newClient(t) + _, err := c.IssueMA(RoleRegulator, issueReq(ma, ctid, fh)) + require.NoError(t, err) + _, err = c.Revoke(RoleCP, ma, "x") + assert.ErrorIs(t, err, ErrPermissionDenied) + _, err = c.Revoke(RoleRegulator, ma, "违规") + require.NoError(t, err) + got, _ := c.QueryContent(ma) + assert.Equal(t, model.StatusRevoked, got.Status) + }) + + t.Run("集级下架与恢复", func(t *testing.T) { + c := newClient(t) + _, err := c.IssueMA(RoleRegulator, issueReq(ma, ctid, fh)) + require.NoError(t, err) + require.NoError(t, c.RevokeEpisode(RoleRegulator, ma, 2, "违规集")) + eps, _ := c.ListEpisodes(ma) + for _, e := range eps { + if e.Episode == 2 { + assert.True(t, e.Revoked) + } + } + require.NoError(t, c.RestoreEpisode(RoleRegulator, ma, 2)) + eps, _ = c.ListEpisodes(ma) + for _, e := range eps { + if e.Episode == 2 { + assert.False(t, e.Revoked) + } + } + }) + + t.Run("状态流转与按状态列举", func(t *testing.T) { + c := newClient(t) + _, err := c.IssueMA(RoleRegulator, issueReq(ma, ctid, fh)) + require.NoError(t, err) + require.NoError(t, c.SetContentStatus(ma, model.StatusPublished)) + pub, err := c.ListContents(model.StatusPublished) + require.NoError(t, err) + assert.Len(t, pub, 1) + }) +} + +// TestMemoryChain_Conformance 让内存实现跑契约套件(始终运行)。 +func TestMemoryChain_Conformance(t *testing.T) { + RunClientConformance(t, func(t *testing.T) Client { + return NewMemoryChain() + }) +} diff --git a/tcs-iptv/internal/chain/persistent_conformance_test.go b/tcs-iptv/internal/chain/persistent_conformance_test.go new file mode 100644 index 0000000..ed32c1d --- /dev/null +++ b/tcs-iptv/internal/chain/persistent_conformance_test.go @@ -0,0 +1,36 @@ +package chain + +import ( + "database/sql" + "os" + "testing" + + _ "github.com/lib/pq" + "github.com/stretchr/testify/require" +) + +// TestPersistentChain_Conformance 让 PG 持久化实现跑同一套契约套件。 +// +// 需要一个可写的测试库:设置 TCS_TEST_PG_DSN 后运行,例如 +// +// TCS_TEST_PG_DSN='postgres://postgres@localhost:5432/tcs_iptv_test?sslmode=disable' go test ./internal/chain/ +// +// 未设置则跳过(不污染开发库)。每个子用例前清空镜像表,保证干净状态。 +func TestPersistentChain_Conformance(t *testing.T) { + dsn := os.Getenv("TCS_TEST_PG_DSN") + if dsn == "" { + t.Skip("未设置 TCS_TEST_PG_DSN,跳过 PG 契约测试") + } + db, err := sql.Open("postgres", dsn) + require.NoError(t, err) + require.NoError(t, db.Ping()) + t.Cleanup(func() { _ = db.Close() }) + + RunClientConformance(t, func(t *testing.T) Client { + _, err := db.Exec(`TRUNCATE content_registry, hash_binding, identity_mapping, version_history, chain_tx CASCADE`) + require.NoError(t, err) + pc, err := NewPersistentChain(db) + require.NoError(t, err) + return pc + }) +} diff --git a/tcs-iptv/internal/config/config.go b/tcs-iptv/internal/config/config.go index b1cbb48..afa7fcb 100644 --- a/tcs-iptv/internal/config/config.go +++ b/tcs-iptv/internal/config/config.go @@ -12,6 +12,10 @@ type Config struct { HashAddr string PostgresDSN string RedisAddr string + // ChainBackend 选择链实现:memory(纯内存)| pg(内存+PG镜像)| chainmaker(真实链,需 -tags chainmaker 构建) + ChainBackend string + // ChainMakerSDKConf ChainMaker Go SDK 配置文件路径(节点地址/TLS/组织证书),仅 chainmaker 后端使用 + ChainMakerSDKConf string } func getEnv(key, def string) string { @@ -29,5 +33,8 @@ func Load() Config { HashAddr: getEnv("TCS_HASH_ADDR", ":8082"), PostgresDSN: getEnv("TCS_POSTGRES_DSN", "postgres://postgres@localhost:5432/tcs_iptv?sslmode=disable"), RedisAddr: getEnv("TCS_REDIS_ADDR", "localhost:6379"), + // 默认 pg:PG 可用则内存+镜像持久化,不可用自动回退内存(见 api-svc 装配) + ChainBackend: getEnv("TCS_CHAIN_BACKEND", "pg"), + ChainMakerSDKConf: getEnv("TCS_CHAINMAKER_SDK_CONF", "deploy/chainmaker/sdk_config.yml"), } }