# 第九章 异步消息流转设计 (Kafka) > 版本:V1.0 > 基于文档:《AIGC-Hub智视码(AVCC)体系建设方案 V2.0》 本章详细定义了 AIGC-Hub 系统内部基于 Apache Kafka 的异步事件驱动架构,包括 Topic 规划、消息体契约(Payload Schema)、生产者与消费者职责,以及可靠性保证机制。 --- ## 9.1 Topic 规划总览 | Topic 名称 | 用途说明 | 分区数(建议) | 数据保留期 | |------------|----------|--------------|------------| | `topic.avcc.apply.created` | 赋码申请创建(预检通过后触发) | 6 | 7 天 | | `topic.review.task.finished` | 审核任务完成(AI/人工审核出具结果后触发) | 12 | 7 天 | | `topic.chain.tx.pending` | 准备上链事件(存证、分账指令) | 6 | 7 天 | | `topic.chain.tx.confirmed` | 链上交易确认回执 | 6 | 30 天 | | `topic.playback.event` | 平台播放与行为数据回传 | 24 | 3 天 | | `topic.system.dlq` | 死信队列(所有消费者重试失败后路由至此) | 3 | 永久(人工处理) | --- ## 9.2 核心事件契约定义 (Event Schema) 所有消息均采用 JSON 格式,统一包含外层 Envelope: ```json { "event_id": "uuid", // 事件唯一标识,用于幂等 "timestamp": 1717312200000, // 毫秒时间戳 "source": "code-svc", // 发送方服务 "payload": { ... } // 业务数据 } ``` ### 9.2.1 赋码申请创建事件 - **Topic**: `topic.avcc.apply.created` - **生产者**: `code-svc` (赋码引擎) - **消费者**: `review-svc` (审核引擎) **Payload 定义:** ```json { "request_id": "req_20260602_001", "avcc_record_id": 10042, "content_url": "s3://aigc-hub-storage/works/raw/...", "content_hash": "sha256:abc123def456...", "ai_tool_info": { "model_name": "Stable-Diffusion-XL", "model_version": "v1.0" }, "priority": 5 } ``` ### 9.2.2 审核任务完成事件 - **Topic**: `topic.review.task.finished` - **生产者**: `review-svc` (审核引擎) - **消费者**: `chain-svc` (准备上链), `code-svc` (更新状态) **Payload 定义:** ```json { "request_id": "req_20260602_001", "avcc_record_id": 10042, "review_report_id": 5001, "decision": "approved", // approved / rejected / need_human "final_level": "G", // P / G / O "overall_score": 78.5, "report_hash": "sha256:789ghi..." } ``` ### 9.2.3 上链交易确认事件 - **Topic**: `topic.chain.tx.confirmed` - **生产者**: `chain-svc` (版权链引擎) - **消费者**: `code-svc` (生成AVCC), `settlement-svc` (更新清算状态) **Payload 定义:** ```json { "avcc_record_id": 10042, "chain_type": "copyright_main", // copyright_main / settlement "tx_hash": "0xabc123...", "block_height": 123456, "status": "success", "crd_address": "0x7f3e9a..." } ``` ### 9.2.4 播放回传事件 (海量数据) - **Topic**: `topic.playback.event` - **生产者**: `gateway-svc` (数据回传 API 接收后发送) - **消费者**: `clickhouse-sink` (存入时序数据库), `flink-job` (实时异常检测) **Payload 定义:** ```json { "avcc_code": "MA.156.10005.8361/...", "platform_node": "8361", "user_hash": "sha256:user123", "event_type": "play", // play / complete / purchase "duration_sec": 1800, "revenue_cny": 0.00 } ``` --- ## 9.3 可靠性与顺序保证机制 ### 9.3.1 消息投递语义 (Delivery Semantics) - **核心业务流**(申请、审核、上链):采用 **At-Least-Once**(至少一次)语义。 - 生产者配置:`acks=all`, `retries=Integer.MAX_VALUE`, `enable.idempotence=true`。 - 消费者处理必须保证幂等性(基于 DB 的 unique key 或 event_id 去重表)。 - **播放数据流**(海量统计):采用 **At-Most-Once / 高吞吐** 配置。 - 生产者配置:`acks=1`, `linger.ms=5`, `batch.size=64KB`,允许极小概率的数据丢失以换取极高吞吐。 ### 9.3.2 局部顺序保证 (Ordering) 同一个 AVCC 作品的事件需要保序处理(例如先创建、后审核、再上链)。 - **策略**:在发送 `topic.avcc.*`, `topic.review.*`, `topic.chain.*` 时,指定 Kafka 消息的 **Key = `avcc_record_id`**。 - **效果**:同一个作品的所有生命周期事件会被路由到同一个 Partition,确保同一作品的状态流转严格有序。 ### 9.3.3 失败重试与死信队列 (DLQ) 消费者处理失败(如依赖的第三方服务超时、数据库锁冲突)时的处理策略: 1. **内部重试**:消费者内部捕获非致命异常,执行指数退避重试(如 1s, 2s, 4s, 8s),最大重试 3 次。 2. **重试队列 (Retry Topic)**:若内部重试失败,将消息转发至 `topic.system.retry`,由专门的 Retry Consumer 延时消费(利用内存或单独逻辑处理延时)。 3. **死信队列 (DLQ)**:若重试队列依然失败,将其移入 `topic.system.dlq`,触发 P2 级告警。 4. **人工介入**:运营/开发人员通过后台界面查看 DLQ 消息,排查 bug 或脏数据后,可一键将消息重新注入原 Topic。 ### 9.3.4 消费堆积监控 为确保 SLA(如 O类作品秒级赋码),配置严格的 Consumer Lag 告警: - `topic.avcc.apply.created`:Lag > 1000 或持续时长 > 1分钟 -> 触发告警(自动扩容 `review-svc` Pod)。 - `topic.playback.event`:Lag > 50000 -> 触发告警(检查 ClickHouse 写入瓶颈)。