feat(chain): ChainMaker 真实链接入脚手架(build tag 隔离)+ 契约测试

- internal/chain/chainmaker.go [//go:build chainmaker]: ChainMakerClient 适配器骨架,
  实现 chain.Client 全部方法到合约 Invoke/Query,按角色证书做链上鉴权,错误映射回标准错误
- internal/chain/chainmaker_stub.go [//go:build !chainmaker]: 占位构造函数,
  保证默认构建不依赖 SDK、主工程始终可编译
- contracts/tcs_registry/registry.go: 补齐合约方法
  RegisterHashBinding/VerifyEpisodeHash/ListEpisodes/HashExists/RecordVersionChange/
  RevokeEpisode/Restore/RestoreEpisode/SetContentStatus/QueryMappings/ListContents
  并增加集级哈希/映射/版本计数索引 KV 设计
- config: TCS_CHAIN_BACKEND=memory|pg|chainmaker + TCS_CHAINMAKER_SDK_CONF 开关
- cmd/api-svc: newChain 按 backend 选择,chainmaker 失败逐级降级 pg 到内存
- internal/chain/conformance_test.go: chain.Client 契约测试套件,双实现共用
  MemoryChain 默认跑;PersistentChain 经 TCS_TEST_PG_DSN;ChainMaker 经 -tags 与 env
- 验证: 默认 build/vet/test 全绿;MemoryChain 与 PersistentChain 契约套件均通过,行为一致
This commit is contained in:
selfrelease
2026-06-14 20:47:21 +08:00
parent 8a9ea6b40b
commit 166f460d57
9 changed files with 875 additions and 49 deletions
+15
View File
@@ -0,0 +1,15 @@
feat(chain): ChainMaker 真实链接入脚手架(build tag 隔离)+ 契约测试
- internal/chain/chainmaker.go [//go:build chainmaker]: ChainMakerClient 适配器骨架,
实现 chain.Client 全部方法到合约 Invoke/Query,按角色证书做链上鉴权,错误映射回标准错误
- internal/chain/chainmaker_stub.go [//go:build !chainmaker]: 占位构造函数,
保证默认构建不依赖 SDK、主工程始终可编译
- contracts/tcs_registry/registry.go: 补齐合约方法
RegisterHashBinding/VerifyEpisodeHash/ListEpisodes/HashExists/RecordVersionChange/
RevokeEpisode/Restore/RestoreEpisode/SetContentStatus/QueryMappings/ListContents
并增加集级哈希/映射/版本计数索引 KV 设计
- config: TCS_CHAIN_BACKEND=memory|pg|chainmaker + TCS_CHAINMAKER_SDK_CONF 开关
- cmd/api-svc: newChain 按 backend 选择,chainmaker 失败逐级降级 pg 到内存
- internal/chain/conformance_test.go: chain.Client 契约测试套件,双实现共用
MemoryChain 默认跑;PersistentChain 经 TCS_TEST_PG_DSNChainMaker 经 -tags 与 env
- 验证: 默认 build/vet/test 全绿;MemoryChain 与 PersistentChain 契约套件均通过,行为一致
+29 -11
View File
@@ -37,18 +37,36 @@ func newAllocationStore(db *sql.DB) macode.AllocationStore {
return macode.NewMemoryStore() return macode.NewMemoryStore()
} }
// newChain 优先使用 PostgreSQL 持久化链(写穿 + 启动水合),不可用时回退纯内存。 // newChain 按配置选择链后端:
func newChain(db *sql.DB) chain.Client { // - chainmaker:真实链(需 -tags chainmaker 构建并配置 SDK),失败回退 pg/内存
if db != nil { // - pg:PG 持久化链(写穿+水合,重启不丢数据)
if pc, err := chain.NewPersistentChain(db); err == nil { // - memory:纯内存(仅开发)
log.Printf("chain: 使用 PostgreSQL 持久化链(写穿+水合,重启不丢数据)") //
return pc // PG 不可用时自动降级到内存。
} else { func newChain(backend, sdkConf string, db *sql.DB) chain.Client {
log.Printf("chain: PG 持久化链初始化失败(%v),回退内存链", err) switch backend {
case "chainmaker":
cm, err := chain.NewChainMakerClient(sdkConf, db)
if err == nil {
log.Printf("chain: 使用 ChainMaker 真实链后端")
return cm
} }
log.Printf("chain: ChainMaker 后端不可用(%v),降级", err)
fallthrough
case "pg":
if db != nil {
if pc, err := chain.NewPersistentChain(db); err == nil {
log.Printf("chain: 使用 PostgreSQL 持久化链(写穿+水合,重启不丢数据)")
return pc
} else {
log.Printf("chain: PG 持久化链初始化失败(%v),回退内存链", err)
}
}
fallthrough
default:
log.Printf("chain: 使用内存链(仅开发用,重启丢数据)")
return chain.NewMemoryChain()
} }
log.Printf("chain: 使用内存链(仅开发用,重启丢数据)")
return chain.NewMemoryChain()
} }
func main() { func main() {
@@ -56,7 +74,7 @@ func main() {
// 装配依赖:共享一个 PG 连接给链持久化与号段存储 // 装配依赖:共享一个 PG 连接给链持久化与号段存储
db := openDB(cfg.PostgresDSN) db := openDB(cfg.PostgresDSN)
ch := newChain(db) ch := newChain(cfg.ChainBackend, cfg.ChainMakerSDKConf, db)
gen := macode.NewGenerator(newAllocationStore(db)) gen := macode.NewGenerator(newAllocationStore(db))
// 示例号段(生产由与发码机构对接后配置) // 示例号段(生产由与发码机构对接后配置)
// 机构节点 6101 = 陕西(管理方:陕西IPTV运营公司);行业节点 8531 = IPTV视听内容 // 机构节点 6101 = 陕西(管理方:陕西IPTV运营公司);行业节点 8531 = IPTV视听内容
+287 -38
View File
@@ -1,23 +1,31 @@
// Package main 是 TCS-IPTV 可信数据空间的 ChainMaker 智能合约(Go,三期 A.2)。 // Package main 是 TCS-IPTV 可信数据空间的 ChainMaker 智能合约(Go)。
// //
// 本合约为独立合约模块(独立 go.mod),按 ChainMaker docker-go/wasm 合约规范部署。 // 本合约为独立合约模块(独立 go.mod),按 ChainMaker docker-go 合约规范部署。
// 与 internal/chain.Client 接口语义一一对应;MVP/二期用 MemoryChain 等价实现, // 与 internal/chain.Client 接口语义一一对应;MVP/二期用 MemoryChain 等价实现,
// 具备链环境后部署本合约,由 chain-svc 通过 ChainMaker Go SDK 调用替换 MemoryChain // 具备链环境后部署本合约,由 chain-svc / ChainMakerClient 通过 SDK 调用替换内存实现
// //
// 状态键设计(KV): // 状态键设计(KV):
// //
// content:{maCode} -> Content JSON // content:{maCode} -> Content JSON(含 status
// binding:{maCode}:{idx} -> HashBinding JSON // binding:{maCode}:0 -> 整剧 file 哈希绑定 JSON
// binding:{maCode}:p -> 感知哈希绑定 JSON
// ep:{maCode}:{n} -> 集级绑定 JSON {episode,hash,revoked,reason}
// epcount:{maCode} -> 集数 N
// hashidx:{fileHash} -> maCode(防换壳重发) // hashidx:{fileHash} -> maCode(防换壳重发)
// mapping:{maCode}:{idx} -> Mapping JSON // mapping:{maCode}:{idx} -> Mapping JSON
// mapcount:{maCode} -> 映射数 N
// version:{maCode}:{idx} -> VersionChange JSON
// vercount:{maCode} -> 版本变更数 N
// ctid2ma:{ctid} -> maCode // ctid2ma:{ctid} -> maCode
// allmacodes -> []maCode(供 ListContents 遍历)
// //
// 权限:通过 sender 组织/角色证书判断(仅监管组织可 IssueMA/Revoke)。 // 权限:通过 sender 组织证书判断(仅监管组织可 IssueMA/Revoke/RevokeEpisode/Restore)。
package main package main
import ( import (
"encoding/json" "encoding/json"
"errors" "errors"
"strconv"
"chainmaker.org/chainmaker/contract-sdk-go/v2/pb/protogo" "chainmaker.org/chainmaker/contract-sdk-go/v2/pb/protogo"
"chainmaker.org/chainmaker/contract-sdk-go/v2/sandbox" "chainmaker.org/chainmaker/contract-sdk-go/v2/sandbox"
@@ -28,7 +36,8 @@ import (
type TCSRegistry struct{} type TCSRegistry struct{}
const ( const (
orgRegulator = "regulator" // 监管组织仅其可签发/下架 orgRegulator = "regulator" // 监管组织仅其可签发/下架
orgReviewer = "reviewer" // 审核组织:哈希绑定/版本变更/状态流转
) )
// InitContract 合约初始化。 // InitContract 合约初始化。
@@ -41,13 +50,46 @@ func (t *TCSRegistry) UpgradeContract() protogo.Response {
return sdk.Success([]byte("tcs_registry upgraded")) return sdk.Success([]byte("tcs_registry upgraded"))
} }
// senderOrg 取调用方组织标识(基于证书 OU/OrgId)。
func senderOrg() string { func senderOrg() string {
org, _ := sdk.Instance.GetSenderOrgId() org, _ := sdk.Instance.GetSenderOrgId()
return org return org
} }
// IssueMA 签发 MA 码并 1:1 强绑定哈希(仅监管组织)。 func getInt(key, field string) int {
v, _ := sdk.Instance.GetStateByte(key, field)
if len(v) == 0 {
return 0
}
n, _ := strconv.Atoi(string(v))
return n
}
func putInt(key, field string, n int) {
_ = sdk.Instance.PutStateByte(key, field, []byte(strconv.Itoa(n)))
}
// epBinding 集级绑定的链上结构。
type epBinding struct {
Episode int `json:"episode"`
HashValue string `json:"hash_value"`
Revoked bool `json:"revoked"`
Reason string `json:"revoked_reason,omitempty"`
}
// appendMACode 把新发码加入全局列表(供 ListContents 遍历)。
func appendMACode(maCode string) {
var all []string
if v, _ := sdk.Instance.GetStateByte("allmacodes", ""); len(v) > 0 {
_ = json.Unmarshal(v, &all)
}
all = append(all, maCode)
b, _ := json.Marshal(all)
_ = sdk.Instance.PutStateByte("allmacodes", "", b)
}
// ---- 写方法 ----
// IssueMA 签发 MA 码并 1:1 强绑定哈希;同时登记集级哈希(仅监管组织)。
func (t *TCSRegistry) IssueMA() protogo.Response { func (t *TCSRegistry) IssueMA() protogo.Response {
if senderOrg() != orgRegulator { if senderOrg() != orgRegulator {
return sdk.Error("permission denied: only regulator can issue MA") return sdk.Error("permission denied: only regulator can issue MA")
@@ -56,85 +98,292 @@ func (t *TCSRegistry) IssueMA() protogo.Response {
maCode := string(args["ma_code"]) maCode := string(args["ma_code"])
ctid := string(args["ctid"]) ctid := string(args["ctid"])
fileHash := string(args["file_hash"]) fileHash := string(args["file_hash"])
contentJSON := args["content"]
// MA 不可重复签发
if existing, _ := sdk.Instance.GetStateByte("content", maCode); len(existing) > 0 { if existing, _ := sdk.Instance.GetStateByte("content", maCode); len(existing) > 0 {
return sdk.Error("MA already issued (1:1 binding immutable)") return sdk.Error("MA already issued (1:1 binding immutable)")
} }
// 防换壳重发:同哈希不可绑定到不同 MA
if bound, _ := sdk.Instance.GetStateByte("hashidx", fileHash); len(bound) > 0 { if bound, _ := sdk.Instance.GetStateByte("hashidx", fileHash); len(bound) > 0 {
return sdk.Error("content hash already exists") return sdk.Error("content hash already exists")
} }
_ = sdk.Instance.PutStateByte("content", maCode, contentJSON) // 内容主记录(强制 status=approved,与 MemoryChain 一致)
binding := map[string]string{"hash_type": "file_sha256", "hash_value": fileHash, "version": "v1.0"} var content map[string]interface{}
bj, _ := json.Marshal(binding) _ = json.Unmarshal(args["content"], &content)
_ = sdk.Instance.PutStateByte("binding", maCode+":0", bj) if content == nil {
content = map[string]interface{}{}
}
content["ma_code"] = maCode
content["content_twin_id"] = ctid
content["status"] = "approved"
cj, _ := json.Marshal(content)
_ = sdk.Instance.PutStateByte("content", maCode, cj)
// 整剧 file 绑定 + 感知哈希绑定
fb, _ := json.Marshal(epBinding{Episode: 0, HashValue: fileHash})
_ = sdk.Instance.PutStateByte("binding", maCode+":0", fb)
if ph := string(args["perceptual_hash"]); ph != "" {
pb, _ := json.Marshal(map[string]string{"hash_type": "perceptual", "hash_value": ph})
_ = sdk.Instance.PutStateByte("binding", maCode+":p", pb)
}
_ = sdk.Instance.PutStateByte("hashidx", fileHash, []byte(maCode)) _ = sdk.Instance.PutStateByte("hashidx", fileHash, []byte(maCode))
_ = sdk.Instance.PutStateByte("ctid2ma", ctid, []byte(maCode)) _ = sdk.Instance.PutStateByte("ctid2ma", ctid, []byte(maCode))
// 集级哈希
var eps []map[string]interface{}
_ = json.Unmarshal(args["episodes"], &eps)
n := 0
for _, e := range eps {
ep := int(toFloat(e["episode"]))
hv, _ := e["file_sha256"].(string)
if ep <= 0 || hv == "" {
continue
}
eb, _ := json.Marshal(epBinding{Episode: ep, HashValue: hv})
_ = sdk.Instance.PutStateByte("ep", maCode+":"+strconv.Itoa(ep), eb)
if exist, _ := sdk.Instance.GetStateByte("hashidx", hv); len(exist) == 0 {
_ = sdk.Instance.PutStateByte("hashidx", hv, []byte(maCode))
}
if ep > n {
n = ep
}
}
putInt("epcount", maCode, n)
appendMACode(maCode)
sdk.Instance.EmitEvent("RegisterSuccess", []string{maCode, fileHash}) sdk.Instance.EmitEvent("RegisterSuccess", []string{maCode, fileHash})
return sdk.Success([]byte(maCode)) return sdk.Success([]byte(maCode))
} }
// RegisterMapping 注册三方编码映射(MA 必须已签发)。 func toFloat(v interface{}) float64 {
func (t *TCSRegistry) RegisterMapping() protogo.Response { if f, ok := v.(float64); ok {
return f
}
return 0
}
// RegisterHashBinding 追加哈希绑定(如转码版)。MA 必须已签发(审核/监管)。
func (t *TCSRegistry) RegisterHashBinding() protogo.Response {
if o := senderOrg(); o != orgReviewer && o != orgRegulator {
return sdk.Error("permission denied")
}
args := sdk.Instance.GetArgs() args := sdk.Instance.GetArgs()
maCode := string(args["ma_code"]) ctid := string(args["ctid"])
if v, _ := sdk.Instance.GetStateByte("content", maCode); len(v) == 0 { ma, _ := sdk.Instance.GetStateByte("ctid2ma", ctid)
if len(ma) == 0 {
return sdk.Error("MA not issued") return sdk.Error("MA not issued")
} }
idx := string(args["idx"]) idx := getInt("bindextra", string(ma)) + 1
_ = sdk.Instance.PutStateByte("mapping", maCode+":"+idx, args["mapping"]) _ = sdk.Instance.PutStateByte("bindextra", string(ma)+":"+strconv.Itoa(idx), args["binding"])
putInt("bindextra", string(ma), idx)
return sdk.Success([]byte("ok")) return sdk.Success([]byte("ok"))
} }
// VerifyHash 校验提交哈希是否与绑定哈希一致 // RegisterMapping 注册三方编码映射;MA 必须已签发(任意角色注册本方)
func (t *TCSRegistry) VerifyHash() protogo.Response { func (t *TCSRegistry) RegisterMapping() protogo.Response {
args := sdk.Instance.GetArgs() args := sdk.Instance.GetArgs()
maCode := string(args["ma_code"]) ctid := string(args["ctid"])
fileHash := string(args["file_hash"]) ma, _ := sdk.Instance.GetStateByte("ctid2ma", ctid)
bound, _ := sdk.Instance.GetStateByte("hashidx", fileHash) if len(ma) == 0 {
if string(bound) == maCode { return sdk.Error("MA not issued")
return sdk.Success([]byte("true"))
} }
return sdk.Success([]byte("false")) idx := getInt("mapcount", string(ma)) + 1
_ = sdk.Instance.PutStateByte("mapping", string(ma)+":"+strconv.Itoa(idx), args["mapping"])
putInt("mapcount", string(ma), idx)
return sdk.Success([]byte("ok"))
} }
// Revoke 下架(仅监管组织)。 // RecordVersionChange 记录版本变更(审核/监管)。
func (t *TCSRegistry) RecordVersionChange() protogo.Response {
if o := senderOrg(); o != orgReviewer && o != orgRegulator {
return sdk.Error("permission denied")
}
args := sdk.Instance.GetArgs()
ctid := string(args["ctid"])
ma, _ := sdk.Instance.GetStateByte("ctid2ma", ctid)
if len(ma) == 0 {
return sdk.Error("MA not issued")
}
idx := getInt("vercount", string(ma)) + 1
_ = sdk.Instance.PutStateByte("version", string(ma)+":"+strconv.Itoa(idx), args["vc"])
putInt("vercount", string(ma), idx)
return sdk.Success([]byte("ok"))
}
// Revoke 整剧下架(仅监管组织)。
func (t *TCSRegistry) Revoke() protogo.Response { func (t *TCSRegistry) Revoke() protogo.Response {
if senderOrg() != orgRegulator { if senderOrg() != orgRegulator {
return sdk.Error("permission denied: only regulator can revoke") return sdk.Error("permission denied: only regulator can revoke")
} }
return setStatus(string(sdk.Instance.GetArgs()["ma_code"]), "revoked", "Revoked")
}
// SetContentStatus 状态流转(审核/监管):入库/发布等。
func (t *TCSRegistry) SetContentStatus() protogo.Response {
if o := senderOrg(); o != orgReviewer && o != orgRegulator {
return sdk.Error("permission denied")
}
args := sdk.Instance.GetArgs() args := sdk.Instance.GetArgs()
maCode := string(args["ma_code"]) return setStatus(string(args["ma_code"]), string(args["status"]), "StatusChanged")
}
func setStatus(maCode, status, event string) protogo.Response {
cj, _ := sdk.Instance.GetStateByte("content", maCode) cj, _ := sdk.Instance.GetStateByte("content", maCode)
if len(cj) == 0 { if len(cj) == 0 {
return sdk.Error("not found") return sdk.Error("not found")
} }
var content map[string]interface{} var content map[string]interface{}
_ = json.Unmarshal(cj, &content) _ = json.Unmarshal(cj, &content)
content["status"] = "revoked" content["status"] = status
nj, _ := json.Marshal(content) nj, _ := json.Marshal(content)
_ = sdk.Instance.PutStateByte("content", maCode, nj) _ = sdk.Instance.PutStateByte("content", maCode, nj)
sdk.Instance.EmitEvent("Revoked", []string{maCode}) sdk.Instance.EmitEvent(event, []string{maCode, status})
return sdk.Success([]byte("ok")) return sdk.Success([]byte("ok"))
} }
// RevokeEpisode 集级下架(仅监管组织)。
func (t *TCSRegistry) RevokeEpisode() protogo.Response {
if senderOrg() != orgRegulator {
return sdk.Error("permission denied")
}
args := sdk.Instance.GetArgs()
return setEpisodeRevoked(string(args["ma_code"]), string(args["episode"]), true, string(args["reason"]))
}
// RestoreEpisode 集级恢复(仅监管组织)。
func (t *TCSRegistry) RestoreEpisode() protogo.Response {
if senderOrg() != orgRegulator {
return sdk.Error("permission denied")
}
args := sdk.Instance.GetArgs()
return setEpisodeRevoked(string(args["ma_code"]), string(args["episode"]), false, "")
}
// Restore 整剧恢复(仅监管组织)。
func (t *TCSRegistry) Restore() protogo.Response {
if senderOrg() != orgRegulator {
return sdk.Error("permission denied")
}
return setStatus(string(sdk.Instance.GetArgs()["ma_code"]), "published", "Restored")
}
func setEpisodeRevoked(maCode, epStr string, revoked bool, reason string) protogo.Response {
key := maCode + ":" + epStr
v, _ := sdk.Instance.GetStateByte("ep", key)
if len(v) == 0 {
return sdk.Error("not found")
}
var eb epBinding
_ = json.Unmarshal(v, &eb)
eb.Revoked = revoked
eb.Reason = reason
nb, _ := json.Marshal(eb)
_ = sdk.Instance.PutStateByte("ep", key, nb)
return sdk.Success([]byte("ok"))
}
// ---- 读方法 ----
// VerifyHash 校验整剧/转码哈希是否绑定到该 MA。
func (t *TCSRegistry) VerifyHash() protogo.Response {
args := sdk.Instance.GetArgs()
bound, _ := sdk.Instance.GetStateByte("hashidx", string(args["file_hash"]))
if string(bound) == string(args["ma_code"]) {
return sdk.Success([]byte("true"))
}
return sdk.Success([]byte("false"))
}
// VerifyEpisodeHash 校验某集哈希。
func (t *TCSRegistry) VerifyEpisodeHash() protogo.Response {
args := sdk.Instance.GetArgs()
v, _ := sdk.Instance.GetStateByte("ep", string(args["ma_code"])+":"+string(args["episode"]))
if len(v) == 0 {
return sdk.Success([]byte("false"))
}
var eb epBinding
_ = json.Unmarshal(v, &eb)
if eb.HashValue == string(args["file_hash"]) {
return sdk.Success([]byte("true"))
}
return sdk.Success([]byte("false"))
}
// HashExists 返回绑定的 MA(不存在返回空)。
func (t *TCSRegistry) HashExists() protogo.Response {
bound, _ := sdk.Instance.GetStateByte("hashidx", string(sdk.Instance.GetArgs()["file_hash"]))
return sdk.Success(bound)
}
// ListEpisodes 返回某 MA 的集级绑定数组(JSON)。
func (t *TCSRegistry) ListEpisodes() protogo.Response {
maCode := string(sdk.Instance.GetArgs()["ma_code"])
n := getInt("epcount", maCode)
out := make([]epBinding, 0, n)
for i := 1; i <= n; i++ {
if v, _ := sdk.Instance.GetStateByte("ep", maCode+":"+strconv.Itoa(i)); len(v) > 0 {
var eb epBinding
_ = json.Unmarshal(v, &eb)
out = append(out, eb)
}
}
b, _ := json.Marshal(out)
return sdk.Success(b)
}
// QueryContent 查询内容主记录。 // QueryContent 查询内容主记录。
func (t *TCSRegistry) QueryContent() protogo.Response { func (t *TCSRegistry) QueryContent() protogo.Response {
maCode := string(sdk.Instance.GetArgs()["ma_code"]) v, _ := sdk.Instance.GetStateByte("content", string(sdk.Instance.GetArgs()["ma_code"]))
v, _ := sdk.Instance.GetStateByte("content", maCode)
if len(v) == 0 { if len(v) == 0 {
return sdk.Error("not found") return sdk.Error("not found")
} }
return sdk.Success(v) return sdk.Success(v)
} }
// QueryMappings 返回某 MA 的全部映射(JSON {mappings:[],cdn_endpoints:[]})。
func (t *TCSRegistry) QueryMappings() protogo.Response {
maCode := string(sdk.Instance.GetArgs()["ma_code"])
n := getInt("mapcount", maCode)
maps := make([]map[string]interface{}, 0, n)
cdns := []string{}
for i := 1; i <= n; i++ {
if v, _ := sdk.Instance.GetStateByte("mapping", maCode+":"+strconv.Itoa(i)); len(v) > 0 {
var m map[string]interface{}
_ = json.Unmarshal(v, &m)
maps = append(maps, m)
if ep, ok := m["cdn_endpoint"].(string); ok && ep != "" {
cdns = append(cdns, ep)
}
}
}
b, _ := json.Marshal(map[string]interface{}{"ma_code": maCode, "mappings": maps, "cdn_endpoints": cdns})
return sdk.Success(b)
}
// ListContents 按状态返回内容数组(空状态返回全部)。
func (t *TCSRegistry) ListContents() protogo.Response {
status := string(sdk.Instance.GetArgs()["status"])
var all []string
if v, _ := sdk.Instance.GetStateByte("allmacodes", ""); len(v) > 0 {
_ = json.Unmarshal(v, &all)
}
out := make([]map[string]interface{}, 0, len(all))
for _, ma := range all {
v, _ := sdk.Instance.GetStateByte("content", ma)
if len(v) == 0 {
continue
}
var c map[string]interface{}
_ = json.Unmarshal(v, &c)
if status == "" || c["status"] == status {
out = append(out, c)
}
}
b, _ := json.Marshal(out)
return sdk.Success(b)
}
func main() { func main() {
err := sandbox.Start(new(TCSRegistry)) if err := sandbox.Start(new(TCSRegistry)); err != nil {
if err != nil {
_ = errors.New(err.Error()) _ = errors.New(err.Error())
} }
} }
+294
View File
@@ -0,0 +1,294 @@
//go:build chainmaker
// Package chain — 真实链后端(长安链 ChainMaker)适配器骨架。
//
// 仅在 `go build -tags chainmaker` 时编译;默认构建由 chainmaker_stub.go 提供占位,
// 因此主工程在没有 ChainMaker Go SDK 依赖时也始终可编译。
//
// 接入步骤(需真实环境):
// 1. 引入 SDK 依赖:
// go get chainmaker.org/chainmaker/sdk-go/v2
// 2. 准备 sdk_config.yml(节点地址、TLS、四角色组织证书),路径由 TCS_CHAINMAKER_SDK_CONF 指定。
// 3. 部署 contracts/tcs_registry 合约,合约名见 contractName 常量。
// 4. 启动:TCS_CHAIN_BACKEND=chainmaker go run -tags chainmaker ./cmd/api-svc
//
// 设计:每个业务角色(监管/审核/CP/运营商)使用各自组织证书的 ChainClient
// 合约内 senderOrg() 据此做链上权限判定(IssueMA/Revoke 仅监管组织)。
// 写操作走 InvokeContract(同步等待上链确认),读操作走 QueryContract(不产生交易)。
package chain
import (
"database/sql"
"encoding/json"
"fmt"
"strings"
"chainmaker.org/chainmaker/pb-go/v2/common"
sdk "chainmaker.org/chainmaker/sdk-go/v2"
"github.com/tcs-iptv/tcs/internal/model"
)
const contractName = "tcs_registry"
// ChainMakerClient 是 chain.Client 的真实链实现。
type ChainMakerClient struct {
// clients 为四角色各自证书初始化的链客户端(组织/用户证书不同)。
clients map[Role]*sdk.ChainClient
// mirror 可选 PG 镜像(链为权威,镜像供高效查询)。为 nil 时仅用链。
mirror *sql.DB
}
var _ Client = (*ChainMakerClient)(nil)
// NewChainMakerClient 按 sdk_config.yml 初始化四角色链客户端。
//
// 真实实现需为每个角色加载其组织/用户证书(可在 sdk_config.yml 用多 user 段,
// 或为每个角色单独一个 config 文件)。此处给出装配骨架,证书细节随部署而定。
func NewChainMakerClient(sdkConfPath string, mirror *sql.DB) (Client, error) {
roles := []Role{RoleRegulator, RoleReviewer, RoleCP, RoleOperator}
clients := make(map[Role]*sdk.ChainClient, len(roles))
for _, r := range roles {
// TODO(deploy): 为每个角色加载其证书。示例:约定每角色一个配置文件
// conf := fmt.Sprintf("%s.%s.yml", strings.TrimSuffix(sdkConfPath, ".yml"), r)
cli, err := sdk.NewChainClient(sdk.WithConfPath(sdkConfPath))
if err != nil {
return nil, fmt.Errorf("chainmaker: 初始化角色 %s 客户端失败: %w", r, err)
}
clients[r] = cli
}
return &ChainMakerClient{clients: clients, mirror: mirror}, nil
}
// kv 构造合约入参键值对。
func kv(m map[string][]byte) []*common.KeyValuePair {
out := make([]*common.KeyValuePair, 0, len(m))
for k, v := range m {
out = append(out, &common.KeyValuePair{Key: k, Value: v})
}
return out
}
// invoke 以指定角色身份提交合约写交易(同步等待上链)。
func (c *ChainMakerClient) invoke(role Role, method string, args map[string][]byte) (*common.TxResponse, error) {
cli, ok := c.clients[role]
if !ok {
return nil, fmt.Errorf("chainmaker: 未配置角色 %s 的链客户端", role)
}
// withSyncResult=true:等待交易上链并返回合约执行结果
resp, err := cli.InvokeContract(contractName, method, "", kv(args), -1, true)
if err != nil {
return nil, err
}
if resp.Code != common.TxStatusCode_SUCCESS {
return nil, fmt.Errorf("chainmaker: tx 失败: %s", resp.Message)
}
if resp.ContractResult != nil && resp.ContractResult.Code != 0 {
return nil, mapContractError(string(resp.ContractResult.Message))
}
return resp, nil
}
// query 以指定角色身份发起合约查询(不产生交易)。
func (c *ChainMakerClient) query(role Role, method string, args map[string][]byte) ([]byte, error) {
cli := c.clients[role]
resp, err := cli.QueryContract(contractName, method, "", kv(args), -1)
if err != nil {
return nil, err
}
if resp.ContractResult != nil && resp.ContractResult.Code != 0 {
return nil, mapContractError(string(resp.ContractResult.Message))
}
if resp.ContractResult == nil {
return nil, ErrNotFound
}
return resp.ContractResult.Result, nil
}
// mapContractError 把合约返回的错误消息映射回 chain 包标准错误,保证与 MemoryChain 行为一致。
func mapContractError(msg string) error {
switch {
case strings.Contains(msg, "permission denied"):
return ErrPermissionDenied
case strings.Contains(msg, "already issued"):
return ErrMAAlreadyIssued
case strings.Contains(msg, "hash already exists"):
return ErrHashExists
case strings.Contains(msg, "not issued"):
return ErrMANotIssued
case strings.Contains(msg, "not found"):
return ErrNotFound
default:
return fmt.Errorf("chainmaker: %s", msg)
}
}
// ---- chain.Client 实现(写操作)----
func (c *ChainMakerClient) IssueMA(role Role, req IssueRequest) (string, error) {
contentJSON, _ := json.Marshal(req.Content)
epJSON, _ := json.Marshal(req.Episodes)
resp, err := c.invoke(role, "IssueMA", map[string][]byte{
"ma_code": []byte(req.MACode),
"ctid": []byte(req.ContentTwinID),
"merkle_root": []byte(req.MerkleRoot),
"file_hash": []byte(req.FileHash),
"perceptual_hash": []byte(req.PerceptualHash),
"episodes": epJSON,
"content": contentJSON,
})
if err != nil {
return "", err
}
// TODO(mirror): 成功后写 PG 镜像(可复用 PersistentChain 的 persist* 逻辑)
return resp.TxId, nil
}
func (c *ChainMakerClient) RegisterHashBinding(role Role, b model.HashBinding) (string, error) {
bj, _ := json.Marshal(b)
resp, err := c.invoke(role, "RegisterHashBinding", map[string][]byte{
"ctid": []byte(b.ContentTwinID), "binding": bj,
})
if err != nil {
return "", err
}
return resp.TxId, nil
}
func (c *ChainMakerClient) RegisterMapping(role Role, m model.Mapping) (string, error) {
mj, _ := json.Marshal(m)
resp, err := c.invoke(role, "RegisterMapping", map[string][]byte{
"ctid": []byte(m.ContentTwinID), "mapping": mj,
})
if err != nil {
return "", err
}
return resp.TxId, nil
}
func (c *ChainMakerClient) RecordVersionChange(vc model.VersionChange) (string, error) {
vj, _ := json.Marshal(vc)
resp, err := c.invoke(RoleReviewer, "RecordVersionChange", map[string][]byte{
"ctid": []byte(vc.ContentTwinID), "vc": vj,
})
if err != nil {
return "", err
}
return resp.TxId, nil
}
func (c *ChainMakerClient) Revoke(role Role, maCode, reason string) (MappingsResult, error) {
if _, err := c.invoke(role, "Revoke", map[string][]byte{
"ma_code": []byte(maCode), "reason": []byte(reason),
}); err != nil {
return MappingsResult{}, err
}
return c.QueryMappings(maCode)
}
func (c *ChainMakerClient) RevokeEpisode(role Role, maCode string, episode int, reason string) error {
_, err := c.invoke(role, "RevokeEpisode", map[string][]byte{
"ma_code": []byte(maCode), "episode": []byte(fmt.Sprint(episode)), "reason": []byte(reason),
})
return err
}
func (c *ChainMakerClient) Restore(role Role, maCode string) error {
_, err := c.invoke(role, "Restore", map[string][]byte{"ma_code": []byte(maCode)})
return err
}
func (c *ChainMakerClient) RestoreEpisode(role Role, maCode string, episode int) error {
_, err := c.invoke(role, "RestoreEpisode", map[string][]byte{
"ma_code": []byte(maCode), "episode": []byte(fmt.Sprint(episode)),
})
return err
}
func (c *ChainMakerClient) SetContentStatus(maCode, status string) error {
_, err := c.invoke(RoleReviewer, "SetContentStatus", map[string][]byte{
"ma_code": []byte(maCode), "status": []byte(status),
})
return err
}
// ---- chain.Client 实现(读操作)----
func (c *ChainMakerClient) VerifyHash(maCode, fileHash string) (VerifyResult, error) {
res, err := c.query(RoleOperator, "VerifyHash", map[string][]byte{
"ma_code": []byte(maCode), "file_hash": []byte(fileHash),
})
if err != nil {
return VerifyResult{MACode: maCode, SubmittedHash: fileHash}, err
}
match := string(res) == "true"
return VerifyResult{Valid: true, MACode: maCode, SubmittedHash: fileHash, Match: match}, nil
}
func (c *ChainMakerClient) VerifyEpisodeHash(maCode string, episode int, fileHash string) (VerifyResult, error) {
res, err := c.query(RoleOperator, "VerifyEpisodeHash", map[string][]byte{
"ma_code": []byte(maCode), "episode": []byte(fmt.Sprint(episode)), "file_hash": []byte(fileHash),
})
if err != nil {
return VerifyResult{MACode: maCode, SubmittedHash: fileHash}, err
}
return VerifyResult{Valid: true, MACode: maCode, SubmittedHash: fileHash, Match: string(res) == "true"}, nil
}
func (c *ChainMakerClient) ListEpisodes(maCode string) ([]model.HashBinding, error) {
res, err := c.query(RoleRegulator, "ListEpisodes", map[string][]byte{"ma_code": []byte(maCode)})
if err != nil {
return nil, err
}
var out []model.HashBinding
if err := json.Unmarshal(res, &out); err != nil {
return nil, err
}
return out, nil
}
func (c *ChainMakerClient) HashExists(fileHash string) (string, bool) {
res, err := c.query(RoleRegulator, "HashExists", map[string][]byte{"file_hash": []byte(fileHash)})
if err != nil || len(res) == 0 {
return "", false
}
return string(res), true
}
func (c *ChainMakerClient) QueryContent(maCode string) (model.Content, error) {
res, err := c.query(RoleRegulator, "QueryContent", map[string][]byte{"ma_code": []byte(maCode)})
if err != nil {
return model.Content{}, err
}
var content model.Content
if err := json.Unmarshal(res, &content); err != nil {
return model.Content{}, err
}
return content, nil
}
func (c *ChainMakerClient) ListContents(status string) ([]model.Content, error) {
// 优先走 PG 镜像(链上范围扫描代价高);无镜像时回源合约范围查询。
res, err := c.query(RoleRegulator, "ListContents", map[string][]byte{"status": []byte(status)})
if err != nil {
return nil, err
}
var out []model.Content
if err := json.Unmarshal(res, &out); err != nil {
return nil, err
}
return out, nil
}
func (c *ChainMakerClient) QueryMappings(maCode string) (MappingsResult, error) {
res, err := c.query(RoleRegulator, "QueryMappings", map[string][]byte{"ma_code": []byte(maCode)})
if err != nil {
return MappingsResult{}, err
}
var out MappingsResult
if err := json.Unmarshal(res, &out); err != nil {
return MappingsResult{}, err
}
out.MACode = maCode
return out, nil
}
@@ -0,0 +1,29 @@
//go:build chainmaker
package chain
import (
"os"
"testing"
"github.com/stretchr/testify/require"
)
// TestChainMakerClient_Conformance 让真实链实现跑同一套契约套件。
//
// 仅在 `go test -tags chainmaker` 且配置了测试链时运行:
// - TCS_TEST_CHAINMAKER_CONF:测试链 sdk_config.yml 路径
//
// 注意:真实链不易"清空状态",建议每次用全新 maCode/合约实例,或对接专用测试链。
// 本用例提供接线骨架,实际跑通需真实 ChainMaker 测试网与已部署的 tcs_registry 合约。
func TestChainMakerClient_Conformance(t *testing.T) {
conf := os.Getenv("TCS_TEST_CHAINMAKER_CONF")
if conf == "" {
t.Skip("未设置 TCS_TEST_CHAINMAKER_CONF,跳过真实链契约测试")
}
RunClientConformance(t, func(t *testing.T) Client {
c, err := NewChainMakerClient(conf, nil)
require.NoError(t, err)
return c
})
}
@@ -0,0 +1,18 @@
//go:build !chainmaker
package chain
import (
"database/sql"
"errors"
)
// ErrChainMakerNotBuilt 表示二进制未启用 chainmaker 构建标签,无法使用真实链后端。
var ErrChainMakerNotBuilt = errors.New("chain: 未启用 chainmaker 构建标签,请使用 `go build -tags chainmaker` 并引入 ChainMaker Go SDK")
// NewChainMakerClient 是真实链后端的占位实现(默认构建)。
// 真正的实现位于 chainmaker.go//go:build chainmaker),需引入 ChainMaker Go SDK。
// 这样默认构建不依赖链 SDK,主工程始终可编译;装配处可统一引用本构造函数。
func NewChainMakerClient(sdkConfPath string, mirror *sql.DB) (Client, error) {
return nil, ErrChainMakerNotBuilt
}
+160
View File
@@ -0,0 +1,160 @@
package chain
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tcs-iptv/tcs/internal/model"
)
// RunClientConformance 是 chain.Client 的契约测试套件,校验任意实现都满足同一组
// 不可变业务规则(权限/1:1强绑定/防换壳/状态机/集级粒度)。
//
// 各实现(MemoryChain / PersistentChain / ChainMakerClient)复用本套件,
// 保证「换实现不换行为」,这是平滑替换真实链的安全保障。
//
// newClient 必须返回一个干净(空状态)的 Client 实例。
func RunClientConformance(t *testing.T, newClient func(t *testing.T) Client) {
// 构造一条标准发码请求(集级 3 集)。
issueReq := func(ma, ctid, fh string) IssueRequest {
return IssueRequest{
MACode: ma, ContentTwinID: ctid, MerkleRoot: "mr-" + fh, FileHash: fh,
PerceptualHash: "ph-" + fh,
Episodes: []model.EpisodeHash{
{Episode: 1, FileSHA256: fh + "-E1"},
{Episode: 2, FileSHA256: fh + "-E2"},
{Episode: 3, FileSHA256: fh + "-E3"},
},
Content: model.Content{Title: "契约测试剧", EpisodeCount: 3, MAType: "WD", Issuer: "测试局"},
}
}
const ma = "MA.156.8531.6101/WD/20260000001"
const ctid = "ctid-conf-001"
const fh = "fh-conf-001"
t.Run("IssueMA_仅监管可发码", func(t *testing.T) {
c := newClient(t)
_, err := c.IssueMA(RoleCP, issueReq(ma, ctid, fh))
assert.ErrorIs(t, err, ErrPermissionDenied)
})
t.Run("IssueMA_成功并可查询", func(t *testing.T) {
c := newClient(t)
tx, err := c.IssueMA(RoleRegulator, issueReq(ma, ctid, fh))
require.NoError(t, err)
assert.NotEmpty(t, tx)
got, err := c.QueryContent(ma)
require.NoError(t, err)
assert.Equal(t, "契约测试剧", got.Title)
assert.Equal(t, model.StatusApproved, got.Status)
})
t.Run("IssueMA_不可重复签发", func(t *testing.T) {
c := newClient(t)
_, err := c.IssueMA(RoleRegulator, issueReq(ma, ctid, fh))
require.NoError(t, err)
_, err = c.IssueMA(RoleRegulator, issueReq(ma, ctid, fh))
assert.ErrorIs(t, err, ErrMAAlreadyIssued)
})
t.Run("防换壳_同哈希不可绑不同MA", func(t *testing.T) {
c := newClient(t)
_, err := c.IssueMA(RoleRegulator, issueReq(ma, ctid, fh))
require.NoError(t, err)
_, exists := c.HashExists(fh)
assert.True(t, exists)
_, err = c.IssueMA(RoleRegulator, issueReq("MA.156.8531.6101/WD/20260000002", "ctid-x", fh))
assert.ErrorIs(t, err, ErrHashExists)
})
t.Run("VerifyHash_匹配与不匹配", func(t *testing.T) {
c := newClient(t)
_, err := c.IssueMA(RoleRegulator, issueReq(ma, ctid, fh))
require.NoError(t, err)
r, err := c.VerifyHash(ma, fh)
require.NoError(t, err)
assert.True(t, r.Match)
r2, _ := c.VerifyHash(ma, "tampered")
assert.False(t, r2.Match)
})
t.Run("集级验真与列出", func(t *testing.T) {
c := newClient(t)
_, err := c.IssueMA(RoleRegulator, issueReq(ma, ctid, fh))
require.NoError(t, err)
r, err := c.VerifyEpisodeHash(ma, 2, fh+"-E2")
require.NoError(t, err)
assert.True(t, r.Match)
eps, err := c.ListEpisodes(ma)
require.NoError(t, err)
assert.Len(t, eps, 3)
})
t.Run("映射注册需先发码且可查", func(t *testing.T) {
c := newClient(t)
// 未发码不可注册映射
_, err := c.RegisterMapping(RoleCP, model.Mapping{ContentTwinID: "ctid-none", Party: model.PartyCP, PartyID: "X"})
assert.ErrorIs(t, err, ErrMANotIssued)
// 发码后可注册并查询
_, err = c.IssueMA(RoleRegulator, issueReq(ma, ctid, fh))
require.NoError(t, err)
_, err = c.RegisterMapping(RoleOperator, model.Mapping{
ContentTwinID: ctid, Party: model.PartyOperator, PartyID: "OP-1", CDNEndpoint: "cdn://x",
})
require.NoError(t, err)
mr, err := c.QueryMappings(ma)
require.NoError(t, err)
assert.GreaterOrEqual(t, len(mr.Mappings), 1)
assert.Contains(t, mr.CDNEndpoints, "cdn://x")
})
t.Run("下架_仅监管且状态变更", func(t *testing.T) {
c := newClient(t)
_, err := c.IssueMA(RoleRegulator, issueReq(ma, ctid, fh))
require.NoError(t, err)
_, err = c.Revoke(RoleCP, ma, "x")
assert.ErrorIs(t, err, ErrPermissionDenied)
_, err = c.Revoke(RoleRegulator, ma, "违规")
require.NoError(t, err)
got, _ := c.QueryContent(ma)
assert.Equal(t, model.StatusRevoked, got.Status)
})
t.Run("集级下架与恢复", func(t *testing.T) {
c := newClient(t)
_, err := c.IssueMA(RoleRegulator, issueReq(ma, ctid, fh))
require.NoError(t, err)
require.NoError(t, c.RevokeEpisode(RoleRegulator, ma, 2, "违规集"))
eps, _ := c.ListEpisodes(ma)
for _, e := range eps {
if e.Episode == 2 {
assert.True(t, e.Revoked)
}
}
require.NoError(t, c.RestoreEpisode(RoleRegulator, ma, 2))
eps, _ = c.ListEpisodes(ma)
for _, e := range eps {
if e.Episode == 2 {
assert.False(t, e.Revoked)
}
}
})
t.Run("状态流转与按状态列举", func(t *testing.T) {
c := newClient(t)
_, err := c.IssueMA(RoleRegulator, issueReq(ma, ctid, fh))
require.NoError(t, err)
require.NoError(t, c.SetContentStatus(ma, model.StatusPublished))
pub, err := c.ListContents(model.StatusPublished)
require.NoError(t, err)
assert.Len(t, pub, 1)
})
}
// TestMemoryChain_Conformance 让内存实现跑契约套件(始终运行)。
func TestMemoryChain_Conformance(t *testing.T) {
RunClientConformance(t, func(t *testing.T) Client {
return NewMemoryChain()
})
}
@@ -0,0 +1,36 @@
package chain
import (
"database/sql"
"os"
"testing"
_ "github.com/lib/pq"
"github.com/stretchr/testify/require"
)
// TestPersistentChain_Conformance 让 PG 持久化实现跑同一套契约套件。
//
// 需要一个可写的测试库:设置 TCS_TEST_PG_DSN 后运行,例如
//
// TCS_TEST_PG_DSN='postgres://postgres@localhost:5432/tcs_iptv_test?sslmode=disable' go test ./internal/chain/
//
// 未设置则跳过(不污染开发库)。每个子用例前清空镜像表,保证干净状态。
func TestPersistentChain_Conformance(t *testing.T) {
dsn := os.Getenv("TCS_TEST_PG_DSN")
if dsn == "" {
t.Skip("未设置 TCS_TEST_PG_DSN,跳过 PG 契约测试")
}
db, err := sql.Open("postgres", dsn)
require.NoError(t, err)
require.NoError(t, db.Ping())
t.Cleanup(func() { _ = db.Close() })
RunClientConformance(t, func(t *testing.T) Client {
_, err := db.Exec(`TRUNCATE content_registry, hash_binding, identity_mapping, version_history, chain_tx CASCADE`)
require.NoError(t, err)
pc, err := NewPersistentChain(db)
require.NoError(t, err)
return pc
})
}
+7
View File
@@ -12,6 +12,10 @@ type Config struct {
HashAddr string HashAddr string
PostgresDSN string PostgresDSN string
RedisAddr string RedisAddr string
// ChainBackend 选择链实现:memory(纯内存)| pg(内存+PG镜像)| chainmaker(真实链,需 -tags chainmaker 构建)
ChainBackend string
// ChainMakerSDKConf ChainMaker Go SDK 配置文件路径(节点地址/TLS/组织证书),仅 chainmaker 后端使用
ChainMakerSDKConf string
} }
func getEnv(key, def string) string { func getEnv(key, def string) string {
@@ -29,5 +33,8 @@ func Load() Config {
HashAddr: getEnv("TCS_HASH_ADDR", ":8082"), HashAddr: getEnv("TCS_HASH_ADDR", ":8082"),
PostgresDSN: getEnv("TCS_POSTGRES_DSN", "postgres://postgres@localhost:5432/tcs_iptv?sslmode=disable"), PostgresDSN: getEnv("TCS_POSTGRES_DSN", "postgres://postgres@localhost:5432/tcs_iptv?sslmode=disable"),
RedisAddr: getEnv("TCS_REDIS_ADDR", "localhost:6379"), RedisAddr: getEnv("TCS_REDIS_ADDR", "localhost:6379"),
// 默认 pg:PG 可用则内存+镜像持久化,不可用自动回退内存(见 api-svc 装配)
ChainBackend: getEnv("TCS_CHAIN_BACKEND", "pg"),
ChainMakerSDKConf: getEnv("TCS_CHAINMAKER_SDK_CONF", "deploy/chainmaker/sdk_config.yml"),
} }
} }