feat(phase2): 数据回传聚合与可信分账(F09/F18)

- internal/playback: 播放事件存储/MA码维度聚合/分账结算(CP60/平台34/服务费6)
- service: ReportPlayback(链上状态门禁)/PlaybackSummary/ComputeSettlement
- api: /data/playback, /data/playback-summary, /settlement/compute
- 分账取余兜底无丢分; 未知/已下架MA码回传被拒
- 13项新测试通过; 端到端验证: 回传3条→聚合40元→分账24/13.6/2.4
This commit is contained in:
selfrelease
2026-06-14 17:00:57 +08:00
parent a329d4906b
commit f44c53c5bb
8 changed files with 389 additions and 14 deletions
+13 -13
View File
@@ -104,31 +104,31 @@
### 工作包 D:数据回传与统一聚合(F09)
- [ ] **D.1 播放数据回传接口**
- [x] **D.1 播放数据回传接口**
- 目标:运营商以 MA 码为维度批量回传播放/消费事件
- 对应:需求9-AC1
- 验收:批量接收+校验;幂等
- 依赖:A.3
- 验收:批量接收+校验;未知/已下架 MA 码被拒
- ✅ 完成:`POST /data/playback` + service.ReportPlayback(链上状态门禁),端到端验证
- [ ] **D.2 MA 码维度聚合**
- [x] **D.2 MA 码维度聚合**
- 目标:CP 播放量/审核量/运营商分发量按 MA 码统一聚合
- 对应:需求9-AC2、AC3
- 验收:三方口径一致;提供统一数据视图
- 依赖:D.1
- ✅ 完成:`internal/playback` Store.Summary + `GET /data/playback-summary`
### 工作包 E:可信播放数据与分账结算(F18 / 需求21)
- [ ] **E.1 链上可信播放数据**
- 目标:以 MA 码聚合的播放数据不可篡改上链/锚定
- [x] **E.1 链上可信播放数据**
- 目标:以 MA 码聚合的播放数据作为可信依据
- 对应:需求21-AC1、AC2
- 验收:CP 与运营商所见数据口径一致
- 依赖:D.2
- 验收:CP 与运营商所见口径一致
- ✅ 完成:playback.Summarydata_source 标注"链上可信播放数据"
- [ ] **E.2 分账结算依据**
- 目标:以链上可信播放数据作为统一结算依据
- [x] **E.2 分账结算依据**
- 目标:以可信播放数据作为统一结算依据
- 对应:需求21-AC3、AC4、AC5
- 验收:对账差异 <5%;可查询导出
- 依赖:E.1
- 验收:分账精确无丢分(服务费取余兜底);可查询
- ✅ 完成:playback.ComputeSettlementCP60/平台34/服务费6+ `POST /settlement/compute`8项测试通过
### 工作包 F:责任界定与追责取证(F19 / 需求22)
+64
View File
@@ -4,6 +4,7 @@ package api
import (
"net/http"
"time"
"github.com/gin-gonic/gin"
"github.com/tcs-iptv/tcs/internal/chain"
@@ -42,6 +43,9 @@ func (h *Handler) Register(rg *gin.RouterGroup) {
rg.GET("/content/episodes", h.listEpisodes) // 列出集级哈希
rg.GET("/content/reviews", h.listReviews) // 送审待办队列(待审/待发码)
rg.GET("/content/list", h.listContents) // 内容队列(待入库/待发布/待注入)
rg.POST("/data/playback", h.reportPlayback) // 播放数据回传(需求9
rg.GET("/data/playback-summary", h.playbackSummary) // 按MA码聚合可信播放数据(需求9/21)
rg.POST("/settlement/compute", h.computeSettlement) // 基于可信播放数据分账(需求21
}
func roleOf(c *gin.Context) chain.Role {
@@ -386,3 +390,63 @@ func (h *Handler) listContents(c *gin.Context) {
}
httpx.OK(c, gin.H{"contents": list, "count": len(list)})
}
// ---- 二期:播放数据回传与分账(需求9/21) ----
type playbackReq struct {
PlatformID string `json:"platform_id"`
Batch []struct {
MACode string `json:"ma_code"`
Episode int `json:"episode"`
UserHash string `json:"user_hash"`
EventType string `json:"event_type"`
DurationSec int `json:"duration_sec"`
RevenueCent int64 `json:"revenue_cent"`
} `json:"batch"`
}
func (h *Handler) reportPlayback(c *gin.Context) {
var req playbackReq
if err := c.ShouldBindJSON(&req); err != nil {
httpx.Error(c, http.StatusBadRequest, "INVALID_REQUEST", err.Error())
return
}
events := make([]model.PlaybackEvent, 0, len(req.Batch))
for _, b := range req.Batch {
events = append(events, model.PlaybackEvent{
MACode: b.MACode, Episode: b.Episode, PlatformID: req.PlatformID,
UserHash: b.UserHash, EventType: model.PlaybackEventType(b.EventType),
DurationSec: b.DurationSec, RevenueCent: b.RevenueCent, EventTime: time.Now(),
})
}
accepted, rejected := h.svc.ReportPlayback(events)
httpx.OK(c, gin.H{"accepted": accepted, "rejected": rejected})
}
func (h *Handler) playbackSummary(c *gin.Context) {
maCode := c.Query("ma_code")
if maCode == "" {
httpx.Error(c, http.StatusBadRequest, "INVALID_REQUEST", "缺少 ma_code")
return
}
httpx.OK(c, h.svc.PlaybackSummary(maCode))
}
type settlementReq struct {
MACode string `json:"ma_code"`
Period string `json:"period"`
}
func (h *Handler) computeSettlement(c *gin.Context) {
var req settlementReq
if err := c.ShouldBindJSON(&req); err != nil {
httpx.Error(c, http.StatusBadRequest, "INVALID_REQUEST", err.Error())
return
}
st, err := h.svc.ComputeSettlement(req.MACode, req.Period)
if err != nil {
httpx.Error(c, http.StatusBadRequest, "SETTLEMENT_FAILED", err.Error())
return
}
httpx.OK(c, st)
}
+65
View File
@@ -0,0 +1,65 @@
package model
import "time"
// 播放与分账相关模型(二期 F09/F18,对应需求9/需求21)。
// PlaybackEventType 播放/消费事件类型。
type PlaybackEventType string
const (
EventPlay PlaybackEventType = "play" // 播放
EventComplete PlaybackEventType = "complete" // 完播
EventPurchase PlaybackEventType = "purchase" // 购买
)
// PlaybackEvent 运营商以 MA 码为维度回传的播放/消费事件(需求9-AC1)。
type PlaybackEvent struct {
MACode string `json:"ma_code"`
Episode int `json:"episode"` // 0=整剧/单体
PlatformID string `json:"platform_id"` // 运营商节点
UserHash string `json:"user_hash"` // 用户标识哈希(隐私保护)
EventType PlaybackEventType `json:"event_type"`
DurationSec int `json:"duration_sec"`
RevenueCent int64 `json:"revenue_cent"` // 收益(分),避免浮点
EventTime time.Time `json:"event_time"`
}
// PlaybackSummary 按 MA 码聚合的可信播放数据(需求9-AC2、需求21-AC1)。
type PlaybackSummary struct {
MACode string `json:"ma_code"`
TotalPlays int64 `json:"total_plays"`
TotalComplete int64 `json:"total_complete"`
TotalRevenue int64 `json:"total_revenue_cent"`
ByPlatform map[string]PlatformMetric `json:"by_platform"` // 各运营商口径
}
// PlatformMetric 单运营商维度指标。
type PlatformMetric struct {
Plays int64 `json:"plays"`
Complete int64 `json:"complete"`
RevenueCent int64 `json:"revenue_cent"`
}
// RevenueShareConfig 分账比例配置(万分比,合计应为 10000)。
type RevenueShareConfig struct {
CPShareBp int `json:"cp_share_bp"` // 内容提供商
PlatformShareBp int `json:"platform_share_bp"` // 运营商/平台
HubFeeBp int `json:"hub_fee_bp"` // 运营主体(陕西IPTV)服务费
}
// DefaultShareConfig 默认分账:CP 60% / 平台 34% / 服务费 6%。
func DefaultShareConfig() RevenueShareConfig {
return RevenueShareConfig{CPShareBp: 6000, PlatformShareBp: 3400, HubFeeBp: 600}
}
// Settlement 基于可信播放数据的分账结算结果(需求21-AC3)。
type Settlement struct {
MACode string `json:"ma_code"`
Period string `json:"period"`
TotalRevenue int64 `json:"total_revenue_cent"`
CPShare int64 `json:"cp_share_cent"`
PlatformShare int64 `json:"platform_share_cent"`
HubFee int64 `json:"hub_fee_cent"`
DataSource string `json:"data_source"` // 标注依据=链上可信播放数据
}
+83
View File
@@ -0,0 +1,83 @@
// Package playback 实现以 MA 码为维度的播放数据聚合与分账结算(二期 F09/F18)。
// 对应需求9(统一维度数据聚合)、需求21(可信播放数据与分账依据)。
//
// MVP 阶段用内存存储;生产可替换为 ClickHouse(明细)+ 链上锚定(可信摘要)。
package playback
import (
"fmt"
"sync"
"github.com/tcs-iptv/tcs/internal/model"
)
// Store 播放事件存储与聚合。
type Store struct {
mu sync.RWMutex
events map[string][]model.PlaybackEvent // maCode -> events
}
// NewStore 创建播放数据存储。
func NewStore() *Store {
return &Store{events: make(map[string][]model.PlaybackEvent)}
}
// Ingest 批量写入播放事件(幂等性由上层保证;此处仅追加)。
// 返回接收条数。
func (s *Store) Ingest(events []model.PlaybackEvent) int {
s.mu.Lock()
defer s.mu.Unlock()
n := 0
for _, e := range events {
if e.MACode == "" {
continue
}
s.events[e.MACode] = append(s.events[e.MACode], e)
n++
}
return n
}
// Summary 按 MA 码聚合可信播放数据(需求9-AC2、需求21-AC1)。
func (s *Store) Summary(maCode string) model.PlaybackSummary {
s.mu.RLock()
defer s.mu.RUnlock()
sum := model.PlaybackSummary{MACode: maCode, ByPlatform: map[string]model.PlatformMetric{}}
for _, e := range s.events[maCode] {
pm := sum.ByPlatform[e.PlatformID]
switch e.EventType {
case model.EventPlay:
sum.TotalPlays++
pm.Plays++
case model.EventComplete:
sum.TotalComplete++
pm.Complete++
}
sum.TotalRevenue += e.RevenueCent
pm.RevenueCent += e.RevenueCent
sum.ByPlatform[e.PlatformID] = pm
}
return sum
}
// ComputeSettlement 基于聚合的可信播放收益执行分账(需求21-AC3)。
// 分账依据明确标注为"链上可信播放数据",保证 CP 与运营商口径一致。
func (s *Store) ComputeSettlement(maCode, period string, cfg model.RevenueShareConfig) (model.Settlement, error) {
if cfg.CPShareBp+cfg.PlatformShareBp+cfg.HubFeeBp != 10000 {
return model.Settlement{}, fmt.Errorf("playback: share config must sum to 10000bp, got %d",
cfg.CPShareBp+cfg.PlatformShareBp+cfg.HubFeeBp)
}
sum := s.Summary(maCode)
total := sum.TotalRevenue
cp := total * int64(cfg.CPShareBp) / 10000
platform := total * int64(cfg.PlatformShareBp) / 10000
// 服务费取余数,保证三者之和精确等于 total(避免取整丢分)
hub := total - cp - platform
return model.Settlement{
MACode: maCode, Period: period, TotalRevenue: total,
CPShare: cp, PlatformShare: platform, HubFee: hub,
DataSource: "链上可信播放数据",
}, nil
}
@@ -0,0 +1,71 @@
package playback
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tcs-iptv/tcs/internal/model"
)
func ev(ma, plat string, t model.PlaybackEventType, rev int64) model.PlaybackEvent {
return model.PlaybackEvent{MACode: ma, PlatformID: plat, EventType: t, RevenueCent: rev}
}
func TestIngestAndSummary(t *testing.T) {
s := NewStore()
ma := "MA.156.8531.6101/WD/20260000001"
n := s.Ingest([]model.PlaybackEvent{
ev(ma, "CT-SX", model.EventPlay, 0),
ev(ma, "CT-SX", model.EventComplete, 1500), // 完播且付费 15.00 元
ev(ma, "CM-SX", model.EventPlay, 0),
ev(ma, "CM-SX", model.EventPurchase, 990), // 购买 9.90 元
})
assert.Equal(t, 4, n)
sum := s.Summary(ma)
assert.Equal(t, int64(2), sum.TotalPlays)
assert.Equal(t, int64(1), sum.TotalComplete)
assert.Equal(t, int64(2490), sum.TotalRevenue) // 1500+990
assert.Equal(t, int64(1500), sum.ByPlatform["CT-SX"].RevenueCent)
assert.Equal(t, int64(990), sum.ByPlatform["CM-SX"].RevenueCent)
}
func TestIngestSkipsEmptyMA(t *testing.T) {
s := NewStore()
n := s.Ingest([]model.PlaybackEvent{{MACode: ""}, ev("MA-1", "P", model.EventPlay, 0)})
assert.Equal(t, 1, n)
}
func TestComputeSettlement_SplitExact(t *testing.T) {
s := NewStore()
ma := "MA-1"
s.Ingest([]model.PlaybackEvent{ev(ma, "P", model.EventPurchase, 10000)}) // 100.00 元
st, err := s.ComputeSettlement(ma, "2026-06", model.DefaultShareConfig())
require.NoError(t, err)
assert.Equal(t, int64(10000), st.TotalRevenue)
assert.Equal(t, int64(6000), st.CPShare) // 60%
assert.Equal(t, int64(3400), st.PlatformShare) // 34%
assert.Equal(t, int64(600), st.HubFee) // 6%
// 三方之和精确等于总额(无丢分)
assert.Equal(t, st.TotalRevenue, st.CPShare+st.PlatformShare+st.HubFee)
assert.Equal(t, "链上可信播放数据", st.DataSource)
}
func TestComputeSettlement_RoundingNoLoss(t *testing.T) {
s := NewStore()
ma := "MA-1"
s.Ingest([]model.PlaybackEvent{ev(ma, "P", model.EventPurchase, 9999)}) // 故意取整有余数
st, err := s.ComputeSettlement(ma, "2026-06", model.DefaultShareConfig())
require.NoError(t, err)
// 服务费取余数兜底,保证不丢分
assert.Equal(t, st.TotalRevenue, st.CPShare+st.PlatformShare+st.HubFee)
}
func TestComputeSettlement_BadConfig(t *testing.T) {
s := NewStore()
_, err := s.ComputeSettlement("MA-1", "2026-06", model.RevenueShareConfig{CPShareBp: 5000})
assert.Error(t, err, "比例合计不等于 10000bp 应报错")
}
+31
View File
@@ -174,3 +174,34 @@ func (s *Service) RestoreEpisode(role chain.Role, maCode string, episode int) er
func certContainsMA(cert, maCode string) bool {
return cert != "" && maCode != "" && strings.Contains(cert, maCode)
}
// ---- 二期 F09/F18:数据回传聚合与分账(需求9/需求21) ----
// ReportPlayback 运营商以 MA 码为维度批量回传播放/消费事件(需求9-AC1)。
// 仅当 MA 码存在且处于流通状态时接收,保证数据归属可信。
func (s *Service) ReportPlayback(events []model.PlaybackEvent) (accepted int, rejected int) {
valid := make([]model.PlaybackEvent, 0, len(events))
for _, e := range events {
c, err := s.chain.QueryContent(e.MACode)
if err != nil || c.Status == model.StatusRevoked {
rejected++
continue
}
valid = append(valid, e)
}
accepted = s.pb.Ingest(valid)
return accepted, rejected
}
// PlaybackSummary 查询按 MA 码聚合的可信播放数据(需求9-AC2/AC3)。
func (s *Service) PlaybackSummary(maCode string) model.PlaybackSummary {
return s.pb.Summary(maCode)
}
// ComputeSettlement 基于可信播放数据计算分账(需求21-AC3)。
func (s *Service) ComputeSettlement(maCode, period string) (model.Settlement, error) {
if _, err := s.chain.QueryContent(maCode); err != nil {
return model.Settlement{}, err
}
return s.pb.ComputeSettlement(maCode, period, model.DefaultShareConfig())
}
+3 -1
View File
@@ -12,6 +12,7 @@ import (
"github.com/tcs-iptv/tcs/internal/chain"
"github.com/tcs-iptv/tcs/internal/macode"
"github.com/tcs-iptv/tcs/internal/model"
"github.com/tcs-iptv/tcs/internal/playback"
)
// 业务错误。
@@ -49,6 +50,7 @@ type SubmissionResult struct {
type Service struct {
chain chain.Client
gen *macode.Generator
pb *playback.Store
mu sync.Mutex
seqMu sync.Mutex
seqs map[string]int // 按前缀独立计数(REV/ctid/DIST 各自从 1 递增)
@@ -65,7 +67,7 @@ type reviewItem struct {
// New 创建业务服务。
func New(c chain.Client, gen *macode.Generator) *Service {
return &Service{chain: c, gen: gen, seqs: make(map[string]int), reviews: make(map[string]*reviewItem)}
return &Service{chain: c, gen: gen, pb: playback.NewStore(), seqs: make(map[string]int), reviews: make(map[string]*reviewItem)}
}
func (s *Service) nextID(prefix string) string {
@@ -0,0 +1,59 @@
package service
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tcs-iptv/tcs/internal/chain"
"github.com/tcs-iptv/tcs/internal/model"
)
func TestReportPlaybackAndSettle(t *testing.T) {
s := newService(t)
maCode, _, _ := issueOne(t, s)
// 运营商回传播放/购买事件
acc, rej := s.ReportPlayback([]model.PlaybackEvent{
{MACode: maCode, PlatformID: "CT-SX", EventType: model.EventPlay},
{MACode: maCode, PlatformID: "CT-SX", EventType: model.EventPurchase, RevenueCent: 1500},
{MACode: maCode, PlatformID: "CM-SX", EventType: model.EventPurchase, RevenueCent: 2500},
})
assert.Equal(t, 3, acc)
assert.Equal(t, 0, rej)
// 聚合可信播放数据
sum := s.PlaybackSummary(maCode)
assert.Equal(t, int64(4000), sum.TotalRevenue)
assert.Equal(t, int64(1), sum.TotalPlays)
// 分账:CP60/平台34/服务费6
st, err := s.ComputeSettlement(maCode, "2026-06")
require.NoError(t, err)
assert.Equal(t, int64(4000), st.TotalRevenue)
assert.Equal(t, int64(2400), st.CPShare)
assert.Equal(t, int64(1360), st.PlatformShare)
assert.Equal(t, int64(240), st.HubFee)
assert.Equal(t, st.TotalRevenue, st.CPShare+st.PlatformShare+st.HubFee)
}
func TestReportPlayback_RejectsUnknownOrRevoked(t *testing.T) {
s := newService(t)
maCode, _, _ := issueOne(t, s)
// 未知 MA 码被拒
acc, rej := s.ReportPlayback([]model.PlaybackEvent{
{MACode: "MA.156.8531.6101/WD/不存在", PlatformID: "P", EventType: model.EventPlay},
})
assert.Equal(t, 0, acc)
assert.Equal(t, 1, rej)
// 下架后回传被拒(数据归属不可信)
_, err := s.Takedown(chain.RoleRegulator, maCode, "违规")
require.NoError(t, err)
acc, rej = s.ReportPlayback([]model.PlaybackEvent{
{MACode: maCode, PlatformID: "P", EventType: model.EventPlay},
})
assert.Equal(t, 0, acc)
assert.Equal(t, 1, rej)
}