Files
MAcode/docs/09-异步消息流转设计.md
T
selfrelease a329d4906b init: AIGC-Hub/AVCC 方案文档 + TCS-IPTV 内容可信锁定系统 MVP
- 方案文档: AVCC 体系建设、IPTV TCS 需求(0-req)/PRD(1-prd)/任务(2-task)/二三四期任务
- tcs-iptv: Go 后端(哈希SDK/MA码生成/可信数据空间mock/业务编排/HTTP API+HMAC鉴权)
- web-console: React+AntD 监管大屏(角色工作台/全流程演示/监管片库)
- 一剧一码+集级哈希, 集级下架/恢复, 全栈测试通过
2026-06-14 16:50:31 +08:00

135 lines
5.3 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 第九章 异步消息流转设计 (Kafka)
> 版本:V1.0
> 基于文档:《AIGC-Hub智视码(AVCC)体系建设方案 V2.0》
本章详细定义了 AIGC-Hub 系统内部基于 Apache Kafka 的异步事件驱动架构,包括 Topic 规划、消息体契约(Payload Schema)、生产者与消费者职责,以及可靠性保证机制。
---
## 9.1 Topic 规划总览
| Topic 名称 | 用途说明 | 分区数(建议) | 数据保留期 |
|------------|----------|--------------|------------|
| `topic.avcc.apply.created` | 赋码申请创建(预检通过后触发) | 6 | 7 天 |
| `topic.review.task.finished` | 审核任务完成(AI/人工审核出具结果后触发) | 12 | 7 天 |
| `topic.chain.tx.pending` | 准备上链事件(存证、分账指令) | 6 | 7 天 |
| `topic.chain.tx.confirmed` | 链上交易确认回执 | 6 | 30 天 |
| `topic.playback.event` | 平台播放与行为数据回传 | 24 | 3 天 |
| `topic.system.dlq` | 死信队列(所有消费者重试失败后路由至此) | 3 | 永久(人工处理) |
---
## 9.2 核心事件契约定义 (Event Schema)
所有消息均采用 JSON 格式,统一包含外层 Envelope:
```json
{
"event_id": "uuid", // 事件唯一标识,用于幂等
"timestamp": 1717312200000, // 毫秒时间戳
"source": "code-svc", // 发送方服务
"payload": { ... } // 业务数据
}
```
### 9.2.1 赋码申请创建事件
- **Topic**: `topic.avcc.apply.created`
- **生产者**: `code-svc` (赋码引擎)
- **消费者**: `review-svc` (审核引擎)
**Payload 定义:**
```json
{
"request_id": "req_20260602_001",
"avcc_record_id": 10042,
"content_url": "s3://aigc-hub-storage/works/raw/...",
"content_hash": "sha256:abc123def456...",
"ai_tool_info": {
"model_name": "Stable-Diffusion-XL",
"model_version": "v1.0"
},
"priority": 5
}
```
### 9.2.2 审核任务完成事件
- **Topic**: `topic.review.task.finished`
- **生产者**: `review-svc` (审核引擎)
- **消费者**: `chain-svc` (准备上链), `code-svc` (更新状态)
**Payload 定义:**
```json
{
"request_id": "req_20260602_001",
"avcc_record_id": 10042,
"review_report_id": 5001,
"decision": "approved", // approved / rejected / need_human
"final_level": "G", // P / G / O
"overall_score": 78.5,
"report_hash": "sha256:789ghi..."
}
```
### 9.2.3 上链交易确认事件
- **Topic**: `topic.chain.tx.confirmed`
- **生产者**: `chain-svc` (版权链引擎)
- **消费者**: `code-svc` (生成AVCC), `settlement-svc` (更新清算状态)
**Payload 定义:**
```json
{
"avcc_record_id": 10042,
"chain_type": "copyright_main", // copyright_main / settlement
"tx_hash": "0xabc123...",
"block_height": 123456,
"status": "success",
"crd_address": "0x7f3e9a..."
}
```
### 9.2.4 播放回传事件 (海量数据)
- **Topic**: `topic.playback.event`
- **生产者**: `gateway-svc` (数据回传 API 接收后发送)
- **消费者**: `clickhouse-sink` (存入时序数据库), `flink-job` (实时异常检测)
**Payload 定义:**
```json
{
"avcc_code": "MA.156.10005.8361/...",
"platform_node": "8361",
"user_hash": "sha256:user123",
"event_type": "play", // play / complete / purchase
"duration_sec": 1800,
"revenue_cny": 0.00
}
```
---
## 9.3 可靠性与顺序保证机制
### 9.3.1 消息投递语义 (Delivery Semantics)
- **核心业务流**(申请、审核、上链):采用 **At-Least-Once**(至少一次)语义。
- 生产者配置:`acks=all`, `retries=Integer.MAX_VALUE`, `enable.idempotence=true`
- 消费者处理必须保证幂等性(基于 DB 的 unique key 或 event_id 去重表)。
- **播放数据流**(海量统计):采用 **At-Most-Once / 高吞吐** 配置。
- 生产者配置:`acks=1`, `linger.ms=5`, `batch.size=64KB`,允许极小概率的数据丢失以换取极高吞吐。
### 9.3.2 局部顺序保证 (Ordering)
同一个 AVCC 作品的事件需要保序处理(例如先创建、后审核、再上链)。
- **策略**:在发送 `topic.avcc.*`, `topic.review.*`, `topic.chain.*` 时,指定 Kafka 消息的 **Key = `avcc_record_id`**
- **效果**:同一个作品的所有生命周期事件会被路由到同一个 Partition,确保同一作品的状态流转严格有序。
### 9.3.3 失败重试与死信队列 (DLQ)
消费者处理失败(如依赖的第三方服务超时、数据库锁冲突)时的处理策略:
1. **内部重试**:消费者内部捕获非致命异常,执行指数退避重试(如 1s, 2s, 4s, 8s),最大重试 3 次。
2. **重试队列 (Retry Topic)**:若内部重试失败,将消息转发至 `topic.system.retry`,由专门的 Retry Consumer 延时消费(利用内存或单独逻辑处理延时)。
3. **死信队列 (DLQ)**:若重试队列依然失败,将其移入 `topic.system.dlq`,触发 P2 级告警。
4. **人工介入**:运营/开发人员通过后台界面查看 DLQ 消息,排查 bug 或脏数据后,可一键将消息重新注入原 Topic。
### 9.3.4 消费堆积监控
为确保 SLA(如 O类作品秒级赋码),配置严格的 Consumer Lag 告警:
- `topic.avcc.apply.created`Lag > 1000 或持续时长 > 1分钟 -> 触发告警(自动扩容 `review-svc` Pod)。
- `topic.playback.event`Lag > 50000 -> 触发告警(检查 ClickHouse 写入瓶颈)。