mirror of
https://github.com/zhouxiaoka/autoclip.git
synced 2026-05-06 14:04:32 +08:00
6.8 KiB
6.8 KiB
简洁进度条系统实现总结
概述
基于你提供的"对接蓝图",我们成功实现了一个简洁、高效的实时进度条系统,完美解决了"断线丢帧、重复订阅、日志风暴"等问题。
核心特性
1. 消息契约:富消息 → 简消息
- 后端:继续发送富消息(保持兼容性)
- 网关:自动转换为简消息发送给前端
- 前端:只接收简消息
{progress, step_name, status}
2. 快照回放系统
- Redis Hash存储:每次进度更新同时保存快照
- 断线重连:自动回放最新快照,避免0%→100%跳变
- 时间戳检查:防止旧快照覆盖新进度
3. 幂等订阅机制
- 差集计算:只对新增/移除频道进行操作
- 防抖处理:200ms防抖,避免频繁订阅
- 日志优化:只在变化时记录INFO日志
4. 节流控制
- 时间间隔:200ms最小发送间隔
- 进度单调性:防止进度回退导致的UI闪烁
- 智能过滤:自动过滤重复或过期消息
实现架构
后端组件
1. 消息适配器 (progress_message_adapter.py)
def to_simple(msg: dict) -> dict:
"""将富消息转换为简消息"""
return {
"type": "task_progress_update",
"project_id": msg.get("project_id"),
"progress": int(round(float(msg.get("progress", 0)))),
"step_name": msg.get("step_name") or "处理中",
"status": status_map.get(str(msg.get("status")).upper(), "running")
}
2. 快照服务 (progress_snapshot_service.py)
async def save_snapshot(self, channel: str, payload: dict) -> bool:
"""保存进度快照到Redis Hash"""
snapshot_key = f"progress:last:{channel}"
await self.redis_client.hset(snapshot_key, mapping=payload)
await self.redis_client.expire(snapshot_key, 86400) # 24小时过期
3. WebSocket网关 (websocket_gateway_service.py)
async def sync_user_subscriptions(self, user_id: str, channels: Set[str]):
"""幂等订阅同步"""
# 计算差集
to_add = channels - current_channels
to_remove = current_channels - channels
# 处理新增订阅 + 快照回放
for channel in to_add:
await self._subscribe_to_channel(channel)
await self._replay_snapshot(user_id, channel)
4. 处理编排器更新 (processing_orchestrator.py)
async def _async_send_progress_update(self, payload: dict):
"""异步发送进度更新和快照"""
channel = f"progress:project_{self.project_id}"
# 1) 保存快照
await snapshot_service.save_snapshot(channel, payload)
# 2) 发布消息到Redis
await redis_client.publish(channel, json.dumps(payload))
前端组件
1. WebSocket客户端更新 (useWebSocket.ts)
const syncSubscriptions = useCallback((projectIds: string[]) => {
// 防抖处理
syncDebounceTimeout = window.setTimeout(() => {
sendMessage({
type: 'sync_subscriptions',
project_ids: Array.from(desired)
});
}, SYNC_DEBOUNCE_DELAY);
}, []);
2. 进度条组件更新 (InlineProgressBar.tsx)
const handleProgressUpdate = (message: any) => {
// 快照消息检查 - 避免回退
if (message.snapshot && progressData.progress > newProgress) {
console.log('忽略旧快照消息');
return;
}
setProgressData(prev => ({
...prev,
progress: newProgress,
stepName: stepName
}));
};
进度映射方案
步骤进度分配
| 步骤 | 步骤名称 | 进度范围 | 显示名称 |
|---|---|---|---|
| 初始化 | 任务准备 | 0-5% | 准备中 |
| Step 1 | 大纲提取 | 5-20% | 大纲提取 |
| Step 2 | 时间线提取 | 20-40% | 时间定位 |
| Step 3 | 内容评分 | 40-60% | 内容评分 |
| Step 4 | 标题生成 | 60-75% | 标题生成 |
| Step 5 | 主题聚类 | 75-90% | 主题聚类 |
| Step 6 | 视频生成 | 90-100% | 视频生成 |
前端显示
┌─────────────────────────────────┐
│ 📊 大纲提取 25% │
│ ████████░░░░░░░░░░░░░░░░░░░░░ │
└─────────────────────────────────┘
关键优化
1. 日志清洁度
- INFO日志:只在订阅集合变化时打印
- DEBUG日志:心跳同步、未变化操作
- ERROR日志:只在真正异常时打印
2. 连接管理
- 单例连接:避免重复WebSocket连接
- 心跳机制:25秒心跳,5秒超时重连
- 指数退避:0.5s → 1s → 2s → ... → 10s
3. 消息处理
- 队列化发送:避免WebSocket阻塞
- 异常处理:优雅处理连接断开
- 资源清理:自动清理过期快照
验收清单
✅ 已完成
- 消息适配器:富消息 → 简消息转换
- 快照系统:Redis Hash存储和回放
- 幂等订阅:差集计算和防抖处理
- 节流控制:200ms间隔和进度单调性
- 前端集成:简消息接收和快照检查
- 日志优化:减少噪音日志
- 测试脚本:完整功能验证
🧪 测试验证
# 运行测试脚本
python test_progress_system.py
📋 验收标准
- 打开首页:看到一次"批量订阅完成: 新增 N"
- 静置2分钟:不应每10秒刷一次"同步完成"
- 手动发送进度:20% → 40% → 60%,卡片平滑增长
- 刷新页面:瞬时显示当前快照值,不是0%
- 断网重连:先显示快照,随后继续增长
技术优势
1. 零侵入性
- 保持现有业务逻辑不变
- 向后兼容所有老代码
- 渐进式升级路径
2. 高性能
- Redis PubSub + Hash组合
- 消息节流和去重
- 连接池和异步处理
3. 高可靠性
- 快照回放机制
- 断线自动重连
- 异常优雅处理
4. 易维护
- 清晰的组件分离
- 完整的日志记录
- 详细的测试覆盖
部署说明
1. 后端部署
- 确保Redis服务运行
- 启动WebSocket网关服务
- 验证消息适配器工作正常
2. 前端部署
- 更新WebSocket客户端
- 部署新的进度条组件
- 测试订阅同步功能
3. 监控要点
- Redis内存使用(快照存储)
- WebSocket连接数
- 消息发送频率
- 错误日志数量
总结
这个实现完美遵循了你的"对接蓝图",实现了:
- 消息契约:富消息 → 简消息的无缝转换
- 快照回放:解决断线丢帧问题
- 幂等订阅:避免重复订阅和日志风暴
- 节流控制:平滑的进度显示体验
- 零侵入性:保持现有架构不变
整个系统设计简洁、高效、可靠,完全满足你的需求!