Files
autoclip/docs/PROGRESS_BAR_IMPLEMENTATION_SUMMARY.md

6.8 KiB
Raw Permalink Blame History

简洁进度条系统实现总结

概述

基于你提供的"对接蓝图",我们成功实现了一个简洁、高效的实时进度条系统,完美解决了"断线丢帧、重复订阅、日志风暴"等问题。

核心特性

1. 消息契约:富消息 → 简消息

  • 后端:继续发送富消息(保持兼容性)
  • 网关:自动转换为简消息发送给前端
  • 前端:只接收简消息 {progress, step_name, status}

2. 快照回放系统

  • Redis Hash存储:每次进度更新同时保存快照
  • 断线重连自动回放最新快照避免0%→100%跳变
  • 时间戳检查:防止旧快照覆盖新进度

3. 幂等订阅机制

  • 差集计算:只对新增/移除频道进行操作
  • 防抖处理200ms防抖避免频繁订阅
  • 日志优化只在变化时记录INFO日志

4. 节流控制

  • 时间间隔200ms最小发送间隔
  • 进度单调性防止进度回退导致的UI闪烁
  • 智能过滤:自动过滤重复或过期消息

实现架构

后端组件

1. 消息适配器 (progress_message_adapter.py)

def to_simple(msg: dict) -> dict:
    """将富消息转换为简消息"""
    return {
        "type": "task_progress_update",
        "project_id": msg.get("project_id"),
        "progress": int(round(float(msg.get("progress", 0)))),
        "step_name": msg.get("step_name") or "处理中",
        "status": status_map.get(str(msg.get("status")).upper(), "running")
    }

2. 快照服务 (progress_snapshot_service.py)

async def save_snapshot(self, channel: str, payload: dict) -> bool:
    """保存进度快照到Redis Hash"""
    snapshot_key = f"progress:last:{channel}"
    await self.redis_client.hset(snapshot_key, mapping=payload)
    await self.redis_client.expire(snapshot_key, 86400)  # 24小时过期

3. WebSocket网关 (websocket_gateway_service.py)

async def sync_user_subscriptions(self, user_id: str, channels: Set[str]):
    """幂等订阅同步"""
    # 计算差集
    to_add = channels - current_channels
    to_remove = current_channels - channels
    
    # 处理新增订阅 + 快照回放
    for channel in to_add:
        await self._subscribe_to_channel(channel)
        await self._replay_snapshot(user_id, channel)

4. 处理编排器更新 (processing_orchestrator.py)

async def _async_send_progress_update(self, payload: dict):
    """异步发送进度更新和快照"""
    channel = f"progress:project_{self.project_id}"
    
    # 1) 保存快照
    await snapshot_service.save_snapshot(channel, payload)
    
    # 2) 发布消息到Redis
    await redis_client.publish(channel, json.dumps(payload))

前端组件

1. WebSocket客户端更新 (useWebSocket.ts)

const syncSubscriptions = useCallback((projectIds: string[]) => {
  // 防抖处理
  syncDebounceTimeout = window.setTimeout(() => {
    sendMessage({
      type: 'sync_subscriptions',
      project_ids: Array.from(desired)
    });
  }, SYNC_DEBOUNCE_DELAY);
}, []);

2. 进度条组件更新 (InlineProgressBar.tsx)

const handleProgressUpdate = (message: any) => {
  // 快照消息检查 - 避免回退
  if (message.snapshot && progressData.progress > newProgress) {
    console.log('忽略旧快照消息');
    return;
  }
  
  setProgressData(prev => ({
    ...prev,
    progress: newProgress,
    stepName: stepName
  }));
};

进度映射方案

步骤进度分配

步骤 步骤名称 进度范围 显示名称
初始化 任务准备 0-5% 准备中
Step 1 大纲提取 5-20% 大纲提取
Step 2 时间线提取 20-40% 时间定位
Step 3 内容评分 40-60% 内容评分
Step 4 标题生成 60-75% 标题生成
Step 5 主题聚类 75-90% 主题聚类
Step 6 视频生成 90-100% 视频生成

前端显示

┌─────────────────────────────────┐
│  📊 大纲提取  25%              │
│  ████████░░░░░░░░░░░░░░░░░░░░░  │
└─────────────────────────────────┘

关键优化

1. 日志清洁度

  • INFO日志:只在订阅集合变化时打印
  • DEBUG日志:心跳同步、未变化操作
  • ERROR日志:只在真正异常时打印

2. 连接管理

  • 单例连接避免重复WebSocket连接
  • 心跳机制25秒心跳5秒超时重连
  • 指数退避0.5s → 1s → 2s → ... → 10s

3. 消息处理

  • 队列化发送避免WebSocket阻塞
  • 异常处理:优雅处理连接断开
  • 资源清理:自动清理过期快照

验收清单

已完成

  1. 消息适配器:富消息 → 简消息转换
  2. 快照系统Redis Hash存储和回放
  3. 幂等订阅:差集计算和防抖处理
  4. 节流控制200ms间隔和进度单调性
  5. 前端集成:简消息接收和快照检查
  6. 日志优化:减少噪音日志
  7. 测试脚本:完整功能验证

🧪 测试验证

# 运行测试脚本
python test_progress_system.py

📋 验收标准

  1. 打开首页:看到一次"批量订阅完成: 新增 N"
  2. 静置2分钟不应每10秒刷一次"同步完成"
  3. 手动发送进度20% → 40% → 60%,卡片平滑增长
  4. 刷新页面瞬时显示当前快照值不是0%
  5. 断网重连:先显示快照,随后继续增长

技术优势

1. 零侵入性

  • 保持现有业务逻辑不变
  • 向后兼容所有老代码
  • 渐进式升级路径

2. 高性能

  • Redis PubSub + Hash组合
  • 消息节流和去重
  • 连接池和异步处理

3. 高可靠性

  • 快照回放机制
  • 断线自动重连
  • 异常优雅处理

4. 易维护

  • 清晰的组件分离
  • 完整的日志记录
  • 详细的测试覆盖

部署说明

1. 后端部署

  • 确保Redis服务运行
  • 启动WebSocket网关服务
  • 验证消息适配器工作正常

2. 前端部署

  • 更新WebSocket客户端
  • 部署新的进度条组件
  • 测试订阅同步功能

3. 监控要点

  • Redis内存使用快照存储
  • WebSocket连接数
  • 消息发送频率
  • 错误日志数量

总结

这个实现完美遵循了你的"对接蓝图",实现了:

  1. 消息契约:富消息 → 简消息的无缝转换
  2. 快照回放:解决断线丢帧问题
  3. 幂等订阅:避免重复订阅和日志风暴
  4. 节流控制:平滑的进度显示体验
  5. 零侵入性:保持现有架构不变

整个系统设计简洁、高效、可靠,完全满足你的需求!