package api import ( "bytes" "encoding/json" "net/http" "net/http/httptest" "testing" "time" "github.com/gin-gonic/gin" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/tcs-iptv/tcs/internal/chain" "github.com/tcs-iptv/tcs/internal/hash" "github.com/tcs-iptv/tcs/internal/httpx" "github.com/tcs-iptv/tcs/internal/macode" "github.com/tcs-iptv/tcs/internal/service" ) // testServer 组装完整 API 栈(鉴权 + 路由 + service + 内存链/号段)。 func testServer(t *testing.T) (*httptest.Server, *httpx.MemoryKeyStore) { t.Helper() gin.SetMode(gin.TestMode) ch := chain.NewMemoryChain() gen := macode.NewGenerator(macode.NewMemoryStore()) require.NoError(t, gen.RegisterSegment(macode.Segment{ IndustryNode: "8531", OrgNode: "4401", Category: macode.CategoryMicroDrama, Start: 1, End: 9999999, SeqWidth: 7, })) svc := service.New(ch, gen) h := NewHandler(svc) keys := httpx.NewMemoryKeyStore() keys.Add("ak-regulator", "sk-regulator", string(chain.RoleRegulator)) keys.Add("ak-reviewer", "sk-reviewer", string(chain.RoleReviewer)) keys.Add("ak-cp", "sk-cp", string(chain.RoleCP)) keys.Add("ak-operator", "sk-operator", string(chain.RoleOperator)) r := gin.New() v1 := r.Group("/api/v1", httpx.AuthMiddleware(keys)) h.Register(v1) return httptest.NewServer(r), keys } // signedCall 发起带 HMAC 签名的请求,返回状态码与解析后的响应。 func signedCall(t *testing.T, base, apiKey, secret, method, path string, body any) (int, map[string]any) { t.Helper() var buf []byte if body != nil { buf, _ = json.Marshal(body) } signPath := "/api/v1" + path if i := indexByte(path, '?'); i >= 0 { signPath = "/api/v1" + path[:i] } sig := httpx.Sign(secret, method, signPath) req, err := http.NewRequest(method, base+"/api/v1"+path, bytes.NewReader(buf)) require.NoError(t, err) req.Header.Set("Authorization", "TCS "+apiKey+":"+sig) if body != nil { req.Header.Set("Content-Type", "application/json") } resp, err := http.DefaultClient.Do(req) require.NoError(t, err) defer resp.Body.Close() var out map[string]any _ = json.NewDecoder(resp.Body).Decode(&out) return resp.StatusCode, out } func indexByte(s string, b byte) int { for i := 0; i < len(s); i++ { if s[i] == b { return i } } return -1 } func dataOf(m map[string]any) map[string]any { if d, ok := m["data"].(map[string]any); ok { return d } return map[string]any{} } // TestE2E_FullLifecycle 覆盖 MVP 全闭环:送审→发码→审核→入库→发布→注入→下架。 func TestE2E_FullLifecycle(t *testing.T) { srv, _ := testServer(t) defer srv.Close() b := srv.URL // 1) CP 送审 st, resp := signedCall(t, b, "ak-cp", "sk-cp", "POST", "/content/register", map[string]any{ "title": "示例微短剧", "episode_count": 24, "category": "WD", "file_sha256": "fh-e2e", "merkle_root": "mr-e2e", "perceptual_hash": "ph-e2e", "cp_media_id": "FS-77821", "cp_name": "飞翮信息", }) require.Equal(t, http.StatusAccepted, st) reviewID := dataOf(resp)["review_id"].(string) ctid := dataOf(resp)["content_twin_id"].(string) require.NotEmpty(t, reviewID) // 2) CSPS 合规审核(发码前) st, _ = signedCall(t, b, "ak-reviewer", "sk-reviewer", "POST", "/content/csps-result", map[string]any{ "review_id": reviewID, "approved": true, "reviewer_id": "rv-1", }) require.Equal(t, http.StatusOK, st) // 3) 监管发码签发(审核通过后) st, resp = signedCall(t, b, "ak-regulator", "sk-regulator", "POST", "/content/issue", map[string]any{ "review_id": reviewID, "issuer": "北京市广播电视局", }) require.Equal(t, http.StatusOK, st) maCode := dataOf(resp)["ma_code"].(string) cert := dataOf(resp)["certificate"].(string) assert.True(t, macode.IsValid(maCode)) // 4) 入媒资库 st, _ = signedCall(t, b, "ak-reviewer", "sk-reviewer", "POST", "/content/ingest", map[string]any{ "ma_code": maCode, "content_twin_id": ctid, "media_asset_id": "MEDIA-001", "lib_name": "广东IPTV媒资库", }) require.Equal(t, http.StatusOK, st) // 5) 发布 st, _ = signedCall(t, b, "ak-reviewer", "sk-reviewer", "POST", "/content/publish", map[string]any{ "ma_code": maCode, "certificate": cert, }) require.Equal(t, http.StatusOK, st) // 6) CDN 注入(匹配) st, resp = signedCall(t, b, "ak-operator", "sk-operator", "POST", "/content/inject", map[string]any{ "content_twin_id": ctid, "ma_code": maCode, "file_sha256": "fh-e2e", "operator_id": "CT-IPTV-GD", "cdn_endpoint": "cdn://ct-gd/vod/1", }) require.Equal(t, http.StatusOK, st) assert.Equal(t, true, dataOf(resp)["allowed"]) // 7) 映射查询 st, resp = signedCall(t, b, "ak-regulator", "sk-regulator", "GET", "/content/mappings?ma_code="+maCode, nil) require.Equal(t, http.StatusOK, st) mappings := dataOf(resp)["mappings"].([]any) assert.Len(t, mappings, 3) // cp + reviewer + operator // 8) 监管下架 st, resp = signedCall(t, b, "ak-regulator", "sk-regulator", "POST", "/content/takedown", map[string]any{ "ma_code": maCode, "reason": "违规", }) require.Equal(t, http.StatusOK, st) assert.NotEmpty(t, dataOf(resp)["cdn_endpoints"]) } // TestE2E_TamperRejected 版本篡改专项:注入篡改文件应被拒绝(需求7/15/18-AC4)。 func TestE2E_TamperRejected(t *testing.T) { srv, _ := testServer(t) defer srv.Close() b := srv.URL st, resp := signedCall(t, b, "ak-cp", "sk-cp", "POST", "/content/register", map[string]any{ "title": "X", "episode_count": 1, "category": "WD", "file_sha256": "fh-orig", "merkle_root": "mr-orig", }) require.Equal(t, http.StatusAccepted, st) reviewID := dataOf(resp)["review_id"].(string) ctid := dataOf(resp)["content_twin_id"].(string) signedCall(t, b, "ak-reviewer", "sk-reviewer", "POST", "/content/csps-result", map[string]any{"review_id": reviewID, "approved": true}) _, resp = signedCall(t, b, "ak-regulator", "sk-regulator", "POST", "/content/issue", map[string]any{ "review_id": reviewID, "issuer": "x", }) maCode := dataOf(resp)["ma_code"].(string) cert := dataOf(resp)["certificate"].(string) signedCall(t, b, "ak-reviewer", "sk-reviewer", "POST", "/content/ingest", map[string]any{"ma_code": maCode, "content_twin_id": ctid, "media_asset_id": "M", "lib_name": "L"}) signedCall(t, b, "ak-reviewer", "sk-reviewer", "POST", "/content/publish", map[string]any{"ma_code": maCode, "certificate": cert}) // 篡改文件注入 → 拒绝 st, _ = signedCall(t, b, "ak-operator", "sk-operator", "POST", "/content/inject", map[string]any{ "content_twin_id": ctid, "ma_code": maCode, "file_sha256": "fh-TAMPERED", "operator_id": "OP", "cdn_endpoint": "cdn://x", }) assert.Equal(t, http.StatusBadRequest, st, "篡改注入必须被拒") } // TestE2E_VersionChangeLocatesEpisode 版本变更定位被篡改集(需求12-AC3)。 func TestE2E_VersionChangeLocatesEpisode(t *testing.T) { srv, _ := testServer(t) defer srv.Close() b := srv.URL st, resp := signedCall(t, b, "ak-cp", "sk-cp", "POST", "/content/register", map[string]any{ "title": "多集剧", "episode_count": 3, "category": "WD", "file_sha256": "fh-multi", "merkle_root": "mr-multi", }) require.Equal(t, http.StatusAccepted, st) reviewID := dataOf(resp)["review_id"].(string) ctid := dataOf(resp)["content_twin_id"].(string) signedCall(t, b, "ak-reviewer", "sk-reviewer", "POST", "/content/csps-result", map[string]any{"review_id": reviewID, "approved": true}) signedCall(t, b, "ak-regulator", "sk-regulator", "POST", "/content/issue", map[string]any{"review_id": reviewID, "issuer": "x"}) old := []string{hash.SHA256Hex([]byte("ep1")), hash.SHA256Hex([]byte("ep2")), hash.SHA256Hex([]byte("ep3"))} neu := []string{hash.SHA256Hex([]byte("ep1")), hash.SHA256Hex([]byte("ep2-X")), hash.SHA256Hex([]byte("ep3"))} st, resp = signedCall(t, b, "ak-reviewer", "sk-reviewer", "POST", "/content/version-change", map[string]any{ "content_twin_id": ctid, "reason": "第2集替换", "prev_hash": "mr-multi", "new_hash": "mr-new", "old_segments": old, "new_segments": neu, }) require.Equal(t, http.StatusOK, st) eps := dataOf(resp)["affected_episodes"].([]any) require.Len(t, eps, 1) assert.Equal(t, float64(2), eps[0], "应定位到第2集") } // TestE2E_PermissionMatrix 权限矩阵:越权操作被拒(需求14)。 func TestE2E_PermissionMatrix(t *testing.T) { srv, _ := testServer(t) defer srv.Close() b := srv.URL // 准备一条已签发内容 _, resp := signedCall(t, b, "ak-cp", "sk-cp", "POST", "/content/register", map[string]any{ "title": "P", "episode_count": 1, "category": "WD", "file_sha256": "fh-perm", "merkle_root": "mr-perm", }) reviewID := dataOf(resp)["review_id"].(string) // 先过 CSPS 审核(否则发码会因未审核被拒,无法测到角色权限) signedCall(t, b, "ak-reviewer", "sk-reviewer", "POST", "/content/csps-result", map[string]any{"review_id": reviewID, "approved": true}) // CP 越权发码(签发仅监管主体)→ 403 st, _ := signedCall(t, b, "ak-cp", "sk-cp", "POST", "/content/issue", map[string]any{"review_id": reviewID, "issuer": "x"}) assert.Equal(t, http.StatusForbidden, st, "CP 不得发码") // 正常签发 _, resp = signedCall(t, b, "ak-regulator", "sk-regulator", "POST", "/content/issue", map[string]any{"review_id": reviewID, "issuer": "x"}) maCode := dataOf(resp)["ma_code"].(string) // 运营商越权下架 → 403 st, _ = signedCall(t, b, "ak-operator", "sk-operator", "POST", "/content/takedown", map[string]any{"ma_code": maCode, "reason": "越权"}) assert.Equal(t, http.StatusForbidden, st, "运营商不得发起下架") // 无效签名 → 401 req, _ := http.NewRequest("GET", b+"/api/v1/content/mappings?ma_code="+maCode, nil) req.Header.Set("Authorization", "TCS ak-regulator:badsig") r, _ := http.DefaultClient.Do(req) assert.Equal(t, http.StatusUnauthorized, r.StatusCode, "错误签名应 401") r.Body.Close() } // TestE2E_TakedownLatency 下架时效:端到端应远快于分钟级(需求11/18-AC1)。 func TestE2E_TakedownLatency(t *testing.T) { srv, _ := testServer(t) defer srv.Close() b := srv.URL _, resp := signedCall(t, b, "ak-cp", "sk-cp", "POST", "/content/register", map[string]any{ "title": "L", "episode_count": 1, "category": "WD", "file_sha256": "fh-lat", "merkle_root": "mr-lat", }) reviewID := dataOf(resp)["review_id"].(string) signedCall(t, b, "ak-reviewer", "sk-reviewer", "POST", "/content/csps-result", map[string]any{"review_id": reviewID, "approved": true}) _, resp = signedCall(t, b, "ak-regulator", "sk-regulator", "POST", "/content/issue", map[string]any{"review_id": reviewID, "issuer": "x"}) maCode := dataOf(resp)["ma_code"].(string) start := time.Now() st, _ := signedCall(t, b, "ak-regulator", "sk-regulator", "POST", "/content/takedown", map[string]any{"ma_code": maCode, "reason": "违规"}) elapsed := time.Since(start) require.Equal(t, http.StatusOK, st) assert.Less(t, elapsed, time.Second, "下架端到端应在秒级内(目标分钟级)") }