mirror of
https://github.com/zhouxiaoka/autoclip.git
synced 2026-05-06 22:13:00 +08:00
227 lines
6.8 KiB
Markdown
227 lines
6.8 KiB
Markdown
# 简洁进度条系统实现总结
|
||
|
||
## 概述
|
||
|
||
基于你提供的"对接蓝图",我们成功实现了一个简洁、高效的实时进度条系统,完美解决了"断线丢帧、重复订阅、日志风暴"等问题。
|
||
|
||
## 核心特性
|
||
|
||
### 1. 消息契约:富消息 → 简消息
|
||
- **后端**:继续发送富消息(保持兼容性)
|
||
- **网关**:自动转换为简消息发送给前端
|
||
- **前端**:只接收简消息 `{progress, step_name, status}`
|
||
|
||
### 2. 快照回放系统
|
||
- **Redis Hash存储**:每次进度更新同时保存快照
|
||
- **断线重连**:自动回放最新快照,避免0%→100%跳变
|
||
- **时间戳检查**:防止旧快照覆盖新进度
|
||
|
||
### 3. 幂等订阅机制
|
||
- **差集计算**:只对新增/移除频道进行操作
|
||
- **防抖处理**:200ms防抖,避免频繁订阅
|
||
- **日志优化**:只在变化时记录INFO日志
|
||
|
||
### 4. 节流控制
|
||
- **时间间隔**:200ms最小发送间隔
|
||
- **进度单调性**:防止进度回退导致的UI闪烁
|
||
- **智能过滤**:自动过滤重复或过期消息
|
||
|
||
## 实现架构
|
||
|
||
### 后端组件
|
||
|
||
#### 1. 消息适配器 (`progress_message_adapter.py`)
|
||
```python
|
||
def to_simple(msg: dict) -> dict:
|
||
"""将富消息转换为简消息"""
|
||
return {
|
||
"type": "task_progress_update",
|
||
"project_id": msg.get("project_id"),
|
||
"progress": int(round(float(msg.get("progress", 0)))),
|
||
"step_name": msg.get("step_name") or "处理中",
|
||
"status": status_map.get(str(msg.get("status")).upper(), "running")
|
||
}
|
||
```
|
||
|
||
#### 2. 快照服务 (`progress_snapshot_service.py`)
|
||
```python
|
||
async def save_snapshot(self, channel: str, payload: dict) -> bool:
|
||
"""保存进度快照到Redis Hash"""
|
||
snapshot_key = f"progress:last:{channel}"
|
||
await self.redis_client.hset(snapshot_key, mapping=payload)
|
||
await self.redis_client.expire(snapshot_key, 86400) # 24小时过期
|
||
```
|
||
|
||
#### 3. WebSocket网关 (`websocket_gateway_service.py`)
|
||
```python
|
||
async def sync_user_subscriptions(self, user_id: str, channels: Set[str]):
|
||
"""幂等订阅同步"""
|
||
# 计算差集
|
||
to_add = channels - current_channels
|
||
to_remove = current_channels - channels
|
||
|
||
# 处理新增订阅 + 快照回放
|
||
for channel in to_add:
|
||
await self._subscribe_to_channel(channel)
|
||
await self._replay_snapshot(user_id, channel)
|
||
```
|
||
|
||
#### 4. 处理编排器更新 (`processing_orchestrator.py`)
|
||
```python
|
||
async def _async_send_progress_update(self, payload: dict):
|
||
"""异步发送进度更新和快照"""
|
||
channel = f"progress:project_{self.project_id}"
|
||
|
||
# 1) 保存快照
|
||
await snapshot_service.save_snapshot(channel, payload)
|
||
|
||
# 2) 发布消息到Redis
|
||
await redis_client.publish(channel, json.dumps(payload))
|
||
```
|
||
|
||
### 前端组件
|
||
|
||
#### 1. WebSocket客户端更新 (`useWebSocket.ts`)
|
||
```typescript
|
||
const syncSubscriptions = useCallback((projectIds: string[]) => {
|
||
// 防抖处理
|
||
syncDebounceTimeout = window.setTimeout(() => {
|
||
sendMessage({
|
||
type: 'sync_subscriptions',
|
||
project_ids: Array.from(desired)
|
||
});
|
||
}, SYNC_DEBOUNCE_DELAY);
|
||
}, []);
|
||
```
|
||
|
||
#### 2. 进度条组件更新 (`InlineProgressBar.tsx`)
|
||
```typescript
|
||
const handleProgressUpdate = (message: any) => {
|
||
// 快照消息检查 - 避免回退
|
||
if (message.snapshot && progressData.progress > newProgress) {
|
||
console.log('忽略旧快照消息');
|
||
return;
|
||
}
|
||
|
||
setProgressData(prev => ({
|
||
...prev,
|
||
progress: newProgress,
|
||
stepName: stepName
|
||
}));
|
||
};
|
||
```
|
||
|
||
## 进度映射方案
|
||
|
||
### 步骤进度分配
|
||
| 步骤 | 步骤名称 | 进度范围 | 显示名称 |
|
||
|------|----------|----------|----------|
|
||
| **初始化** | 任务准备 | 0-5% | 准备中 |
|
||
| **Step 1** | 大纲提取 | 5-20% | 大纲提取 |
|
||
| **Step 2** | 时间线提取 | 20-40% | 时间定位 |
|
||
| **Step 3** | 内容评分 | 40-60% | 内容评分 |
|
||
| **Step 4** | 标题生成 | 60-75% | 标题生成 |
|
||
| **Step 5** | 主题聚类 | 75-90% | 主题聚类 |
|
||
| **Step 6** | 视频生成 | 90-100% | 视频生成 |
|
||
|
||
### 前端显示
|
||
```
|
||
┌─────────────────────────────────┐
|
||
│ 📊 大纲提取 25% │
|
||
│ ████████░░░░░░░░░░░░░░░░░░░░░ │
|
||
└─────────────────────────────────┘
|
||
```
|
||
|
||
## 关键优化
|
||
|
||
### 1. 日志清洁度
|
||
- **INFO日志**:只在订阅集合变化时打印
|
||
- **DEBUG日志**:心跳同步、未变化操作
|
||
- **ERROR日志**:只在真正异常时打印
|
||
|
||
### 2. 连接管理
|
||
- **单例连接**:避免重复WebSocket连接
|
||
- **心跳机制**:25秒心跳,5秒超时重连
|
||
- **指数退避**:0.5s → 1s → 2s → ... → 10s
|
||
|
||
### 3. 消息处理
|
||
- **队列化发送**:避免WebSocket阻塞
|
||
- **异常处理**:优雅处理连接断开
|
||
- **资源清理**:自动清理过期快照
|
||
|
||
## 验收清单
|
||
|
||
### ✅ 已完成
|
||
1. **消息适配器**:富消息 → 简消息转换
|
||
2. **快照系统**:Redis Hash存储和回放
|
||
3. **幂等订阅**:差集计算和防抖处理
|
||
4. **节流控制**:200ms间隔和进度单调性
|
||
5. **前端集成**:简消息接收和快照检查
|
||
6. **日志优化**:减少噪音日志
|
||
7. **测试脚本**:完整功能验证
|
||
|
||
### 🧪 测试验证
|
||
```bash
|
||
# 运行测试脚本
|
||
python test_progress_system.py
|
||
```
|
||
|
||
### 📋 验收标准
|
||
1. **打开首页**:看到一次"批量订阅完成: 新增 N"
|
||
2. **静置2分钟**:不应每10秒刷一次"同步完成"
|
||
3. **手动发送进度**:20% → 40% → 60%,卡片平滑增长
|
||
4. **刷新页面**:瞬时显示当前快照值,不是0%
|
||
5. **断网重连**:先显示快照,随后继续增长
|
||
|
||
## 技术优势
|
||
|
||
### 1. 零侵入性
|
||
- 保持现有业务逻辑不变
|
||
- 向后兼容所有老代码
|
||
- 渐进式升级路径
|
||
|
||
### 2. 高性能
|
||
- Redis PubSub + Hash组合
|
||
- 消息节流和去重
|
||
- 连接池和异步处理
|
||
|
||
### 3. 高可靠性
|
||
- 快照回放机制
|
||
- 断线自动重连
|
||
- 异常优雅处理
|
||
|
||
### 4. 易维护
|
||
- 清晰的组件分离
|
||
- 完整的日志记录
|
||
- 详细的测试覆盖
|
||
|
||
## 部署说明
|
||
|
||
### 1. 后端部署
|
||
- 确保Redis服务运行
|
||
- 启动WebSocket网关服务
|
||
- 验证消息适配器工作正常
|
||
|
||
### 2. 前端部署
|
||
- 更新WebSocket客户端
|
||
- 部署新的进度条组件
|
||
- 测试订阅同步功能
|
||
|
||
### 3. 监控要点
|
||
- Redis内存使用(快照存储)
|
||
- WebSocket连接数
|
||
- 消息发送频率
|
||
- 错误日志数量
|
||
|
||
## 总结
|
||
|
||
这个实现完美遵循了你的"对接蓝图",实现了:
|
||
|
||
1. **消息契约**:富消息 → 简消息的无缝转换
|
||
2. **快照回放**:解决断线丢帧问题
|
||
3. **幂等订阅**:避免重复订阅和日志风暴
|
||
4. **节流控制**:平滑的进度显示体验
|
||
5. **零侵入性**:保持现有架构不变
|
||
|
||
整个系统设计简洁、高效、可靠,完全满足你的需求!
|