a329d4906b
- 方案文档: AVCC 体系建设、IPTV TCS 需求(0-req)/PRD(1-prd)/任务(2-task)/二三四期任务 - tcs-iptv: Go 后端(哈希SDK/MA码生成/可信数据空间mock/业务编排/HTTP API+HMAC鉴权) - web-console: React+AntD 监管大屏(角色工作台/全流程演示/监管片库) - 一剧一码+集级哈希, 集级下架/恢复, 全栈测试通过
135 lines
5.3 KiB
Markdown
135 lines
5.3 KiB
Markdown
# 第九章 异步消息流转设计 (Kafka)
|
||
|
||
> 版本:V1.0
|
||
> 基于文档:《AIGC-Hub智视码(AVCC)体系建设方案 V2.0》
|
||
|
||
本章详细定义了 AIGC-Hub 系统内部基于 Apache Kafka 的异步事件驱动架构,包括 Topic 规划、消息体契约(Payload Schema)、生产者与消费者职责,以及可靠性保证机制。
|
||
|
||
---
|
||
|
||
## 9.1 Topic 规划总览
|
||
|
||
| Topic 名称 | 用途说明 | 分区数(建议) | 数据保留期 |
|
||
|------------|----------|--------------|------------|
|
||
| `topic.avcc.apply.created` | 赋码申请创建(预检通过后触发) | 6 | 7 天 |
|
||
| `topic.review.task.finished` | 审核任务完成(AI/人工审核出具结果后触发) | 12 | 7 天 |
|
||
| `topic.chain.tx.pending` | 准备上链事件(存证、分账指令) | 6 | 7 天 |
|
||
| `topic.chain.tx.confirmed` | 链上交易确认回执 | 6 | 30 天 |
|
||
| `topic.playback.event` | 平台播放与行为数据回传 | 24 | 3 天 |
|
||
| `topic.system.dlq` | 死信队列(所有消费者重试失败后路由至此) | 3 | 永久(人工处理) |
|
||
|
||
---
|
||
|
||
## 9.2 核心事件契约定义 (Event Schema)
|
||
|
||
所有消息均采用 JSON 格式,统一包含外层 Envelope:
|
||
|
||
```json
|
||
{
|
||
"event_id": "uuid", // 事件唯一标识,用于幂等
|
||
"timestamp": 1717312200000, // 毫秒时间戳
|
||
"source": "code-svc", // 发送方服务
|
||
"payload": { ... } // 业务数据
|
||
}
|
||
```
|
||
|
||
### 9.2.1 赋码申请创建事件
|
||
- **Topic**: `topic.avcc.apply.created`
|
||
- **生产者**: `code-svc` (赋码引擎)
|
||
- **消费者**: `review-svc` (审核引擎)
|
||
|
||
**Payload 定义:**
|
||
```json
|
||
{
|
||
"request_id": "req_20260602_001",
|
||
"avcc_record_id": 10042,
|
||
"content_url": "s3://aigc-hub-storage/works/raw/...",
|
||
"content_hash": "sha256:abc123def456...",
|
||
"ai_tool_info": {
|
||
"model_name": "Stable-Diffusion-XL",
|
||
"model_version": "v1.0"
|
||
},
|
||
"priority": 5
|
||
}
|
||
```
|
||
|
||
### 9.2.2 审核任务完成事件
|
||
- **Topic**: `topic.review.task.finished`
|
||
- **生产者**: `review-svc` (审核引擎)
|
||
- **消费者**: `chain-svc` (准备上链), `code-svc` (更新状态)
|
||
|
||
**Payload 定义:**
|
||
```json
|
||
{
|
||
"request_id": "req_20260602_001",
|
||
"avcc_record_id": 10042,
|
||
"review_report_id": 5001,
|
||
"decision": "approved", // approved / rejected / need_human
|
||
"final_level": "G", // P / G / O
|
||
"overall_score": 78.5,
|
||
"report_hash": "sha256:789ghi..."
|
||
}
|
||
```
|
||
|
||
### 9.2.3 上链交易确认事件
|
||
- **Topic**: `topic.chain.tx.confirmed`
|
||
- **生产者**: `chain-svc` (版权链引擎)
|
||
- **消费者**: `code-svc` (生成AVCC), `settlement-svc` (更新清算状态)
|
||
|
||
**Payload 定义:**
|
||
```json
|
||
{
|
||
"avcc_record_id": 10042,
|
||
"chain_type": "copyright_main", // copyright_main / settlement
|
||
"tx_hash": "0xabc123...",
|
||
"block_height": 123456,
|
||
"status": "success",
|
||
"crd_address": "0x7f3e9a..."
|
||
}
|
||
```
|
||
|
||
### 9.2.4 播放回传事件 (海量数据)
|
||
- **Topic**: `topic.playback.event`
|
||
- **生产者**: `gateway-svc` (数据回传 API 接收后发送)
|
||
- **消费者**: `clickhouse-sink` (存入时序数据库), `flink-job` (实时异常检测)
|
||
|
||
**Payload 定义:**
|
||
```json
|
||
{
|
||
"avcc_code": "MA.156.10005.8361/...",
|
||
"platform_node": "8361",
|
||
"user_hash": "sha256:user123",
|
||
"event_type": "play", // play / complete / purchase
|
||
"duration_sec": 1800,
|
||
"revenue_cny": 0.00
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
## 9.3 可靠性与顺序保证机制
|
||
|
||
### 9.3.1 消息投递语义 (Delivery Semantics)
|
||
- **核心业务流**(申请、审核、上链):采用 **At-Least-Once**(至少一次)语义。
|
||
- 生产者配置:`acks=all`, `retries=Integer.MAX_VALUE`, `enable.idempotence=true`。
|
||
- 消费者处理必须保证幂等性(基于 DB 的 unique key 或 event_id 去重表)。
|
||
- **播放数据流**(海量统计):采用 **At-Most-Once / 高吞吐** 配置。
|
||
- 生产者配置:`acks=1`, `linger.ms=5`, `batch.size=64KB`,允许极小概率的数据丢失以换取极高吞吐。
|
||
|
||
### 9.3.2 局部顺序保证 (Ordering)
|
||
同一个 AVCC 作品的事件需要保序处理(例如先创建、后审核、再上链)。
|
||
- **策略**:在发送 `topic.avcc.*`, `topic.review.*`, `topic.chain.*` 时,指定 Kafka 消息的 **Key = `avcc_record_id`**。
|
||
- **效果**:同一个作品的所有生命周期事件会被路由到同一个 Partition,确保同一作品的状态流转严格有序。
|
||
|
||
### 9.3.3 失败重试与死信队列 (DLQ)
|
||
消费者处理失败(如依赖的第三方服务超时、数据库锁冲突)时的处理策略:
|
||
1. **内部重试**:消费者内部捕获非致命异常,执行指数退避重试(如 1s, 2s, 4s, 8s),最大重试 3 次。
|
||
2. **重试队列 (Retry Topic)**:若内部重试失败,将消息转发至 `topic.system.retry`,由专门的 Retry Consumer 延时消费(利用内存或单独逻辑处理延时)。
|
||
3. **死信队列 (DLQ)**:若重试队列依然失败,将其移入 `topic.system.dlq`,触发 P2 级告警。
|
||
4. **人工介入**:运营/开发人员通过后台界面查看 DLQ 消息,排查 bug 或脏数据后,可一键将消息重新注入原 Topic。
|
||
|
||
### 9.3.4 消费堆积监控
|
||
为确保 SLA(如 O类作品秒级赋码),配置严格的 Consumer Lag 告警:
|
||
- `topic.avcc.apply.created`:Lag > 1000 或持续时长 > 1分钟 -> 触发告警(自动扩容 `review-svc` Pod)。
|
||
- `topic.playback.event`:Lag > 50000 -> 触发告警(检查 ClickHouse 写入瓶颈)。
|