Files
MAcode/docs/09-异步消息流转设计.md
T
selfrelease a329d4906b init: AIGC-Hub/AVCC 方案文档 + TCS-IPTV 内容可信锁定系统 MVP
- 方案文档: AVCC 体系建设、IPTV TCS 需求(0-req)/PRD(1-prd)/任务(2-task)/二三四期任务
- tcs-iptv: Go 后端(哈希SDK/MA码生成/可信数据空间mock/业务编排/HTTP API+HMAC鉴权)
- web-console: React+AntD 监管大屏(角色工作台/全流程演示/监管片库)
- 一剧一码+集级哈希, 集级下架/恢复, 全栈测试通过
2026-06-14 16:50:31 +08:00

5.3 KiB
Raw Blame History

第九章 异步消息流转设计 (Kafka)

版本:V1.0
基于文档:《AIGC-Hub智视码(AVCC)体系建设方案 V2.0》

本章详细定义了 AIGC-Hub 系统内部基于 Apache Kafka 的异步事件驱动架构,包括 Topic 规划、消息体契约(Payload Schema)、生产者与消费者职责,以及可靠性保证机制。


9.1 Topic 规划总览

Topic 名称 用途说明 分区数(建议) 数据保留期
topic.avcc.apply.created 赋码申请创建(预检通过后触发) 6 7 天
topic.review.task.finished 审核任务完成(AI/人工审核出具结果后触发) 12 7 天
topic.chain.tx.pending 准备上链事件(存证、分账指令) 6 7 天
topic.chain.tx.confirmed 链上交易确认回执 6 30 天
topic.playback.event 平台播放与行为数据回传 24 3 天
topic.system.dlq 死信队列(所有消费者重试失败后路由至此) 3 永久(人工处理)

9.2 核心事件契约定义 (Event Schema)

所有消息均采用 JSON 格式,统一包含外层 Envelope:

{
  "event_id": "uuid",          // 事件唯一标识,用于幂等
  "timestamp": 1717312200000,  // 毫秒时间戳
  "source": "code-svc",        // 发送方服务
  "payload": { ... }           // 业务数据
}

9.2.1 赋码申请创建事件

  • Topic: topic.avcc.apply.created
  • 生产者: code-svc (赋码引擎)
  • 消费者: review-svc (审核引擎)

Payload 定义:

{
  "request_id": "req_20260602_001",
  "avcc_record_id": 10042,
  "content_url": "s3://aigc-hub-storage/works/raw/...",
  "content_hash": "sha256:abc123def456...",
  "ai_tool_info": {
    "model_name": "Stable-Diffusion-XL",
    "model_version": "v1.0"
  },
  "priority": 5
}

9.2.2 审核任务完成事件

  • Topic: topic.review.task.finished
  • 生产者: review-svc (审核引擎)
  • 消费者: chain-svc (准备上链), code-svc (更新状态)

Payload 定义:

{
  "request_id": "req_20260602_001",
  "avcc_record_id": 10042,
  "review_report_id": 5001,
  "decision": "approved",       // approved / rejected / need_human
  "final_level": "G",          // P / G / O
  "overall_score": 78.5,
  "report_hash": "sha256:789ghi..."
}

9.2.3 上链交易确认事件

  • Topic: topic.chain.tx.confirmed
  • 生产者: chain-svc (版权链引擎)
  • 消费者: code-svc (生成AVCC), settlement-svc (更新清算状态)

Payload 定义:

{
  "avcc_record_id": 10042,
  "chain_type": "copyright_main",  // copyright_main / settlement
  "tx_hash": "0xabc123...",
  "block_height": 123456,
  "status": "success",
  "crd_address": "0x7f3e9a..."
}

9.2.4 播放回传事件 (海量数据)

  • Topic: topic.playback.event
  • 生产者: gateway-svc (数据回传 API 接收后发送)
  • 消费者: clickhouse-sink (存入时序数据库), flink-job (实时异常检测)

Payload 定义:

{
  "avcc_code": "MA.156.10005.8361/...",
  "platform_node": "8361",
  "user_hash": "sha256:user123",
  "event_type": "play",      // play / complete / purchase
  "duration_sec": 1800,
  "revenue_cny": 0.00
}

9.3 可靠性与顺序保证机制

9.3.1 消息投递语义 (Delivery Semantics)

  • 核心业务流(申请、审核、上链):采用 At-Least-Once(至少一次)语义。
    • 生产者配置:acks=all, retries=Integer.MAX_VALUE, enable.idempotence=true
    • 消费者处理必须保证幂等性(基于 DB 的 unique key 或 event_id 去重表)。
  • 播放数据流(海量统计):采用 At-Most-Once / 高吞吐 配置。
    • 生产者配置:acks=1, linger.ms=5, batch.size=64KB,允许极小概率的数据丢失以换取极高吞吐。

9.3.2 局部顺序保证 (Ordering)

同一个 AVCC 作品的事件需要保序处理(例如先创建、后审核、再上链)。

  • 策略:在发送 topic.avcc.*, topic.review.*, topic.chain.* 时,指定 Kafka 消息的 Key = avcc_record_id
  • 效果:同一个作品的所有生命周期事件会被路由到同一个 Partition,确保同一作品的状态流转严格有序。

9.3.3 失败重试与死信队列 (DLQ)

消费者处理失败(如依赖的第三方服务超时、数据库锁冲突)时的处理策略:

  1. 内部重试:消费者内部捕获非致命异常,执行指数退避重试(如 1s, 2s, 4s, 8s),最大重试 3 次。
  2. 重试队列 (Retry Topic):若内部重试失败,将消息转发至 topic.system.retry,由专门的 Retry Consumer 延时消费(利用内存或单独逻辑处理延时)。
  3. 死信队列 (DLQ):若重试队列依然失败,将其移入 topic.system.dlq,触发 P2 级告警。
  4. 人工介入:运营/开发人员通过后台界面查看 DLQ 消息,排查 bug 或脏数据后,可一键将消息重新注入原 Topic。

9.3.4 消费堆积监控

为确保 SLA(如 O类作品秒级赋码),配置严格的 Consumer Lag 告警:

  • topic.avcc.apply.createdLag > 1000 或持续时长 > 1分钟 -> 触发告警(自动扩容 review-svc Pod)。
  • topic.playback.eventLag > 50000 -> 触发告警(检查 ClickHouse 写入瓶颈)。