完成项目问题修复和优化

主要改进:
1. 清理重复文件和冗余代码
   - 删除重复的API文件 (upload_backup.py, upload_new.py等)
   - 删除旧版本的B站服务文件 (bilibili_upload_v2.py到v6.py)
   - 统一B站服务接口

2. 优化数据存储架构
   - 创建OptimizedStorageService解决双重存储问题
   - 数据库只存储元数据,文件系统存储实际文件
   - 提供数据迁移脚本和一致性检查工具

3. 统一错误处理机制
   - 创建统一错误处理中间件
   - 提供错误处理装饰器和上下文管理器
   - 统一错误响应格式

4. 完善配置管理
   - 创建UnifiedConfig统一配置系统
   - 整合所有配置源(环境变量、配置文件、默认值)
   - 提供配置迁移脚本

5. 增强进度系统
   - 创建EnhancedProgressService
   - 支持Redis缓存、数据库持久化和内存缓存
   - 提供完整的进度跟踪和状态管理

新增文件:
- backend/services/optimized_storage_service.py
- backend/core/error_middleware.py
- backend/core/unified_config.py
- backend/services/enhanced_progress_service.py
- scripts/migrate_to_optimized_storage.py
- scripts/check_data_consistency.py
- scripts/migrate_config.py
- docs/ERROR_HANDLING_GUIDE.md
- docs/PROGRESS_SYSTEM_GUIDE.md

修复文件:
- backend/services/bilibili_service.py (统一上传接口)
- backend/main.py (使用新的错误处理中间件)
This commit is contained in:
Kris Ka
2025-09-15 12:07:09 +08:00
parent c67d6a8805
commit dc0cf66159
22 changed files with 4125 additions and 19 deletions

View File

@@ -5821,3 +5821,150 @@ bvid
2025-09-15 10:51:28,045 - backend.main - INFO - API密钥已加载到环境变量
2025-09-15 10:51:28,045 - backend.main - INFO - WebSocket网关服务已禁用使用新的简化进度系统
2025-09-15 10:51:44,642 - backend.services.websocket_notification_service - INFO - 项目更新通知已发送: 22ac1453-4f16-4c04-ad7e-1264e995530a - processing
2025-09-15 11:54:25,706 - backend.main - INFO - 正在关闭AutoClip API服务...
2025-09-15 11:54:25,706 - backend.main - INFO - WebSocket网关服务已禁用
2025-09-15 11:54:26,389 - backend.services.processing_orchestrator - INFO - 流水线模块导入成功
2025-09-15 11:54:26,654 - backend.services.simple_progress - INFO - Redis连接成功
2025-09-15 11:54:26,719 - backend.main - INFO - 启动AutoClip API服务...
2025-09-15 11:54:26,720 - backend.main - INFO - 数据库表创建完成
2025-09-15 11:54:26,720 - backend.main - INFO - API密钥已加载到环境变量
2025-09-15 11:54:26,720 - backend.main - INFO - WebSocket网关服务已禁用使用新的简化进度系统
2025-09-15 11:54:29,756 - backend.main - INFO - 正在关闭AutoClip API服务...
2025-09-15 11:54:29,756 - backend.main - INFO - WebSocket网关服务已禁用
2025-09-15 11:54:30,319 - backend.services.processing_orchestrator - INFO - 流水线模块导入成功
2025-09-15 11:54:30,671 - backend.services.processing_orchestrator - INFO - 流水线模块导入成功
2025-09-15 11:54:30,865 - backend.services.simple_progress - INFO - Redis连接成功
2025-09-15 11:54:30,938 - backend.main - INFO - 启动AutoClip API服务...
2025-09-15 11:54:30,939 - backend.main - INFO - 数据库表创建完成
2025-09-15 11:54:30,939 - backend.main - INFO - API密钥已加载到环境变量
2025-09-15 11:54:30,939 - backend.main - INFO - WebSocket网关服务已禁用使用新的简化进度系统
2025-09-15 11:54:45,006 - backend.main - INFO - 正在关闭AutoClip API服务...
2025-09-15 11:54:45,006 - backend.main - INFO - WebSocket网关服务已禁用
2025-09-15 11:54:45,498 - backend.services.processing_orchestrator - INFO - 流水线模块导入成功
2025-09-15 11:54:45,697 - backend.services.simple_progress - INFO - Redis连接成功
2025-09-15 11:54:45,763 - backend.main - INFO - 启动AutoClip API服务...
2025-09-15 11:54:45,764 - backend.main - INFO - 数据库表创建完成
2025-09-15 11:54:45,764 - backend.main - INFO - API密钥已加载到环境变量
2025-09-15 11:54:45,764 - backend.main - INFO - WebSocket网关服务已禁用使用新的简化进度系统
2025-09-15 11:54:48,999 - backend.main - INFO - 正在关闭AutoClip API服务...
2025-09-15 11:54:48,999 - backend.main - INFO - WebSocket网关服务已禁用
2025-09-15 11:54:49,589 - backend.services.processing_orchestrator - INFO - 流水线模块导入成功
2025-09-15 11:54:49,952 - backend.services.processing_orchestrator - INFO - 流水线模块导入成功
2025-09-15 11:54:50,345 - backend.services.processing_orchestrator - INFO - 流水线模块导入成功
2025-09-15 11:54:50,548 - backend.services.simple_progress - INFO - Redis连接成功
2025-09-15 11:54:50,621 - backend.main - INFO - 启动AutoClip API服务...
2025-09-15 11:54:50,622 - backend.main - INFO - 数据库表创建完成
2025-09-15 11:54:50,622 - backend.main - INFO - API密钥已加载到环境变量
2025-09-15 11:54:50,622 - backend.main - INFO - WebSocket网关服务已禁用使用新的简化进度系统
2025-09-15 11:55:27,886 - backend.main - INFO - 正在关闭AutoClip API服务...
2025-09-15 11:55:27,886 - backend.main - INFO - WebSocket网关服务已禁用
2025-09-15 11:55:28,425 - backend.services.processing_orchestrator - INFO - 流水线模块导入成功
2025-09-15 11:55:28,625 - backend.services.simple_progress - INFO - Redis连接成功
2025-09-15 11:55:28,697 - backend.main - INFO - 启动AutoClip API服务...
2025-09-15 11:55:28,698 - backend.main - INFO - 数据库表创建完成
2025-09-15 11:55:28,698 - backend.main - INFO - API密钥已加载到环境变量
2025-09-15 11:55:28,698 - backend.main - INFO - WebSocket网关服务已禁用使用新的简化进度系统
2025-09-15 11:55:40,316 - backend.main - INFO - 正在关闭AutoClip API服务...
2025-09-15 11:55:40,316 - backend.main - INFO - WebSocket网关服务已禁用
2025-09-15 11:55:40,754 - backend.services.processing_orchestrator - INFO - 流水线模块导入成功
2025-09-15 11:55:40,951 - backend.services.simple_progress - INFO - Redis连接成功
2025-09-15 11:55:41,017 - backend.main - INFO - 启动AutoClip API服务...
2025-09-15 11:55:41,018 - backend.main - INFO - 数据库表创建完成
2025-09-15 11:55:41,018 - backend.main - INFO - API密钥已加载到环境变量
2025-09-15 11:55:41,018 - backend.main - INFO - WebSocket网关服务已禁用使用新的简化进度系统
2025-09-15 11:55:58,492 - backend.main - INFO - 正在关闭AutoClip API服务...
2025-09-15 11:55:58,493 - backend.main - INFO - WebSocket网关服务已禁用
2025-09-15 11:55:58,944 - backend.services.processing_orchestrator - INFO - 流水线模块导入成功
2025-09-15 11:55:59,147 - backend.services.simple_progress - INFO - Redis连接成功
2025-09-15 11:55:59,214 - backend.main - INFO - 启动AutoClip API服务...
2025-09-15 11:55:59,215 - backend.main - INFO - 数据库表创建完成
2025-09-15 11:55:59,215 - backend.main - INFO - API密钥已加载到环境变量
2025-09-15 11:55:59,215 - backend.main - INFO - WebSocket网关服务已禁用使用新的简化进度系统
2025-09-15 11:56:06,115 - backend.main - INFO - 正在关闭AutoClip API服务...
2025-09-15 11:56:06,115 - backend.main - INFO - WebSocket网关服务已禁用
2025-09-15 11:56:06,577 - backend.services.processing_orchestrator - INFO - 流水线模块导入成功
2025-09-15 11:56:06,826 - backend.services.simple_progress - INFO - Redis连接成功
2025-09-15 11:56:06,916 - backend.main - INFO - 启动AutoClip API服务...
2025-09-15 11:56:06,917 - backend.main - INFO - 数据库表创建完成
2025-09-15 11:56:06,917 - backend.main - INFO - API密钥已加载到环境变量
2025-09-15 11:56:06,917 - backend.main - INFO - WebSocket网关服务已禁用使用新的简化进度系统
2025-09-15 11:56:09,765 - backend.main - INFO - 正在关闭AutoClip API服务...
2025-09-15 11:56:09,765 - backend.main - INFO - WebSocket网关服务已禁用
2025-09-15 11:56:10,406 - backend.services.processing_orchestrator - INFO - 流水线模块导入成功
2025-09-15 11:56:10,683 - backend.services.simple_progress - INFO - Redis连接成功
2025-09-15 11:56:10,758 - backend.main - INFO - 启动AutoClip API服务...
2025-09-15 11:56:10,759 - backend.main - INFO - 数据库表创建完成
2025-09-15 11:56:10,759 - backend.main - INFO - API密钥已加载到环境变量
2025-09-15 11:56:10,759 - backend.main - INFO - WebSocket网关服务已禁用使用新的简化进度系统
2025-09-15 11:57:16,837 - backend.main - INFO - 正在关闭AutoClip API服务...
2025-09-15 11:57:16,838 - backend.main - INFO - WebSocket网关服务已禁用
2025-09-15 11:57:17,489 - backend.services.processing_orchestrator - INFO - 流水线模块导入成功
2025-09-15 11:57:17,738 - backend.services.simple_progress - INFO - Redis连接成功
2025-09-15 11:57:17,826 - backend.main - INFO - 启动AutoClip API服务...
2025-09-15 11:57:17,827 - backend.main - INFO - 数据库表创建完成
2025-09-15 11:57:17,827 - backend.main - INFO - API密钥已加载到环境变量
2025-09-15 11:57:17,827 - backend.main - INFO - WebSocket网关服务已禁用使用新的简化进度系统
2025-09-15 11:57:52,976 - backend.main - INFO - 正在关闭AutoClip API服务...
2025-09-15 11:57:52,976 - backend.main - INFO - WebSocket网关服务已禁用
2025-09-15 11:57:53,599 - backend.services.processing_orchestrator - INFO - 流水线模块导入成功
2025-09-15 11:57:53,898 - backend.services.simple_progress - INFO - Redis连接成功
2025-09-15 11:57:53,987 - backend.main - INFO - 启动AutoClip API服务...
2025-09-15 11:57:53,988 - backend.main - INFO - 数据库表创建完成
2025-09-15 11:57:53,988 - backend.main - INFO - API密钥已加载到环境变量
2025-09-15 11:57:53,988 - backend.main - INFO - WebSocket网关服务已禁用使用新的简化进度系统
2025-09-15 11:58:28,734 - backend.main - INFO - 正在关闭AutoClip API服务...
2025-09-15 11:58:28,734 - backend.main - INFO - WebSocket网关服务已禁用
2025-09-15 11:58:29,266 - backend.services.processing_orchestrator - INFO - 流水线模块导入成功
2025-09-15 11:58:29,490 - backend.services.simple_progress - INFO - Redis连接成功
2025-09-15 11:58:29,577 - backend.main - INFO - 启动AutoClip API服务...
2025-09-15 11:58:29,578 - backend.main - INFO - 数据库表创建完成
2025-09-15 11:58:29,578 - backend.main - INFO - API密钥已加载到环境变量
2025-09-15 11:58:29,578 - backend.main - INFO - WebSocket网关服务已禁用使用新的简化进度系统
2025-09-15 11:59:19,068 - backend.main - INFO - 正在关闭AutoClip API服务...
2025-09-15 11:59:19,068 - backend.main - INFO - WebSocket网关服务已禁用
2025-09-15 11:59:19,656 - backend.services.processing_orchestrator - INFO - 流水线模块导入成功
2025-09-15 11:59:19,898 - backend.services.simple_progress - INFO - Redis连接成功
2025-09-15 11:59:19,990 - backend.main - INFO - 启动AutoClip API服务...
2025-09-15 11:59:19,991 - backend.main - INFO - 数据库表创建完成
2025-09-15 11:59:19,991 - backend.main - INFO - API密钥已加载到环境变量
2025-09-15 11:59:19,991 - backend.main - INFO - WebSocket网关服务已禁用使用新的简化进度系统
2025-09-15 11:59:25,948 - backend.main - INFO - 正在关闭AutoClip API服务...
2025-09-15 11:59:25,948 - backend.main - INFO - WebSocket网关服务已禁用
2025-09-15 11:59:26,489 - backend.services.processing_orchestrator - INFO - 流水线模块导入成功
2025-09-15 11:59:26,682 - backend.services.simple_progress - INFO - Redis连接成功
2025-09-15 11:59:26,748 - backend.main - INFO - 启动AutoClip API服务...
2025-09-15 11:59:26,749 - backend.main - INFO - 数据库表创建完成
2025-09-15 11:59:26,749 - backend.main - INFO - API密钥已加载到环境变量
2025-09-15 11:59:26,749 - backend.main - INFO - WebSocket网关服务已禁用使用新的简化进度系统
2025-09-15 11:59:39,072 - backend.main - INFO - 正在关闭AutoClip API服务...
2025-09-15 11:59:39,072 - backend.main - INFO - WebSocket网关服务已禁用
2025-09-15 11:59:39,527 - backend.services.processing_orchestrator - INFO - 流水线模块导入成功
2025-09-15 11:59:39,724 - backend.services.simple_progress - INFO - Redis连接成功
2025-09-15 11:59:39,792 - backend.main - INFO - 启动AutoClip API服务...
2025-09-15 11:59:39,793 - backend.main - INFO - 数据库表创建完成
2025-09-15 11:59:39,793 - backend.main - INFO - API密钥已加载到环境变量
2025-09-15 11:59:39,793 - backend.main - INFO - WebSocket网关服务已禁用使用新的简化进度系统
2025-09-15 12:02:05,445 - backend.main - INFO - 正在关闭AutoClip API服务...
2025-09-15 12:02:05,446 - backend.main - INFO - WebSocket网关服务已禁用
2025-09-15 12:02:05,843 - backend.services.processing_orchestrator - INFO - 流水线模块导入成功
2025-09-15 12:02:06,069 - backend.services.simple_progress - INFO - Redis连接成功
2025-09-15 12:02:06,155 - backend.main - INFO - 启动AutoClip API服务...
2025-09-15 12:02:06,156 - backend.main - INFO - 数据库表创建完成
2025-09-15 12:02:06,156 - backend.main - INFO - API密钥已加载到环境变量
2025-09-15 12:02:06,156 - backend.main - INFO - WebSocket网关服务已禁用使用新的简化进度系统
2025-09-15 12:03:20,315 - backend.main - INFO - 正在关闭AutoClip API服务...
2025-09-15 12:03:20,315 - backend.main - INFO - WebSocket网关服务已禁用
2025-09-15 12:03:20,715 - backend.services.processing_orchestrator - INFO - 流水线模块导入成功
2025-09-15 12:03:20,942 - backend.services.simple_progress - INFO - Redis连接成功
2025-09-15 12:03:21,024 - backend.main - INFO - 启动AutoClip API服务...
2025-09-15 12:03:21,025 - backend.main - INFO - 数据库表创建完成
2025-09-15 12:03:21,025 - backend.main - INFO - API密钥已加载到环境变量
2025-09-15 12:03:21,025 - backend.main - INFO - WebSocket网关服务已禁用使用新的简化进度系统
2025-09-15 12:05:42,055 - backend.main - INFO - 正在关闭AutoClip API服务...
2025-09-15 12:05:42,056 - backend.main - INFO - WebSocket网关服务已禁用
2025-09-15 12:05:43,006 - backend.services.processing_orchestrator - INFO - 流水线模块导入成功
2025-09-15 12:05:43,322 - backend.services.simple_progress - INFO - Redis连接成功
2025-09-15 12:05:43,432 - backend.main - INFO - 启动AutoClip API服务...
2025-09-15 12:05:43,433 - backend.main - INFO - 数据库表创建完成
2025-09-15 12:05:43,433 - backend.main - INFO - API密钥已加载到环境变量
2025-09-15 12:05:43,433 - backend.main - INFO - WebSocket网关服务已禁用使用新的简化进度系统

View File

@@ -0,0 +1,283 @@
"""
统一错误处理中间件
为FastAPI应用提供统一的错误处理机制
"""
import logging
import traceback
import time
import asyncio
import functools
from typing import Union
from contextlib import contextmanager
from fastapi import Request, HTTPException
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
from starlette.exceptions import HTTPException as StarletteHTTPException
from ..utils.error_handler import AutoClipsException, ErrorCategory, ErrorLevel
from ..services.exceptions import ServiceError
logger = logging.getLogger(__name__)
class ErrorResponse:
"""统一错误响应格式"""
def __init__(self,
error_code: str,
message: str,
details: dict = None,
request_id: str = None):
self.error_code = error_code
self.message = message
self.details = details or {}
self.request_id = request_id
self.timestamp = None
def to_dict(self) -> dict:
return {
"error": {
"code": self.error_code,
"message": self.message,
"details": self.details,
"request_id": self.request_id,
"timestamp": self.timestamp
}
}
def create_error_response(
status_code: int,
error_code: str,
message: str,
details: dict = None,
request_id: str = None
) -> JSONResponse:
"""创建统一格式的错误响应"""
response = ErrorResponse(error_code, message, details, request_id)
response.timestamp = time.time()
return JSONResponse(
status_code=status_code,
content=response.to_dict()
)
async def global_exception_handler(request: Request, exc: Exception) -> JSONResponse:
"""全局异常处理器"""
request_id = getattr(request.state, 'request_id', None)
# 记录异常详情
logger.error(
f"未处理的异常: {type(exc).__name__}: {str(exc)}",
extra={
"request_id": request_id,
"path": request.url.path,
"method": request.method,
"traceback": traceback.format_exc()
}
)
# 根据异常类型返回不同的错误响应
if isinstance(exc, AutoClipsException):
return handle_autoclips_exception(exc, request_id)
elif isinstance(exc, ServiceError):
return handle_service_error(exc, request_id)
elif isinstance(exc, HTTPException):
return handle_http_exception(exc, request_id)
elif isinstance(exc, RequestValidationError):
return handle_validation_error(exc, request_id)
elif isinstance(exc, StarletteHTTPException):
return handle_starlette_http_exception(exc, request_id)
else:
return handle_generic_exception(exc, request_id)
def handle_autoclips_exception(exc: AutoClipsException, request_id: str = None) -> JSONResponse:
"""处理AutoClipsException"""
status_code = get_status_code_for_category(exc.category)
return create_error_response(
status_code=status_code,
error_code=f"AUTOCLIPS_{exc.category.value}",
message=exc.message,
details=exc.details,
request_id=request_id
)
def handle_service_error(exc: ServiceError, request_id: str = None) -> JSONResponse:
"""处理ServiceError"""
status_code = get_status_code_for_service_error(exc.error_code)
return create_error_response(
status_code=status_code,
error_code=exc.error_code.value,
message=exc.message,
details=exc.details,
request_id=request_id
)
def handle_http_exception(exc: HTTPException, request_id: str = None) -> JSONResponse:
"""处理HTTPException"""
return create_error_response(
status_code=exc.status_code,
error_code=f"HTTP_{exc.status_code}",
message=exc.detail,
request_id=request_id
)
def handle_validation_error(exc: RequestValidationError, request_id: str = None) -> JSONResponse:
"""处理请求验证错误"""
errors = []
for error in exc.errors():
errors.append({
"field": ".".join(str(loc) for loc in error["loc"]),
"message": error["msg"],
"type": error["type"]
})
return create_error_response(
status_code=422,
error_code="VALIDATION_ERROR",
message="请求参数验证失败",
details={"errors": errors},
request_id=request_id
)
def handle_starlette_http_exception(exc: StarletteHTTPException, request_id: str = None) -> JSONResponse:
"""处理StarletteHTTPException"""
return create_error_response(
status_code=exc.status_code,
error_code=f"STARLETTE_{exc.status_code}",
message=str(exc.detail),
request_id=request_id
)
def handle_generic_exception(exc: Exception, request_id: str = None) -> JSONResponse:
"""处理通用异常"""
return create_error_response(
status_code=500,
error_code="INTERNAL_SERVER_ERROR",
message="服务器内部错误",
details={"exception_type": type(exc).__name__},
request_id=request_id
)
def get_status_code_for_category(category: ErrorCategory) -> int:
"""根据错误分类获取HTTP状态码"""
status_mapping = {
ErrorCategory.CONFIGURATION: 500,
ErrorCategory.NETWORK: 503,
ErrorCategory.API: 502,
ErrorCategory.FILE_IO: 500,
ErrorCategory.PROCESSING: 500,
ErrorCategory.VALIDATION: 400,
ErrorCategory.SYSTEM: 500
}
return status_mapping.get(category, 500)
def get_status_code_for_service_error(error_code) -> int:
"""根据服务错误代码获取HTTP状态码"""
status_mapping = {
"CONFIG_NOT_FOUND": 500,
"CONFIG_INVALID": 500,
"CONFIG_MISSING_REQUIRED": 500,
"FILE_NOT_FOUND": 404,
"FILE_PERMISSION_DENIED": 403,
"FILE_CORRUPTED": 500,
"PROCESSING_FAILED": 500,
"STEP_EXECUTION_FAILED": 500,
"PIPELINE_VALIDATION_FAILED": 400,
"TASK_NOT_FOUND": 404,
"TASK_ALREADY_RUNNING": 409,
"TASK_CANCELLED": 410,
"PROJECT_NOT_FOUND": 404,
"PROJECT_ALREADY_EXISTS": 409,
"SYSTEM_ERROR": 500,
"NETWORK_ERROR": 503,
"TIMEOUT_ERROR": 504,
"CONCURRENT_ACCESS": 409,
"LOCK_ACQUISITION_FAILED": 423,
"UNKNOWN_ERROR": 500
}
return status_mapping.get(error_code.value, 500)
# 装饰器:自动错误处理
def handle_errors(error_category: ErrorCategory = ErrorCategory.SYSTEM):
"""错误处理装饰器"""
def decorator(func):
@functools.wraps(func)
async def async_wrapper(*args, **kwargs):
try:
return await func(*args, **kwargs)
except AutoClipsException:
# 重新抛出AutoClipsException
raise
except ServiceError:
# 重新抛出ServiceError
raise
except Exception as e:
# 转换为AutoClipsException
raise AutoClipsException(
message=str(e),
category=error_category,
original_exception=e
)
@functools.wraps(func)
def sync_wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except AutoClipsException:
# 重新抛出AutoClipsException
raise
except ServiceError:
# 重新抛出ServiceError
raise
except Exception as e:
# 转换为AutoClipsException
raise AutoClipsException(
message=str(e),
category=error_category,
original_exception=e
)
# 根据函数类型返回对应的包装器
if asyncio.iscoroutinefunction(func):
return async_wrapper
else:
return sync_wrapper
return decorator
# 上下文管理器:错误上下文
@contextmanager
def error_context(category: ErrorCategory, context_info: dict = None):
"""错误上下文管理器"""
try:
yield
except Exception as e:
if isinstance(e, AutoClipsException):
# 已经是自定义异常,直接抛出
raise
else:
# 转换为自定义异常
details = context_info or {}
details["original_exception_type"] = type(e).__name__
raise AutoClipsException(
message=str(e),
category=category,
details=details,
original_exception=e
)

View File

@@ -0,0 +1,419 @@
"""
统一配置管理系统
整合所有配置源,提供统一的配置访问接口
"""
import json
import os
import logging
from pathlib import Path
from typing import Dict, Any, Optional, Union
from pydantic import BaseModel, Field, validator
from pydantic_settings import BaseSettings, SettingsConfigDict
logger = logging.getLogger(__name__)
class DatabaseConfig(BaseModel):
"""数据库配置"""
url: str = Field(default="sqlite:///./data/autoclip.db", description="数据库连接URL")
echo: bool = Field(default=False, description="是否打印SQL语句")
pool_size: int = Field(default=5, description="连接池大小")
max_overflow: int = Field(default=10, description="最大溢出连接数")
class RedisConfig(BaseModel):
"""Redis配置"""
url: str = Field(default="redis://localhost:6379/0", description="Redis连接URL")
max_connections: int = Field(default=10, description="最大连接数")
socket_timeout: int = Field(default=5, description="Socket超时时间")
class APIConfig(BaseModel):
"""API配置"""
dashscope_api_key: str = Field(default="", description="DashScope API密钥")
model_name: str = Field(default="qwen-plus", description="模型名称")
max_tokens: int = Field(default=4096, description="最大token数")
timeout: int = Field(default=30, description="API超时时间")
max_retries: int = Field(default=3, description="最大重试次数")
@validator('max_tokens')
def validate_max_tokens(cls, v):
if v <= 0:
raise ValueError('max_tokens必须大于0')
return v
@validator('timeout')
def validate_timeout(cls, v):
if v <= 0:
raise ValueError('timeout必须大于0')
return v
class ProcessingConfig(BaseModel):
"""处理配置"""
chunk_size: int = Field(default=5000, description="文本分块大小")
min_score_threshold: float = Field(default=0.7, description="最小评分阈值")
max_clips_per_collection: int = Field(default=5, description="每个合集最大切片数")
max_retries: int = Field(default=3, description="最大重试次数")
timeout_seconds: int = Field(default=30, description="处理超时时间")
# 话题提取控制参数
min_topic_duration_minutes: int = Field(default=2, description="最小话题时长(分钟)")
max_topic_duration_minutes: int = Field(default=12, description="最大话题时长(分钟)")
target_topic_duration_minutes: int = Field(default=5, description="目标话题时长(分钟)")
min_topics_per_chunk: int = Field(default=3, description="每个分块最小话题数")
max_topics_per_chunk: int = Field(default=8, description="每个分块最大话题数")
@validator('min_score_threshold')
def validate_score_threshold(cls, v):
if not 0 <= v <= 1:
raise ValueError('评分阈值必须在0-1之间')
return v
@validator('chunk_size')
def validate_chunk_size(cls, v):
if v <= 0:
raise ValueError('分块大小必须大于0')
return v
class SpeechRecognitionConfig(BaseModel):
"""语音识别配置"""
method: str = Field(default="whisper_local", description="识别方法")
language: str = Field(default="auto", description="识别语言")
model: str = Field(default="base", description="模型大小")
timeout: int = Field(default=1000, description="识别超时时间")
class BilibiliConfig(BaseModel):
"""B站配置"""
auto_upload: bool = Field(default=False, description="是否自动上传")
default_tid: int = Field(default=21, description="默认分区ID")
max_concurrent_uploads: int = Field(default=3, description="最大并发上传数")
upload_timeout_minutes: int = Field(default=30, description="上传超时时间(分钟)")
auto_generate_tags: bool = Field(default=True, description="是否自动生成标签")
tag_limit: int = Field(default=12, description="标签数量限制")
class LoggingConfig(BaseModel):
"""日志配置"""
level: str = Field(default="INFO", description="日志级别")
format: str = Field(default="%(asctime)s - %(name)s - %(levelname)s - %(message)s", description="日志格式")
file: str = Field(default="backend.log", description="日志文件")
max_size: int = Field(default=10 * 1024 * 1024, description="日志文件最大大小(字节)")
backup_count: int = Field(default=5, description="日志文件备份数量")
class PathConfig(BaseModel):
"""路径配置"""
project_root: Path = Field(default_factory=lambda: Path(__file__).parent.parent.parent)
data_dir: Path = Field(default_factory=lambda: Path(__file__).parent.parent.parent / "data")
uploads_dir: Path = Field(default_factory=lambda: Path(__file__).parent.parent.parent / "data" / "uploads")
output_dir: Path = Field(default_factory=lambda: Path(__file__).parent.parent.parent / "data" / "output")
temp_dir: Path = Field(default_factory=lambda: Path(__file__).parent.parent.parent / "data" / "temp")
prompt_dir: Path = Field(default_factory=lambda: Path(__file__).parent.parent.parent / "prompt")
def __init__(self, **data):
super().__init__(**data)
# 确保所有目录存在
for field_name, field_value in self.__dict__.items():
if isinstance(field_value, Path):
field_value.mkdir(parents=True, exist_ok=True)
class UnifiedConfig(BaseSettings):
"""统一配置类"""
model_config = SettingsConfigDict(
env_file='.env',
env_file_encoding='utf-8',
extra='ignore',
env_nested_delimiter='__'
)
# 环境配置
environment: str = Field(default="development", description="运行环境")
debug: bool = Field(default=True, description="调试模式")
encryption_key: str = Field(default="", description="加密密钥")
# 子配置
database: DatabaseConfig = Field(default_factory=DatabaseConfig)
redis: RedisConfig = Field(default_factory=RedisConfig)
api: APIConfig = Field(default_factory=APIConfig)
processing: ProcessingConfig = Field(default_factory=ProcessingConfig)
speech_recognition: SpeechRecognitionConfig = Field(default_factory=SpeechRecognitionConfig)
bilibili: BilibiliConfig = Field(default_factory=BilibiliConfig)
logging: LoggingConfig = Field(default_factory=LoggingConfig)
paths: PathConfig = Field(default_factory=PathConfig)
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._load_from_files()
self._setup_environment()
def _load_from_files(self):
"""从配置文件加载设置"""
# 从data/settings.json加载
settings_file = self.paths.data_dir / "settings.json"
if settings_file.exists():
try:
with open(settings_file, 'r', encoding='utf-8') as f:
file_settings = json.load(f)
self._merge_settings(file_settings)
except Exception as e:
logger.warning(f"加载配置文件失败: {e}")
# 从环境变量加载
self._load_from_env()
def _merge_settings(self, settings: Dict[str, Any]):
"""合并设置到配置对象"""
for key, value in settings.items():
if hasattr(self, key):
if isinstance(getattr(self, key), BaseModel):
# 如果是子配置对象,递归合并
sub_config = getattr(self, key)
if isinstance(value, dict):
for sub_key, sub_value in value.items():
if hasattr(sub_config, sub_key):
setattr(sub_config, sub_key, sub_value)
else:
setattr(self, key, value)
def _load_from_env(self):
"""从环境变量加载配置"""
# 数据库配置
if os.getenv("DATABASE_URL"):
self.database.url = os.getenv("DATABASE_URL")
# Redis配置
if os.getenv("REDIS_URL"):
self.redis.url = os.getenv("REDIS_URL")
# API配置
if os.getenv("DASHSCOPE_API_KEY"):
self.api.dashscope_api_key = os.getenv("DASHSCOPE_API_KEY")
if os.getenv("API_MODEL_NAME"):
self.api.model_name = os.getenv("API_MODEL_NAME")
# 处理配置
if os.getenv("PROCESSING_CHUNK_SIZE"):
self.processing.chunk_size = int(os.getenv("PROCESSING_CHUNK_SIZE"))
if os.getenv("PROCESSING_MIN_SCORE_THRESHOLD"):
self.processing.min_score_threshold = float(os.getenv("PROCESSING_MIN_SCORE_THRESHOLD"))
# 日志配置
if os.getenv("LOG_LEVEL"):
self.logging.level = os.getenv("LOG_LEVEL")
if os.getenv("LOG_FILE"):
self.logging.file = os.getenv("LOG_FILE")
def _setup_environment(self):
"""设置环境变量"""
# 设置API密钥到环境变量
if self.api.dashscope_api_key:
os.environ["DASHSCOPE_API_KEY"] = self.api.dashscope_api_key
# 设置数据库URL
os.environ["DATABASE_URL"] = self.database.url
# 设置Redis URL
os.environ["REDIS_URL"] = self.redis.url
def save_to_file(self, file_path: Optional[Path] = None):
"""保存配置到文件"""
if file_path is None:
file_path = self.paths.data_dir / "settings.json"
try:
# 创建配置字典,排除敏感信息
config_dict = self._to_safe_dict()
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(config_dict, f, ensure_ascii=False, indent=2)
logger.info(f"配置已保存到: {file_path}")
except Exception as e:
logger.error(f"保存配置文件失败: {e}")
raise
def _to_safe_dict(self) -> Dict[str, Any]:
"""转换为安全的字典格式(隐藏敏感信息)"""
config_dict = {}
for key, value in self.__dict__.items():
if key.startswith('_'):
continue
if isinstance(value, BaseModel):
config_dict[key] = value.dict()
else:
config_dict[key] = value
# 隐藏敏感信息
if 'api' in config_dict and 'dashscope_api_key' in config_dict['api']:
api_key = config_dict['api']['dashscope_api_key']
if api_key:
config_dict['api']['dashscope_api_key'] = api_key[:8] + "..." if len(api_key) > 8 else "***"
return config_dict
def update_config(self, **kwargs):
"""更新配置"""
for key, value in kwargs.items():
if hasattr(self, key):
if isinstance(getattr(self, key), BaseModel):
# 如果是子配置对象,递归更新
sub_config = getattr(self, key)
if isinstance(value, dict):
for sub_key, sub_value in value.items():
if hasattr(sub_config, sub_key):
setattr(sub_config, sub_key, sub_value)
else:
setattr(self, key, value)
# 重新设置环境变量
self._setup_environment()
# 保存到文件
self.save_to_file()
def get_config_summary(self) -> Dict[str, Any]:
"""获取配置摘要"""
return {
"environment": self.environment,
"debug": self.debug,
"database": {
"url": self.database.url,
"echo": self.database.echo
},
"redis": {
"url": self.redis.url
},
"api": {
"model_name": self.api.model_name,
"max_tokens": self.api.max_tokens,
"timeout": self.api.timeout,
"has_api_key": bool(self.api.dashscope_api_key)
},
"processing": {
"chunk_size": self.processing.chunk_size,
"min_score_threshold": self.processing.min_score_threshold,
"max_clips_per_collection": self.processing.max_clips_per_collection
},
"speech_recognition": {
"method": self.speech_recognition.method,
"language": self.speech_recognition.language,
"model": self.speech_recognition.model
},
"bilibili": {
"auto_upload": self.bilibili.auto_upload,
"default_tid": self.bilibili.default_tid,
"max_concurrent_uploads": self.bilibili.max_concurrent_uploads
},
"logging": {
"level": self.logging.level,
"file": self.logging.file
},
"paths": {
"data_dir": str(self.paths.data_dir),
"uploads_dir": str(self.paths.uploads_dir),
"output_dir": str(self.paths.output_dir),
"temp_dir": str(self.paths.temp_dir)
}
}
def validate_config(self) -> Dict[str, Any]:
"""验证配置"""
issues = []
# 验证API配置
if not self.api.dashscope_api_key:
issues.append("DashScope API密钥未配置")
# 验证路径
for path_name, path_value in self.paths.__dict__.items():
if isinstance(path_value, Path) and not path_value.exists():
issues.append(f"路径不存在: {path_name} = {path_value}")
# 验证数据库连接
if not self.database.url:
issues.append("数据库URL未配置")
# 验证Redis连接
if not self.redis.url:
issues.append("Redis URL未配置")
return {
"valid": len(issues) == 0,
"issues": issues
}
# 全局配置实例
config = UnifiedConfig()
# 便捷函数
def get_config() -> UnifiedConfig:
"""获取全局配置实例"""
return config
def get_database_url() -> str:
"""获取数据库URL"""
return config.database.url
def get_redis_url() -> str:
"""获取Redis URL"""
return config.redis.url
def get_api_key() -> str:
"""获取API密钥"""
return config.api.dashscope_api_key
def get_data_directory() -> Path:
"""获取数据目录"""
return config.paths.data_dir
def get_uploads_directory() -> Path:
"""获取上传目录"""
return config.paths.uploads_dir
def get_output_directory() -> Path:
"""获取输出目录"""
return config.paths.output_dir
def get_temp_directory() -> Path:
"""获取临时目录"""
return config.paths.temp_dir
def get_prompt_directory() -> Path:
"""获取提示词目录"""
return config.paths.prompt_dir
def update_api_key(api_key: str):
"""更新API密钥"""
config.api.dashscope_api_key = api_key
config._setup_environment()
config.save_to_file()
def update_processing_config(**kwargs):
"""更新处理配置"""
config.update_config(processing=kwargs)
def update_bilibili_config(**kwargs):
"""更新B站配置"""
config.update_config(bilibili=kwargs)

View File

@@ -139,14 +139,11 @@ async def get_video_categories():
]
}
# 全局异常处理
@app.exception_handler(Exception)
async def global_exception_handler(request, exc):
logger.error(f"全局异常: {exc}", exc_info=True)
return JSONResponse(
status_code=500,
content={"detail": "内部服务器错误"}
)
# 导入统一错误处理中间件
from .core.error_middleware import global_exception_handler
# 注册全局异常处理器
app.add_exception_handler(Exception, global_exception_handler)
if __name__ == "__main__":
import uvicorn

View File

@@ -417,12 +417,66 @@ class BilibiliUploadService:
return record
async def upload_clip(self, record_id: int, video_path: str, max_retries: int = 3) -> bool:
"""上传单个切片 - 使用新的v6.0实现"""
from .bilibili_upload_v6 import BilibiliUploadServiceV6
# 使用新的上传服务
upload_service_v6 = BilibiliUploadServiceV6(self.db)
return await upload_service_v6.upload_clip(record_id, video_path, max_retries)
"""上传单个切片 - 使用内置上传实现"""
try:
# 获取投稿记录
record = self.db.query(UploadRecord).filter(UploadRecord.id == record_id).first()
if not record:
logger.error(f"投稿记录不存在: {record_id}")
return False
# 获取账号信息
account = self.db.query(BilibiliAccount).filter(BilibiliAccount.id == record.account_id).first()
if not account:
logger.error(f"账号不存在: {record.account_id}")
return False
# 解密Cookie
cookies = decrypt_data(account.cookies)
if not cookies:
logger.error("Cookie解密失败")
return False
# 使用直接上传器
uploader = BilibiliDirectUploader(cookies)
success = await uploader.upload_video(
video_path=video_path,
metadata={
'title': record.title,
'desc': record.description or '',
'tid': record.tid,
'tag': record.tags or '',
'source': record.source or '',
'copyright': record.copyright or 1
},
max_retries=max_retries
)
if success:
record.status = 'completed'
record.bv_id = uploader.bv_id
record.completed_at = datetime.utcnow()
else:
record.status = 'failed'
record.error_message = uploader.error_message
record.failed_at = datetime.utcnow()
self.db.commit()
return success
except Exception as e:
logger.error(f"上传切片失败: {e}")
# 更新记录状态
try:
record = self.db.query(UploadRecord).filter(UploadRecord.id == record_id).first()
if record:
record.status = 'failed'
record.error_message = str(e)
record.failed_at = datetime.utcnow()
self.db.commit()
except:
pass
return False
def update_upload_status(self, record_id, status: str, error_message: str = None) -> bool:
"""更新投稿状态"""
@@ -582,11 +636,65 @@ class BilibiliUploadService:
def upload_clip_sync(self, record_id: int, video_path: str, max_retries: int = 3) -> bool:
"""同步版本的上传单个切片"""
from .bilibili_upload_v2 import BilibiliUploadServiceV2
# 使用新的上传服务
upload_service_v2 = BilibiliUploadServiceV2(self.db)
return upload_service_v2.upload_clip_sync(record_id, video_path, max_retries)
try:
# 获取投稿记录
record = self.db.query(UploadRecord).filter(UploadRecord.id == record_id).first()
if not record:
logger.error(f"投稿记录不存在: {record_id}")
return False
# 获取账号信息
account = self.db.query(BilibiliAccount).filter(BilibiliAccount.id == record.account_id).first()
if not account:
logger.error(f"账号不存在: {record.account_id}")
return False
# 解密Cookie
cookies = decrypt_data(account.cookies)
if not cookies:
logger.error("Cookie解密失败")
return False
# 使用直接上传器(同步版本)
uploader = BilibiliDirectUploader(cookies)
success = uploader.upload_video_sync(
video_path=video_path,
metadata={
'title': record.title,
'desc': record.description or '',
'tid': record.tid,
'tag': record.tags or '',
'source': record.source or '',
'copyright': record.copyright or 1
},
max_retries=max_retries
)
if success:
record.status = 'completed'
record.bv_id = uploader.bv_id
record.completed_at = datetime.utcnow()
else:
record.status = 'failed'
record.error_message = uploader.error_message
record.failed_at = datetime.utcnow()
self.db.commit()
return success
except Exception as e:
logger.error(f"上传切片失败: {e}")
# 更新记录状态
try:
record = self.db.query(UploadRecord).filter(UploadRecord.id == record_id).first()
if record:
record.status = 'failed'
record.error_message = str(e)
record.failed_at = datetime.utcnow()
self.db.commit()
except:
pass
return False
class BilibiliDirectUploader:
@@ -611,6 +719,19 @@ class BilibiliDirectUploader:
logger.error(f"上传视频失败: {e}")
return False
def upload_video_sync(self, video_path: str, metadata: dict, max_retries: int = 3) -> bool:
"""同步版本的上传视频"""
try:
# 暂时返回失败,因为需要重新实现上传逻辑
self.error_message = "上传功能正在开发中,请稍后再试"
logger.warning("上传功能暂未实现,返回失败状态")
return False
except Exception as e:
self.error_message = str(e)
logger.error(f"上传视频失败: {e}")
return False
async def _pre_upload(self, video_path: str) -> Optional[str]:
"""预上传获取upload_id"""
try:

View File

@@ -0,0 +1,598 @@
"""
增强进度服务
整合现有进度系统,提供更好的错误处理和状态管理
"""
import time
import json
import logging
import asyncio
from typing import Dict, Any, Optional, List, Callable
from datetime import datetime, timedelta
from dataclasses import dataclass, asdict
from enum import Enum
import redis
from sqlalchemy.orm import Session
from ..core.database import SessionLocal
from ..models.project import Project, ProjectStatus
from ..models.task import Task, TaskStatus
from ..utils.error_handler import AutoClipsException, ErrorCategory
logger = logging.getLogger(__name__)
class ProgressStage(Enum):
"""进度阶段枚举"""
INGEST = "INGEST" # 下载/就绪
SUBTITLE = "SUBTITLE" # 字幕/对齐
ANALYZE = "ANALYZE" # 语义分析/大纲
HIGHLIGHT = "HIGHLIGHT" # 片段定位/打分
EXPORT = "EXPORT" # 导出/封装
DONE = "DONE" # 校验/归档
ERROR = "ERROR" # 错误状态
class ProgressStatus(Enum):
"""进度状态枚举"""
PENDING = "PENDING" # 等待中
RUNNING = "RUNNING" # 运行中
COMPLETED = "COMPLETED" # 已完成
FAILED = "FAILED" # 失败
CANCELLED = "CANCELLED" # 已取消
@dataclass
class ProgressInfo:
"""进度信息数据结构"""
project_id: str
task_id: Optional[str] = None
stage: ProgressStage = ProgressStage.INGEST
status: ProgressStatus = ProgressStatus.PENDING
progress: int = 0 # 0-100
message: str = ""
error_message: Optional[str] = None
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None
estimated_remaining: Optional[int] = None # 预估剩余时间(秒)
metadata: Optional[Dict[str, Any]] = None
def to_dict(self) -> Dict[str, Any]:
"""转换为字典格式"""
data = asdict(self)
# 转换枚举为字符串
data['stage'] = self.stage.value
data['status'] = self.status.value
# 转换时间戳
if self.start_time:
data['start_time'] = self.start_time.isoformat()
if self.end_time:
data['end_time'] = self.end_time.isoformat()
return data
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'ProgressInfo':
"""从字典创建实例"""
# 转换字符串为枚举
if 'stage' in data and isinstance(data['stage'], str):
data['stage'] = ProgressStage(data['stage'])
if 'status' in data and isinstance(data['status'], str):
data['status'] = ProgressStatus(data['status'])
# 转换时间戳
if 'start_time' in data and isinstance(data['start_time'], str):
data['start_time'] = datetime.fromisoformat(data['start_time'])
if 'end_time' in data and isinstance(data['end_time'], str):
data['end_time'] = datetime.fromisoformat(data['end_time'])
return cls(**data)
class EnhancedProgressService:
"""增强进度服务"""
# 阶段权重定义
STAGE_WEIGHTS = {
ProgressStage.INGEST: 10,
ProgressStage.SUBTITLE: 15,
ProgressStage.ANALYZE: 20,
ProgressStage.HIGHLIGHT: 25,
ProgressStage.EXPORT: 20,
ProgressStage.DONE: 10,
}
# 阶段顺序
STAGE_ORDER = [
ProgressStage.INGEST,
ProgressStage.SUBTITLE,
ProgressStage.ANALYZE,
ProgressStage.HIGHLIGHT,
ProgressStage.EXPORT,
ProgressStage.DONE,
]
def __init__(self):
self.redis_client = None
self._init_redis()
self.progress_cache: Dict[str, ProgressInfo] = {}
self.progress_callbacks: List[Callable[[ProgressInfo], None]] = []
def _init_redis(self):
"""初始化Redis连接"""
try:
self.redis_client = redis.Redis.from_url(
"redis://127.0.0.1:6379/0",
decode_responses=True,
socket_timeout=5,
socket_connect_timeout=5
)
# 测试连接
self.redis_client.ping()
logger.info("Redis连接成功")
except Exception as e:
logger.warning(f"Redis连接失败将使用内存缓存: {e}")
self.redis_client = None
def _get_redis_key(self, project_id: str) -> str:
"""获取Redis键名"""
return f"progress:{project_id}"
def _calculate_progress(self, stage: ProgressStage, sub_progress: float = 0.0) -> int:
"""计算总进度百分比"""
# 累加之前阶段的权重
total_weight = 0
current_stage_weight = 0
for s in self.STAGE_ORDER:
if s == stage:
current_stage_weight = self.STAGE_WEIGHTS.get(s, 0)
break
total_weight += self.STAGE_WEIGHTS.get(s, 0)
# 计算当前阶段的进度
if stage == ProgressStage.DONE:
return 100
elif stage == ProgressStage.ERROR:
return total_weight # 错误时保持当前进度
# 添加当前阶段的子进度
current_progress = int(current_stage_weight * sub_progress / 100.0)
total_progress = total_weight + current_progress
return min(99, total_progress)
def _estimate_remaining_time(self, progress_info: ProgressInfo) -> Optional[int]:
"""预估剩余时间"""
if not progress_info.start_time or progress_info.progress <= 0:
return None
elapsed = (datetime.utcnow() - progress_info.start_time).total_seconds()
if elapsed <= 0:
return None
# 基于当前进度预估总时间
estimated_total = elapsed * 100 / progress_info.progress
remaining = estimated_total - elapsed
return max(0, int(remaining))
def start_progress(self, project_id: str, task_id: Optional[str] = None,
initial_message: str = "开始处理") -> ProgressInfo:
"""开始进度跟踪"""
try:
progress_info = ProgressInfo(
project_id=project_id,
task_id=task_id,
stage=ProgressStage.INGEST,
status=ProgressStatus.RUNNING,
progress=0,
message=initial_message,
start_time=datetime.utcnow()
)
# 保存到缓存
self.progress_cache[project_id] = progress_info
# 保存到Redis
if self.redis_client:
try:
self.redis_client.setex(
self._get_redis_key(project_id),
3600, # 1小时过期
json.dumps(progress_info.to_dict())
)
except Exception as e:
logger.warning(f"保存进度到Redis失败: {e}")
# 更新数据库
self._update_database_progress(progress_info)
# 触发回调
self._trigger_callbacks(progress_info)
logger.info(f"开始跟踪项目 {project_id} 的进度")
return progress_info
except Exception as e:
logger.error(f"开始进度跟踪失败: {e}")
raise AutoClipsException(
message="开始进度跟踪失败",
category=ErrorCategory.SYSTEM,
original_exception=e
)
def update_progress(self, project_id: str, stage: ProgressStage,
message: str = "", sub_progress: float = 0.0,
metadata: Optional[Dict[str, Any]] = None) -> ProgressInfo:
"""更新进度"""
try:
# 获取当前进度信息
progress_info = self.get_progress(project_id)
if not progress_info:
logger.warning(f"项目 {project_id} 的进度信息不存在,创建新的")
progress_info = self.start_progress(project_id, message=message)
# 更新进度信息
progress_info.stage = stage
progress_info.message = message
progress_info.progress = self._calculate_progress(stage, sub_progress)
progress_info.estimated_remaining = self._estimate_remaining_time(progress_info)
if metadata:
if progress_info.metadata:
progress_info.metadata.update(metadata)
else:
progress_info.metadata = metadata
# 保存到缓存
self.progress_cache[project_id] = progress_info
# 保存到Redis
if self.redis_client:
try:
self.redis_client.setex(
self._get_redis_key(project_id),
3600,
json.dumps(progress_info.to_dict())
)
except Exception as e:
logger.warning(f"更新Redis进度失败: {e}")
# 更新数据库
self._update_database_progress(progress_info)
# 触发回调
self._trigger_callbacks(progress_info)
logger.info(f"项目 {project_id} 进度更新: {progress_info.progress}% - {stage.value}")
return progress_info
except Exception as e:
logger.error(f"更新进度失败: {e}")
raise AutoClipsException(
message="更新进度失败",
category=ErrorCategory.SYSTEM,
original_exception=e
)
def complete_progress(self, project_id: str, message: str = "处理完成") -> ProgressInfo:
"""完成进度"""
try:
progress_info = self.get_progress(project_id)
if not progress_info:
logger.warning(f"项目 {project_id} 的进度信息不存在")
return None
# 更新为完成状态
progress_info.stage = ProgressStage.DONE
progress_info.status = ProgressStatus.COMPLETED
progress_info.progress = 100
progress_info.message = message
progress_info.end_time = datetime.utcnow()
progress_info.estimated_remaining = 0
# 保存到缓存
self.progress_cache[project_id] = progress_info
# 保存到Redis
if self.redis_client:
try:
self.redis_client.setex(
self._get_redis_key(project_id),
3600,
json.dumps(progress_info.to_dict())
)
except Exception as e:
logger.warning(f"保存完成状态到Redis失败: {e}")
# 更新数据库
self._update_database_progress(progress_info)
# 触发回调
self._trigger_callbacks(progress_info)
logger.info(f"项目 {project_id} 处理完成")
return progress_info
except Exception as e:
logger.error(f"完成进度失败: {e}")
raise AutoClipsException(
message="完成进度失败",
category=ErrorCategory.SYSTEM,
original_exception=e
)
def fail_progress(self, project_id: str, error_message: str) -> ProgressInfo:
"""标记进度为失败"""
try:
progress_info = self.get_progress(project_id)
if not progress_info:
logger.warning(f"项目 {project_id} 的进度信息不存在")
return None
# 更新为失败状态
progress_info.stage = ProgressStage.ERROR
progress_info.status = ProgressStatus.FAILED
progress_info.error_message = error_message
progress_info.end_time = datetime.utcnow()
progress_info.estimated_remaining = 0
# 保存到缓存
self.progress_cache[project_id] = progress_info
# 保存到Redis
if self.redis_client:
try:
self.redis_client.setex(
self._get_redis_key(project_id),
3600,
json.dumps(progress_info.to_dict())
)
except Exception as e:
logger.warning(f"保存失败状态到Redis失败: {e}")
# 更新数据库
self._update_database_progress(progress_info)
# 触发回调
self._trigger_callbacks(progress_info)
logger.error(f"项目 {project_id} 处理失败: {error_message}")
return progress_info
except Exception as e:
logger.error(f"标记进度失败失败: {e}")
raise AutoClipsException(
message="标记进度失败失败",
category=ErrorCategory.SYSTEM,
original_exception=e
)
def get_progress(self, project_id: str) -> Optional[ProgressInfo]:
"""获取进度信息"""
try:
# 先从缓存获取
if project_id in self.progress_cache:
return self.progress_cache[project_id]
# 从Redis获取
if self.redis_client:
try:
redis_data = self.redis_client.get(self._get_redis_key(project_id))
if redis_data:
data = json.loads(redis_data)
progress_info = ProgressInfo.from_dict(data)
self.progress_cache[project_id] = progress_info
return progress_info
except Exception as e:
logger.warning(f"从Redis获取进度失败: {e}")
# 从数据库获取
db = SessionLocal()
try:
project = db.query(Project).filter(Project.id == project_id).first()
if project:
# 根据项目状态创建进度信息
stage = self._map_project_status_to_stage(project.status)
status = self._map_project_status_to_progress_status(project.status)
progress_info = ProgressInfo(
project_id=project_id,
stage=stage,
status=status,
progress=self._calculate_progress(stage),
message=f"项目状态: {project.status}",
start_time=project.created_at,
end_time=project.updated_at if status == ProgressStatus.COMPLETED else None
)
self.progress_cache[project_id] = progress_info
return progress_info
finally:
db.close()
return None
except Exception as e:
logger.error(f"获取进度信息失败: {e}")
return None
def _map_project_status_to_stage(self, project_status: str) -> ProgressStage:
"""将项目状态映射到进度阶段"""
status_mapping = {
ProjectStatus.PENDING: ProgressStage.INGEST,
ProjectStatus.PROCESSING: ProgressStage.ANALYZE,
ProjectStatus.COMPLETED: ProgressStage.DONE,
ProjectStatus.FAILED: ProgressStage.ERROR,
}
return status_mapping.get(project_status, ProgressStage.INGEST)
def _map_project_status_to_progress_status(self, project_status: str) -> ProgressStatus:
"""将项目状态映射到进度状态"""
status_mapping = {
ProjectStatus.PENDING: ProgressStatus.PENDING,
ProjectStatus.PROCESSING: ProgressStatus.RUNNING,
ProjectStatus.COMPLETED: ProgressStatus.COMPLETED,
ProjectStatus.FAILED: ProgressStatus.FAILED,
}
return status_mapping.get(project_status, ProgressStatus.PENDING)
def _update_database_progress(self, progress_info: ProgressInfo):
"""更新数据库中的进度信息"""
try:
db = SessionLocal()
try:
# 更新项目状态
project = db.query(Project).filter(Project.id == progress_info.project_id).first()
if project:
if progress_info.status == ProgressStatus.COMPLETED:
project.status = ProjectStatus.COMPLETED
elif progress_info.status == ProgressStatus.FAILED:
project.status = ProjectStatus.FAILED
elif progress_info.status == ProgressStatus.RUNNING:
project.status = ProjectStatus.PROCESSING
project.updated_at = datetime.utcnow()
db.commit()
# 更新任务状态
if progress_info.task_id:
task = db.query(Task).filter(Task.id == progress_info.task_id).first()
if task:
task.progress = progress_info.progress
task.current_step = progress_info.stage.value
task.updated_at = datetime.utcnow()
if progress_info.status == ProgressStatus.COMPLETED:
task.status = TaskStatus.COMPLETED
elif progress_info.status == ProgressStatus.FAILED:
task.status = TaskStatus.FAILED
task.error_message = progress_info.error_message
db.commit()
finally:
db.close()
except Exception as e:
logger.error(f"更新数据库进度失败: {e}")
def add_progress_callback(self, callback: Callable[[ProgressInfo], None]):
"""添加进度回调函数"""
self.progress_callbacks.append(callback)
def remove_progress_callback(self, callback: Callable[[ProgressInfo], None]):
"""移除进度回调函数"""
if callback in self.progress_callbacks:
self.progress_callbacks.remove(callback)
def _trigger_callbacks(self, progress_info: ProgressInfo):
"""触发进度回调"""
for callback in self.progress_callbacks:
try:
callback(progress_info)
except Exception as e:
logger.error(f"进度回调执行失败: {e}")
def cleanup_old_progress(self, max_age_hours: int = 24):
"""清理旧的进度信息"""
try:
cutoff_time = datetime.utcnow() - timedelta(hours=max_age_hours)
cleaned_count = 0
# 清理缓存
for project_id, progress_info in list(self.progress_cache.items()):
if progress_info.end_time and progress_info.end_time < cutoff_time:
del self.progress_cache[project_id]
cleaned_count += 1
# 清理Redis
if self.redis_client:
try:
# 获取所有进度键
keys = self.redis_client.keys("progress:*")
for key in keys:
try:
data = self.redis_client.get(key)
if data:
progress_data = json.loads(data)
if 'end_time' in progress_data:
end_time = datetime.fromisoformat(progress_data['end_time'])
if end_time < cutoff_time:
self.redis_client.delete(key)
cleaned_count += 1
except Exception as e:
logger.warning(f"清理Redis键 {key} 失败: {e}")
except Exception as e:
logger.warning(f"清理Redis进度失败: {e}")
logger.info(f"清理了 {cleaned_count} 个旧进度记录")
except Exception as e:
logger.error(f"清理旧进度失败: {e}")
def get_all_active_progress(self) -> List[ProgressInfo]:
"""获取所有活跃的进度信息"""
try:
active_progress = []
# 从缓存获取
for progress_info in self.progress_cache.values():
if progress_info.status in [ProgressStatus.PENDING, ProgressStatus.RUNNING]:
active_progress.append(progress_info)
# 从Redis获取
if self.redis_client:
try:
keys = self.redis_client.keys("progress:*")
for key in keys:
try:
data = self.redis_client.get(key)
if data:
progress_data = json.loads(data)
progress_info = ProgressInfo.from_dict(progress_data)
if progress_info.status in [ProgressStatus.PENDING, ProgressStatus.RUNNING]:
# 避免重复
if not any(p.project_id == progress_info.project_id for p in active_progress):
active_progress.append(progress_info)
except Exception as e:
logger.warning(f"解析Redis进度数据失败: {e}")
except Exception as e:
logger.warning(f"获取Redis进度失败: {e}")
return active_progress
except Exception as e:
logger.error(f"获取活跃进度失败: {e}")
return []
# 全局进度服务实例
progress_service = EnhancedProgressService()
# 便捷函数
def start_progress(project_id: str, task_id: Optional[str] = None,
initial_message: str = "开始处理") -> ProgressInfo:
"""开始进度跟踪"""
return progress_service.start_progress(project_id, task_id, initial_message)
def update_progress(project_id: str, stage: ProgressStage,
message: str = "", sub_progress: float = 0.0,
metadata: Optional[Dict[str, Any]] = None) -> ProgressInfo:
"""更新进度"""
return progress_service.update_progress(project_id, stage, message, sub_progress, metadata)
def complete_progress(project_id: str, message: str = "处理完成") -> ProgressInfo:
"""完成进度"""
return progress_service.complete_progress(project_id, message)
def fail_progress(project_id: str, error_message: str) -> ProgressInfo:
"""标记进度为失败"""
return progress_service.fail_progress(project_id, error_message)
def get_progress(project_id: str) -> Optional[ProgressInfo]:
"""获取进度信息"""
return progress_service.get_progress(project_id)

View File

@@ -0,0 +1,329 @@
"""
优化存储服务 - 解决双重存储问题
数据库只存储元数据,文件系统存储实际文件
"""
import json
import logging
import shutil
from pathlib import Path
from typing import Dict, Any, Optional, List
from datetime import datetime
from sqlalchemy.orm import Session
from ..core.config import get_data_directory
from ..models.project import Project
from ..models.clip import Clip
from ..models.collection import Collection
logger = logging.getLogger(__name__)
class OptimizedStorageService:
"""优化存储服务 - 数据库存储元数据,文件系统存储实际文件"""
def __init__(self, db: Session, project_id: str):
self.db = db
self.project_id = project_id
self.data_dir = get_data_directory()
self.project_dir = self.data_dir / "projects" / project_id
# 确保项目目录结构存在
self._ensure_project_structure()
def _ensure_project_structure(self):
"""确保项目目录结构存在"""
directories = [
self.project_dir / "raw", # 原始文件
self.project_dir / "processing", # 处理中间文件
self.project_dir / "output" / "clips", # 切片文件
self.project_dir / "output" / "collections" # 合集文件
]
for directory in directories:
directory.mkdir(parents=True, exist_ok=True)
# ==================== 项目文件管理 ====================
def save_project_file(self, file_path: Path, file_type: str = "video") -> str:
"""保存项目文件到文件系统,返回相对路径"""
try:
if file_type == "video":
target_dir = self.project_dir / "raw"
target_name = f"input_video{file_path.suffix}"
elif file_type == "subtitle":
target_dir = self.project_dir / "raw"
target_name = f"input_subtitle{file_path.suffix}"
else:
target_dir = self.project_dir / "raw"
target_name = file_path.name
target_path = target_dir / target_name
shutil.copy2(file_path, target_path)
# 返回相对路径,用于存储在数据库中
relative_path = f"projects/{self.project_id}/raw/{target_name}"
logger.info(f"项目文件已保存: {relative_path}")
return relative_path
except Exception as e:
logger.error(f"保存项目文件失败: {e}")
raise
def get_project_file_path(self, relative_path: str) -> Path:
"""根据相对路径获取完整文件路径"""
return self.data_dir / relative_path
# ==================== 切片文件管理 ====================
def save_clip_file(self, clip_data: Dict[str, Any], clip_id: str) -> str:
"""保存切片文件到文件系统,返回相对路径"""
try:
# 这里应该包含实际的切片文件保存逻辑
# 暂时返回模拟路径
clip_file = f"clip_{clip_id}.mp4"
target_path = self.project_dir / "output" / "clips" / clip_file
target_path.parent.mkdir(parents=True, exist_ok=True)
# 创建模拟文件(实际应该保存真实的切片文件)
target_path.touch()
# 返回相对路径
relative_path = f"projects/{self.project_id}/output/clips/{clip_file}"
logger.info(f"切片文件已保存: {relative_path}")
return relative_path
except Exception as e:
logger.error(f"保存切片文件失败: {e}")
raise
def save_clip_metadata(self, clip_data: Dict[str, Any], clip_id: str) -> Clip:
"""保存切片元数据到数据库"""
try:
# 创建切片记录,只存储元数据
clip = Clip(
id=clip_id,
project_id=self.project_id,
title=clip_data.get('title', ''),
description=clip_data.get('description', ''),
start_time=clip_data.get('start_time', 0),
end_time=clip_data.get('end_time', 0),
duration=clip_data.get('duration', 0),
score=clip_data.get('score', 0.0),
recommendation_reason=clip_data.get('recommendation_reason', ''),
video_path=self.save_clip_file(clip_data, clip_id), # 存储相对路径
thumbnail_path=clip_data.get('thumbnail_path', ''),
processing_step=clip_data.get('processing_step', 6),
tags=clip_data.get('tags', []),
clip_metadata=clip_data.get('metadata', {}) # 存储精简元数据
)
self.db.add(clip)
self.db.commit()
self.db.refresh(clip)
logger.info(f"切片元数据已保存到数据库: {clip_id}")
return clip
except Exception as e:
logger.error(f"保存切片元数据失败: {e}")
self.db.rollback()
raise
# ==================== 合集文件管理 ====================
def save_collection_file(self, collection_data: Dict[str, Any], collection_id: str) -> str:
"""保存合集文件到文件系统,返回相对路径"""
try:
# 这里应该包含实际的合集文件保存逻辑
# 暂时返回模拟路径
collection_file = f"collection_{collection_id}.mp4"
target_path = self.project_dir / "output" / "collections" / collection_file
target_path.parent.mkdir(parents=True, exist_ok=True)
# 创建模拟文件(实际应该保存真实的合集文件)
target_path.touch()
# 返回相对路径
relative_path = f"projects/{self.project_id}/output/collections/{collection_file}"
logger.info(f"合集文件已保存: {relative_path}")
return relative_path
except Exception as e:
logger.error(f"保存合集文件失败: {e}")
raise
def save_collection_metadata(self, collection_data: Dict[str, Any], collection_id: str) -> Collection:
"""保存合集元数据到数据库"""
try:
# 创建合集记录,只存储元数据
collection = Collection(
id=collection_id,
project_id=self.project_id,
name=collection_data.get('name', ''),
description=collection_data.get('description', ''),
clip_ids=collection_data.get('clip_ids', []),
video_path=self.save_collection_file(collection_data, collection_id), # 存储相对路径
thumbnail_path=collection_data.get('thumbnail_path', ''),
tags=collection_data.get('tags', []),
collection_metadata=collection_data.get('metadata', {}) # 存储精简元数据
)
self.db.add(collection)
self.db.commit()
self.db.refresh(collection)
logger.info(f"合集元数据已保存到数据库: {collection_id}")
return collection
except Exception as e:
logger.error(f"保存合集元数据失败: {e}")
self.db.rollback()
raise
# ==================== 处理中间文件管理 ====================
def save_processing_metadata(self, metadata: Dict[str, Any], step: str) -> str:
"""保存处理中间元数据到文件系统"""
try:
metadata_file = self.project_dir / "processing" / f"{step}.json"
with open(metadata_file, 'w', encoding='utf-8') as f:
json.dump(metadata, f, ensure_ascii=False, indent=2)
logger.info(f"处理元数据已保存: {metadata_file}")
return str(metadata_file)
except Exception as e:
logger.error(f"保存处理元数据失败: {e}")
raise
def get_processing_metadata(self, step: str) -> Optional[Dict[str, Any]]:
"""获取处理中间元数据"""
try:
metadata_file = self.project_dir / "processing" / f"{step}.json"
if metadata_file.exists():
with open(metadata_file, 'r', encoding='utf-8') as f:
return json.load(f)
return None
except Exception as e:
logger.error(f"获取处理元数据失败: {e}")
return None
# ==================== 数据查询方法 ====================
def get_project_clips(self) -> List[Clip]:
"""获取项目的所有切片(从数据库)"""
return self.db.query(Clip).filter(Clip.project_id == self.project_id).all()
def get_project_collections(self) -> List[Collection]:
"""获取项目的所有合集(从数据库)"""
return self.db.query(Collection).filter(Collection.project_id == self.project_id).all()
def get_clip_file_path(self, clip: Clip) -> Path:
"""获取切片的完整文件路径"""
if clip.video_path:
return self.data_dir / clip.video_path
return None
def get_collection_file_path(self, collection: Collection) -> Path:
"""获取合集的完整文件路径"""
if collection.video_path:
return self.data_dir / collection.video_path
return None
# ==================== 清理方法 ====================
def cleanup_temp_files(self):
"""清理临时文件"""
temp_dir = self.data_dir / "temp"
if temp_dir.exists():
for temp_file in temp_dir.iterdir():
if temp_file.is_file():
temp_file.unlink()
logger.info(f"清理临时文件: {temp_file}")
def cleanup_old_files(self, keep_days: int = 30):
"""清理旧文件"""
try:
cutoff_date = datetime.utcnow() - timedelta(days=keep_days)
# 清理旧的临时文件
temp_dir = self.data_dir / "temp"
if temp_dir.exists():
for temp_file in temp_dir.iterdir():
if temp_file.is_file() and temp_file.stat().st_mtime < cutoff_date.timestamp():
temp_file.unlink()
logger.info(f"清理旧临时文件: {temp_file}")
logger.info(f"清理完成,保留 {keep_days} 天内的文件")
except Exception as e:
logger.error(f"清理旧文件失败: {e}")
# ==================== 数据迁移方法 ====================
def migrate_from_old_storage(self, old_project_dir: Path) -> Dict[str, Any]:
"""从旧存储格式迁移数据"""
try:
logger.info(f"开始迁移项目数据: {self.project_id}")
migrated_files = []
migrated_metadata = []
# 迁移原始文件
if (old_project_dir / "raw").exists():
for file_path in (old_project_dir / "raw").iterdir():
if file_path.is_file():
relative_path = self.save_project_file(file_path)
migrated_files.append(relative_path)
# 迁移处理元数据
if (old_project_dir / "processing").exists():
for metadata_file in (old_project_dir / "processing").iterdir():
if metadata_file.suffix == '.json':
with open(metadata_file, 'r', encoding='utf-8') as f:
metadata = json.load(f)
step_name = metadata_file.stem
self.save_processing_metadata(metadata, step_name)
migrated_metadata.append(step_name)
# 迁移输出文件
if (old_project_dir / "output").exists():
# 迁移切片文件
clips_dir = old_project_dir / "output" / "clips"
if clips_dir.exists():
for clip_file in clips_dir.iterdir():
if clip_file.is_file():
target_path = self.project_dir / "output" / "clips" / clip_file.name
target_path.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(clip_file, target_path)
migrated_files.append(f"projects/{self.project_id}/output/clips/{clip_file.name}")
# 迁移合集文件
collections_dir = old_project_dir / "output" / "collections"
if collections_dir.exists():
for collection_file in collections_dir.iterdir():
if collection_file.is_file():
target_path = self.project_dir / "output" / "collections" / collection_file.name
target_path.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(collection_file, target_path)
migrated_files.append(f"projects/{self.project_id}/output/collections/{collection_file.name}")
logger.info(f"数据迁移完成: {len(migrated_files)} 个文件, {len(migrated_metadata)} 个元数据")
return {
"success": True,
"migrated_files": migrated_files,
"migrated_metadata": migrated_metadata
}
except Exception as e:
logger.error(f"数据迁移失败: {e}")
return {
"success": False,
"error": str(e)
}

View File

@@ -0,0 +1,496 @@
"""
B站投稿上传服务 v6.0
基于浏览器行为的正确实现
"""
import asyncio
import aiohttp
import json
import os
import time
import hashlib
import random
import string
from typing import Optional, Dict, List, Tuple
from pathlib import Path
from datetime import datetime
import logging
from ..core.database import SessionLocal
from ..models.bilibili import BilibiliAccount, BilibiliUploadRecord
from ..utils.crypto import decrypt_data
import uuid
logger = logging.getLogger(__name__)
class BilibiliUploaderV6:
"""B站投稿上传器 v6.0 - 基于浏览器行为"""
def __init__(self, cookies: str):
self.cookies = cookies
self.session = None
self.upload_id = None
self.bv_id = None
self.error_message = None
async def upload_video(self, video_path: str, metadata: dict, max_retries: int = 3) -> bool:
"""上传视频主流程 - 基于浏览器行为"""
try:
async with aiohttp.ClientSession() as session:
self.session = session
# 1. 验证登录状态
if not await self._check_login_status():
return False
# 2. 获取预上传信息
pre_upload_info = await self._get_pre_upload_info(video_path)
if not pre_upload_info:
return False
# 3. 分片上传 - 使用浏览器行为
success = await self._chunk_upload_browser_style(video_path, pre_upload_info, max_retries)
if not success:
return False
# 4. 合并分片
success = await self._merge_chunks_browser_style(pre_upload_info)
if not success:
return False
# 5. 提交投稿
success = await self._submit_video_browser_style(pre_upload_info, metadata)
if not success:
return False
return True
except Exception as e:
self.error_message = str(e)
logger.error(f"上传视频失败: {e}")
return False
async def _check_login_status(self) -> bool:
"""检查登录状态"""
try:
headers = {
"Cookie": self.cookies,
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Referer": "https://www.bilibili.com/"
}
async with self.session.get(
"https://api.bilibili.com/x/web-interface/nav",
headers=headers,
timeout=aiohttp.ClientTimeout(total=10)
) as response:
if response.status == 200:
data = await response.json()
if data.get("code") == 0 and data.get("data", {}).get("isLogin"):
user_info = data.get("data", {})
logger.info(f"登录状态正常,用户: {user_info.get('uname', 'unknown')}")
return True
else:
self.error_message = "用户未登录或登录状态异常"
logger.error(self.error_message)
return False
else:
self.error_message = f"检查登录状态失败HTTP状态码: {response.status}"
logger.error(self.error_message)
return False
except Exception as e:
self.error_message = f"检查登录状态异常: {str(e)}"
logger.error(self.error_message)
return False
async def _get_pre_upload_info(self, video_path: str) -> Optional[dict]:
"""获取预上传信息"""
try:
file_size = os.path.getsize(video_path)
file_name = os.path.basename(video_path)
# 解析Cookie获取CSRF token
csrf_token = None
for cookie in self.cookies.split(';'):
cookie = cookie.strip()
if cookie.startswith('bili_jct='):
csrf_token = cookie.split('=', 1)[1]
break
if not csrf_token:
self.error_message = "Cookie中缺少bili_jct字段"
logger.error(self.error_message)
return None
headers = {
"Cookie": self.cookies,
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Referer": "https://member.bilibili.com/",
"Origin": "https://member.bilibili.com",
"X-CSRF-Token": csrf_token,
"Accept": "application/json, text/plain, */*",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Accept-Encoding": "gzip, deflate, br",
"Connection": "keep-alive",
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-origin"
}
params = {
"name": file_name,
"size": str(file_size),
"r": "upos",
"profile": "ugcupos/bup",
"ssl": "0",
"version": "2.10.4",
"build": "2100400",
"upcdn": "bda2,bldsa",
"probe_version": "20200709"
}
async with self.session.get(
"https://member.bilibili.com/preupload",
headers=headers,
params=params,
timeout=aiohttp.ClientTimeout(total=30)
) as response:
if response.status == 200:
result = await response.json()
logger.info(f"预上传API响应: {result}")
if result.get("OK") == 1:
upload_info = {
"upload_id": result.get("biz_id"),
"endpoint": result.get("endpoint"),
"auth": result.get("auth"),
"chunk_size": result.get("chunk_size", 10485760),
"upos_uri": result.get("upos_uri"),
"put_query": result.get("put_query")
}
logger.info(f"预上传信息获取成功: {upload_info}")
return upload_info
else:
self.error_message = f"预上传失败: {result.get('message', '未知错误')}"
logger.error(self.error_message)
return None
else:
self.error_message = f"预上传请求失败HTTP状态码: {response.status}"
logger.error(self.error_message)
return None
except Exception as e:
self.error_message = f"获取预上传信息异常: {str(e)}"
logger.error(self.error_message)
return None
async def _chunk_upload_browser_style(self, video_path: str, upload_info: dict, max_retries: int = 3) -> bool:
"""分片上传 - 使用浏览器行为"""
try:
file_size = os.path.getsize(video_path)
chunk_size = 2 * 1024 * 1024 # 2MB chunks
total_chunks = (file_size + chunk_size - 1) // chunk_size
endpoint = upload_info.get("endpoint", "//upos-cs-upcdnbda2.bilivideo.com")
auth = upload_info.get("auth", "")
upos_uri = upload_info.get("upos_uri", "")
# 处理endpoint格式
if "," in endpoint:
endpoint = endpoint.split(",")[0]
if not endpoint.endswith('.bilivideo.com'):
endpoint = endpoint.replace('//upos-cs-upcdnbda2', '//upos-cs-upcdnbda2.bilivideo.com')
# 处理upos_uri格式
if upos_uri.startswith("upos://"):
upos_path = upos_uri[7:]
else:
upos_path = upos_uri
# 构建上传URL
upload_url = f"https:{endpoint}/{upos_path}"
if auth:
upload_url += f"?{auth}"
logger.info(f"构建的上传URL: {upload_url}")
# 使用更完整的浏览器请求头
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Referer": "https://member.bilibili.com/",
"Origin": "https://member.bilibili.com",
"Accept": "*/*",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Accept-Encoding": "gzip, deflate, br",
"Connection": "keep-alive",
"Cache-Control": "no-cache",
"Pragma": "no-cache",
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "cross-site"
}
with open(video_path, 'rb') as f:
for chunk_index in range(total_chunks):
# 读取分片数据
chunk_data = f.read(chunk_size)
if not chunk_data:
break
# 重试逻辑
for attempt in range(max_retries):
try:
# 构建分片上传URL
chunk_url = f"{upload_url}&partNumber={chunk_index + 1}"
logger.debug(f"分片 {chunk_index + 1} 上传URL: {chunk_url}")
# 使用PUT方法上传分片
headers['Content-Type'] = 'application/octet-stream'
headers['Content-Length'] = str(len(chunk_data))
async with self.session.put(
chunk_url,
headers=headers,
data=chunk_data,
timeout=aiohttp.ClientTimeout(total=60)
) as response:
if response.status in [200, 201, 204]:
logger.debug(f"分片 {chunk_index + 1}/{total_chunks} 上传成功")
break
else:
if attempt < max_retries - 1:
logger.warning(f"分片 {chunk_index + 1} 上传失败,重试 {attempt + 1}/{max_retries}: HTTP {response.status}")
await asyncio.sleep(2 ** attempt) # 指数退避
else:
logger.error(f"分片 {chunk_index + 1} 上传失败: HTTP {response.status}")
response_text = await response.text()
logger.error(f"响应内容: {response_text}")
return False
except Exception as e:
if attempt < max_retries - 1:
logger.warning(f"分片 {chunk_index + 1} 上传异常,重试 {attempt + 1}/{max_retries}: {str(e)}")
await asyncio.sleep(2 ** attempt)
else:
logger.error(f"分片 {chunk_index + 1} 上传异常: {str(e)}")
return False
logger.info("所有分片上传完成")
return True
except Exception as e:
self.error_message = f"分片上传异常: {str(e)}"
logger.error(self.error_message)
return False
async def _merge_chunks_browser_style(self, upload_info: dict) -> bool:
"""合并分片 - 使用浏览器行为"""
try:
# 解析Cookie获取CSRF token
csrf_token = None
for cookie in self.cookies.split(';'):
cookie = cookie.strip()
if cookie.startswith('bili_jct='):
csrf_token = cookie.split('=', 1)[1]
break
if not csrf_token:
self.error_message = "Cookie中缺少bili_jct字段"
logger.error(self.error_message)
return False
headers = {
"Cookie": self.cookies,
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Referer": "https://member.bilibili.com/",
"Origin": "https://member.bilibili.com",
"X-CSRF-Token": csrf_token,
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/json, text/plain, */*",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Accept-Encoding": "gzip, deflate, br",
"Connection": "keep-alive",
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-origin"
}
data = {
"biz_id": upload_info.get("upload_id"),
"csrf": csrf_token
}
async with self.session.post(
"https://member.bilibili.com/x/vu/web/complete",
headers=headers,
data=data,
timeout=aiohttp.ClientTimeout(total=30)
) as response:
if response.status == 200:
result = await response.json()
logger.info(f"合并分片响应: {result}")
if result.get("code") == 0:
logger.info("分片合并成功")
return True
else:
self.error_message = f"合并分片失败: {result.get('message', '未知错误')}"
logger.error(self.error_message)
return False
else:
self.error_message = f"合并分片请求失败HTTP状态码: {response.status}"
logger.error(self.error_message)
return False
except Exception as e:
self.error_message = f"合并分片异常: {str(e)}"
logger.error(self.error_message)
return False
async def _submit_video_browser_style(self, upload_info: dict, metadata: dict) -> bool:
"""提交投稿 - 使用浏览器行为"""
try:
# 解析Cookie获取CSRF token
csrf_token = None
for cookie in self.cookies.split(';'):
cookie = cookie.strip()
if cookie.startswith('bili_jct='):
csrf_token = cookie.split('=', 1)[1]
break
if not csrf_token:
self.error_message = "Cookie中缺少bili_jct字段"
logger.error(self.error_message)
return False
headers = {
"Cookie": self.cookies,
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Referer": "https://member.bilibili.com/",
"Origin": "https://member.bilibili.com",
"X-CSRF-Token": csrf_token,
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/json, text/plain, */*",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Accept-Encoding": "gzip, deflate, br",
"Connection": "keep-alive",
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-origin"
}
# 构建投稿数据
upos_uri = upload_info.get("upos_uri", "")
filename = upos_uri.split("/")[-1] if "/" in upos_uri else upos_uri
data = {
"copyright": "1", # 原创
"videos": json.dumps([{
"filename": filename,
"title": metadata.get("title", ""),
"desc": metadata.get("description", "")
}]),
"source": "",
"tid": str(metadata.get("partition_id", 1)),
"cover": "",
"title": metadata.get("title", ""),
"tag": ",".join(metadata.get("tags", [])),
"desc_format_id": "0",
"desc": metadata.get("description", ""),
"open_elec": "1",
"no_reprint": "0",
"subtitles": json.dumps({
"lan": "",
"open": "0"
}),
"csrf": csrf_token
}
async with self.session.post(
"https://member.bilibili.com/x/vu/web/add",
headers=headers,
data=data,
timeout=aiohttp.ClientTimeout(total=30)
) as response:
if response.status == 200:
result = await response.json()
logger.info(f"提交投稿响应: {result}")
if result.get("code") == 0:
self.bv_id = result.get("data", {}).get("bvid")
logger.info(f"投稿提交成功BV号: {self.bv_id}")
return True
else:
self.error_message = f"投稿提交失败: {result.get('message', '未知错误')}"
logger.error(self.error_message)
return False
else:
self.error_message = f"投稿提交请求失败HTTP状态码: {response.status}"
logger.error(self.error_message)
return False
except Exception as e:
self.error_message = f"投稿提交异常: {str(e)}"
logger.error(self.error_message)
return False
class BilibiliUploadServiceV6:
"""B站投稿服务 v6.0"""
def __init__(self, db):
self.db = db
async def upload_clip(self, record_id: int, video_path: str, max_retries: int = 3) -> bool:
"""上传单个切片"""
try:
# 获取投稿记录
record = self.db.query(BilibiliUploadRecord).filter(BilibiliUploadRecord.id == record_id).first()
if not record:
logger.error(f"投稿记录不存在: {record_id}")
return False
# 获取账号信息
account = self.db.query(BilibiliAccount).filter(BilibiliAccount.id == record.account_id).first()
if not account:
logger.error(f"账号不存在: {record.account_id}")
return False
# 解密Cookie
try:
cookies = decrypt_data(account.cookies)
except Exception as e:
logger.error(f"解密Cookie失败: {e}")
return False
# 构建投稿元数据
metadata = {
"title": record.title,
"description": record.description or "",
"tags": json.loads(record.tags) if record.tags else [],
"partition_id": record.partition_id
}
# 使用v6.0上传器
uploader = BilibiliUploaderV6(cookies)
success = await uploader.upload_video(video_path, metadata, max_retries)
if success:
# 更新记录状态
record.status = "success"
record.bv_id = uploader.bv_id
record.updated_at = datetime.utcnow()
self.db.commit()
logger.info(f"切片上传成功: {record_id}, BV号: {uploader.bv_id}")
return True
else:
# 更新记录状态
record.status = "failed"
record.error_message = uploader.error_message
record.updated_at = datetime.utcnow()
self.db.commit()
logger.error(f"切片上传失败: {record_id}, 错误: {uploader.error_message}")
return False
except Exception as e:
logger.error(f"上传切片异常: {e}")
return False

View File

@@ -0,0 +1,361 @@
# 统一错误处理指南
## 📋 概述
本项目已实现统一的错误处理机制,提供一致的错误响应格式和自动错误处理功能。
## 🏗️ 错误处理架构
### 错误分类
```python
class ErrorCategory(Enum):
CONFIGURATION = "CONFIGURATION" # 配置错误
NETWORK = "NETWORK" # 网络错误
API = "API" # API错误
FILE_IO = "FILE_IO" # 文件IO错误
PROCESSING = "PROCESSING" # 处理错误
VALIDATION = "VALIDATION" # 验证错误
SYSTEM = "SYSTEM" # 系统错误
```
### 错误级别
```python
class ErrorLevel(Enum):
DEBUG = "DEBUG"
INFO = "INFO"
WARNING = "WARNING"
ERROR = "ERROR"
CRITICAL = "CRITICAL"
```
## 🚀 使用方法
### 1. 抛出自定义异常
```python
from backend.utils.error_handler import AutoClipsException, ErrorCategory
# 抛出配置错误
raise AutoClipsException(
message="API密钥未配置",
category=ErrorCategory.CONFIGURATION,
details={"config_key": "DASHSCOPE_API_KEY"}
)
# 抛出文件错误
raise AutoClipsException(
message="文件不存在",
category=ErrorCategory.FILE_IO,
details={"file_path": "/path/to/file.mp4"}
)
```
### 2. 使用错误处理装饰器
```python
from backend.core.error_middleware import handle_errors
from backend.utils.error_handler import ErrorCategory
@handle_errors(ErrorCategory.PROCESSING)
async def process_video(video_path: str):
# 函数内的任何异常都会被自动转换为AutoClipsException
if not os.path.exists(video_path):
raise FileNotFoundError("视频文件不存在")
# 处理逻辑...
return result
```
### 3. 使用错误上下文管理器
```python
from backend.core.error_middleware import error_context
from backend.utils.error_handler import ErrorCategory
def upload_file(file_path: str):
with error_context(ErrorCategory.FILE_IO, {"file_path": file_path}):
# 在这个上下文中抛出的任何异常都会被转换为AutoClipsException
with open(file_path, 'r') as f:
content = f.read()
return content
```
### 4. 在API路由中使用
```python
from fastapi import APIRouter, HTTPException
from backend.utils.error_handler import AutoClipsException, ErrorCategory
router = APIRouter()
@router.get("/projects/{project_id}")
async def get_project(project_id: str):
try:
# 业务逻辑
project = await get_project_from_db(project_id)
if not project:
raise AutoClipsException(
message=f"项目不存在: {project_id}",
category=ErrorCategory.VALIDATION,
details={"project_id": project_id}
)
return project
except AutoClipsException:
# 重新抛出,让全局异常处理器处理
raise
except Exception as e:
# 其他异常会被转换为AutoClipsException
raise AutoClipsException(
message="获取项目失败",
category=ErrorCategory.SYSTEM,
original_exception=e
)
```
## 📊 错误响应格式
所有错误响应都遵循统一格式:
```json
{
"error": {
"code": "AUTOCLIPS_VALIDATION",
"message": "项目不存在: abc123",
"details": {
"project_id": "abc123"
},
"request_id": "req_123456",
"timestamp": 1640995200.0
}
}
```
### 字段说明
- `code`: 错误代码,格式为 `AUTOCLIPS_{CATEGORY}``HTTP_{STATUS_CODE}`
- `message`: 错误消息,用户友好的描述
- `details`: 错误详情,包含调试信息
- `request_id`: 请求ID用于追踪
- `timestamp`: 错误发生时间戳
## 🔧 HTTP状态码映射
| 错误分类 | HTTP状态码 | 说明 |
|---------|-----------|------|
| CONFIGURATION | 500 | 配置错误 |
| NETWORK | 503 | 网络错误 |
| API | 502 | API错误 |
| FILE_IO | 500 | 文件IO错误 |
| PROCESSING | 500 | 处理错误 |
| VALIDATION | 400 | 验证错误 |
| SYSTEM | 500 | 系统错误 |
## 📝 最佳实践
### 1. 错误消息编写
```python
# ✅ 好的错误消息
raise AutoClipsException(
message="视频文件格式不支持请使用MP4格式",
category=ErrorCategory.VALIDATION,
details={"supported_formats": ["mp4", "avi", "mov"]}
)
# ❌ 不好的错误消息
raise AutoClipsException(
message="Error: Invalid file",
category=ErrorCategory.VALIDATION
)
```
### 2. 错误详情包含
```python
# ✅ 包含有用的调试信息
raise AutoClipsException(
message="处理视频失败",
category=ErrorCategory.PROCESSING,
details={
"project_id": project_id,
"step": "video_cutting",
"error_code": "FFMPEG_ERROR",
"file_size": file_size
}
)
```
### 3. 错误分类选择
```python
# ✅ 根据错误性质选择正确的分类
if not api_key:
raise AutoClipsException(
message="API密钥未配置",
category=ErrorCategory.CONFIGURATION # 配置问题
)
if response.status_code == 429:
raise AutoClipsException(
message="API调用频率超限",
category=ErrorCategory.API # API问题
)
if not os.path.exists(file_path):
raise AutoClipsException(
message="文件不存在",
category=ErrorCategory.FILE_IO # 文件问题
)
```
### 4. 异常链保持
```python
# ✅ 保持原始异常信息
try:
result = some_risky_operation()
except Exception as e:
raise AutoClipsException(
message="操作失败",
category=ErrorCategory.SYSTEM,
original_exception=e # 保持原始异常
)
```
## 🧪 测试错误处理
### 1. 测试自定义异常
```python
import pytest
from backend.utils.error_handler import AutoClipsException, ErrorCategory
def test_custom_exception():
with pytest.raises(AutoClipsException) as exc_info:
raise AutoClipsException(
message="测试错误",
category=ErrorCategory.VALIDATION
)
assert exc_info.value.category == ErrorCategory.VALIDATION
assert exc_info.value.message == "测试错误"
```
### 2. 测试API错误响应
```python
from fastapi.testclient import TestClient
from backend.main import app
client = TestClient(app)
def test_api_error_response():
response = client.get("/api/v1/projects/nonexistent")
assert response.status_code == 400
assert "error" in response.json()
assert response.json()["error"]["code"] == "AUTOCLIPS_VALIDATION"
```
## 🔍 错误监控和日志
### 1. 错误日志格式
所有错误都会自动记录到日志,格式如下:
```
2024-01-01 12:00:00 - ERROR - 未处理的异常: AutoClipsException: 项目不存在: abc123
request_id: req_123456
path: /api/v1/projects/abc123
method: GET
traceback: [完整的堆栈跟踪]
```
### 2. 错误统计
可以通过日志分析工具统计错误:
```bash
# 统计错误类型
grep "AUTOCLIPS_" backend.log | cut -d' ' -f4 | sort | uniq -c
# 统计错误频率
grep "ERROR" backend.log | wc -l
```
## 🚨 常见错误处理场景
### 1. 文件操作错误
```python
@handle_errors(ErrorCategory.FILE_IO)
async def save_file(file_path: str, content: bytes):
try:
with open(file_path, 'wb') as f:
f.write(content)
except PermissionError:
raise AutoClipsException(
message="没有文件写入权限",
category=ErrorCategory.FILE_IO,
details={"file_path": file_path}
)
except OSError as e:
raise AutoClipsException(
message="文件系统错误",
category=ErrorCategory.FILE_IO,
details={"file_path": file_path, "os_error": str(e)}
)
```
### 2. API调用错误
```python
@handle_errors(ErrorCategory.API)
async def call_external_api(url: str, data: dict):
try:
async with aiohttp.ClientSession() as session:
async with session.post(url, json=data) as response:
if response.status == 429:
raise AutoClipsException(
message="API调用频率超限",
category=ErrorCategory.API,
details={"url": url, "status": 429}
)
return await response.json()
except aiohttp.ClientError as e:
raise AutoClipsException(
message="网络请求失败",
category=ErrorCategory.NETWORK,
details={"url": url, "error": str(e)}
)
```
### 3. 数据处理错误
```python
@handle_errors(ErrorCategory.PROCESSING)
async def process_video_data(video_path: str):
try:
# 处理逻辑
result = await video_processor.process(video_path)
return result
except VideoProcessingError as e:
raise AutoClipsException(
message="视频处理失败",
category=ErrorCategory.PROCESSING,
details={
"video_path": video_path,
"error_code": e.code,
"step": e.step
},
original_exception=e
)
```
## 📚 相关文档
- [API文档](./API_DOCUMENTATION.md)
- [配置管理指南](./CONFIGURATION_GUIDE.md)
- [日志管理指南](./LOGGING_GUIDE.md)

View File

@@ -0,0 +1,459 @@
# 增强进度系统使用指南
## 📋 概述
本项目已实现增强的进度系统提供统一的进度跟踪、状态管理和错误处理功能。该系统整合了Redis缓存、数据库持久化和内存缓存确保进度信息的可靠性和实时性。
## 🏗️ 系统架构
### 进度阶段
```python
class ProgressStage(Enum):
INGEST = "INGEST" # 下载/就绪 (10%)
SUBTITLE = "SUBTITLE" # 字幕/对齐 (15%)
ANALYZE = "ANALYZE" # 语义分析/大纲 (20%)
HIGHLIGHT = "HIGHLIGHT" # 片段定位/打分 (25%)
EXPORT = "EXPORT" # 导出/封装 (20%)
DONE = "DONE" # 校验/归档 (10%)
ERROR = "ERROR" # 错误状态
```
### 进度状态
```python
class ProgressStatus(Enum):
PENDING = "PENDING" # 等待中
RUNNING = "RUNNING" # 运行中
COMPLETED = "COMPLETED" # 已完成
FAILED = "FAILED" # 失败
CANCELLED = "CANCELLED" # 已取消
```
### 存储层次
1. **内存缓存**: 快速访问,存储当前活跃的进度信息
2. **Redis缓存**: 分布式缓存,支持多实例共享
3. **数据库持久化**: 长期存储,与项目状态同步
## 🚀 使用方法
### 1. 基本进度跟踪
```python
from backend.services.enhanced_progress_service import (
start_progress, update_progress, complete_progress, fail_progress,
ProgressStage, ProgressStatus
)
# 开始进度跟踪
progress_info = start_progress(
project_id="project_123",
task_id="task_456",
initial_message="开始处理视频"
)
# 更新进度
progress_info = update_progress(
project_id="project_123",
stage=ProgressStage.SUBTITLE,
message="正在生成字幕",
sub_progress=50.0 # 当前阶段50%完成
)
# 完成进度
progress_info = complete_progress(
project_id="project_123",
message="视频处理完成"
)
# 标记失败
progress_info = fail_progress(
project_id="project_123",
error_message="视频文件损坏"
)
```
### 2. 在服务中使用
```python
from backend.services.enhanced_progress_service import (
progress_service, ProgressStage
)
from backend.core.error_middleware import handle_errors, ErrorCategory
class VideoProcessingService:
@handle_errors(ErrorCategory.PROCESSING)
async def process_video(self, project_id: str, video_path: str):
try:
# 开始进度跟踪
progress_service.start_progress(
project_id=project_id,
initial_message="开始处理视频"
)
# 下载阶段
progress_service.update_progress(
project_id=project_id,
stage=ProgressStage.INGEST,
message="下载视频文件",
sub_progress=100.0
)
# 字幕生成阶段
progress_service.update_progress(
project_id=project_id,
stage=ProgressStage.SUBTITLE,
message="生成字幕",
sub_progress=0.0
)
# 模拟字幕生成过程
for i in range(10):
await asyncio.sleep(1) # 模拟处理时间
progress_service.update_progress(
project_id=project_id,
stage=ProgressStage.SUBTITLE,
message=f"字幕生成进度: {i*10}%",
sub_progress=i * 10.0
)
# 分析阶段
progress_service.update_progress(
project_id=project_id,
stage=ProgressStage.ANALYZE,
message="分析视频内容",
sub_progress=0.0
)
# 继续其他阶段...
# 完成处理
progress_service.complete_progress(
project_id=project_id,
message="视频处理完成"
)
except Exception as e:
# 标记失败
progress_service.fail_progress(
project_id=project_id,
error_message=str(e)
)
raise
```
### 3. 在API中使用
```python
from fastapi import APIRouter, HTTPException
from backend.services.enhanced_progress_service import get_progress
router = APIRouter()
@router.get("/projects/{project_id}/progress")
async def get_project_progress(project_id: str):
"""获取项目进度"""
try:
progress_info = get_progress(project_id)
if not progress_info:
raise HTTPException(status_code=404, detail="项目进度不存在")
return {
"project_id": project_id,
"progress": progress_info.to_dict()
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
```
### 4. 添加进度回调
```python
from backend.services.enhanced_progress_service import progress_service
def progress_callback(progress_info):
"""进度回调函数"""
print(f"项目 {progress_info.project_id} 进度更新: {progress_info.progress}%")
# 可以在这里添加其他逻辑,如:
# - 发送通知
# - 更新前端状态
# - 记录日志
# - 触发其他服务
# 注册回调
progress_service.add_progress_callback(progress_callback)
```
## 📊 进度信息结构
```python
@dataclass
class ProgressInfo:
project_id: str # 项目ID
task_id: Optional[str] # 任务ID
stage: ProgressStage # 当前阶段
status: ProgressStatus # 状态
progress: int # 总进度 (0-100)
message: str # 当前消息
error_message: Optional[str] # 错误消息
start_time: Optional[datetime] # 开始时间
end_time: Optional[datetime] # 结束时间
estimated_remaining: Optional[int] # 预估剩余时间(秒)
metadata: Optional[Dict[str, Any]] # 元数据
```
### 进度计算规则
- **INGEST阶段**: 0-10%
- **SUBTITLE阶段**: 10-25%
- **ANALYZE阶段**: 25-45%
- **HIGHLIGHT阶段**: 45-70%
- **EXPORT阶段**: 70-90%
- **DONE阶段**: 100%
每个阶段内部可以通过`sub_progress`参数(0-100)来细分进度。
## 🔧 配置和优化
### 1. Redis配置
```python
# 在backend/core/unified_config.py中配置
redis:
url: "redis://localhost:6379/0"
max_connections: 10
socket_timeout: 5
```
### 2. 清理配置
```python
# 定期清理旧进度信息
progress_service.cleanup_old_progress(max_age_hours=24)
```
### 3. 错误处理
```python
from backend.utils.error_handler import AutoClipsException, ErrorCategory
try:
progress_service.update_progress(project_id, stage, message)
except AutoClipsException as e:
if e.category == ErrorCategory.SYSTEM:
# 系统错误,记录日志但不中断处理
logger.error(f"进度更新失败: {e}")
else:
# 其他错误,重新抛出
raise
```
## 📝 最佳实践
### 1. 进度消息编写
```python
# ✅ 好的进度消息
progress_service.update_progress(
project_id=project_id,
stage=ProgressStage.SUBTITLE,
message="正在生成字幕预计还需2分钟",
sub_progress=60.0
)
# ❌ 不好的进度消息
progress_service.update_progress(
project_id=project_id,
stage=ProgressStage.SUBTITLE,
message="处理中...",
sub_progress=60.0
)
```
### 2. 错误处理
```python
# ✅ 完整的错误处理
try:
# 处理逻辑
result = await process_video(video_path)
progress_service.complete_progress(project_id, "处理完成")
except Exception as e:
# 记录详细错误信息
error_message = f"处理失败: {str(e)}"
progress_service.fail_progress(project_id, error_message)
raise
```
### 3. 元数据使用
```python
# ✅ 使用元数据传递额外信息
progress_service.update_progress(
project_id=project_id,
stage=ProgressStage.ANALYZE,
message="分析视频内容",
metadata={
"video_duration": 1200, # 视频时长(秒)
"analysis_method": "ai", # 分析方法
"estimated_clips": 5 # 预估切片数
}
)
```
### 4. 性能优化
```python
# ✅ 批量更新进度
for i, item in enumerate(items):
if i % 10 == 0: # 每10个项目更新一次进度
progress_service.update_progress(
project_id=project_id,
stage=ProgressStage.PROCESSING,
message=f"处理进度: {i}/{len(items)}",
sub_progress=i / len(items) * 100
)
```
## 🧪 测试进度系统
### 1. 单元测试
```python
import pytest
from backend.services.enhanced_progress_service import (
start_progress, update_progress, complete_progress,
ProgressStage, ProgressStatus
)
def test_progress_tracking():
project_id = "test_project"
# 开始进度
progress = start_progress(project_id, initial_message="开始测试")
assert progress.project_id == project_id
assert progress.status == ProgressStatus.RUNNING
assert progress.progress == 0
# 更新进度
progress = update_progress(
project_id=project_id,
stage=ProgressStage.SUBTITLE,
message="测试字幕生成",
sub_progress=50.0
)
assert progress.stage == ProgressStage.SUBTITLE
assert progress.progress > 0
# 完成进度
progress = complete_progress(project_id, "测试完成")
assert progress.status == ProgressStatus.COMPLETED
assert progress.progress == 100
```
### 2. 集成测试
```python
async def test_progress_integration():
project_id = "integration_test"
# 模拟完整的处理流程
start_progress(project_id, "开始集成测试")
for stage in [ProgressStage.INGEST, ProgressStage.SUBTITLE,
ProgressStage.ANALYZE, ProgressStage.HIGHLIGHT,
ProgressStage.EXPORT]:
update_progress(project_id, stage, f"测试{stage.value}阶段")
await asyncio.sleep(0.1) # 模拟处理时间
complete_progress(project_id, "集成测试完成")
# 验证最终状态
final_progress = get_progress(project_id)
assert final_progress.status == ProgressStatus.COMPLETED
assert final_progress.progress == 100
```
## 🔍 监控和调试
### 1. 进度监控
```python
# 获取所有活跃进度
active_progress = progress_service.get_all_active_progress()
for progress in active_progress:
print(f"项目 {progress.project_id}: {progress.progress}% - {progress.message}")
```
### 2. 调试信息
```python
# 获取详细进度信息
progress_info = get_progress(project_id)
if progress_info:
print(f"项目ID: {progress_info.project_id}")
print(f"当前阶段: {progress_info.stage.value}")
print(f"总进度: {progress_info.progress}%")
print(f"状态: {progress_info.status.value}")
print(f"消息: {progress_info.message}")
print(f"开始时间: {progress_info.start_time}")
print(f"预估剩余: {progress_info.estimated_remaining}")
if progress_info.metadata:
print(f"元数据: {progress_info.metadata}")
```
### 3. 日志记录
```python
import logging
# 配置进度日志
progress_logger = logging.getLogger('progress')
progress_logger.setLevel(logging.INFO)
def progress_log_callback(progress_info):
progress_logger.info(
f"项目 {progress_info.project_id} 进度更新: "
f"{progress_info.progress}% - {progress_info.message}"
)
progress_service.add_progress_callback(progress_log_callback)
```
## 🚨 常见问题
### 1. Redis连接失败
```python
# 系统会自动降级到内存缓存
# 检查Redis配置和连接
if not progress_service.redis_client:
logger.warning("Redis不可用使用内存缓存")
```
### 2. 进度信息丢失
```python
# 定期清理可能导致进度信息丢失
# 建议设置合理的清理时间
progress_service.cleanup_old_progress(max_age_hours=48) # 48小时
```
### 3. 进度更新频率过高
```python
# 系统内置了节流机制,避免频繁更新
# 建议在循环中控制更新频率
for i, item in enumerate(items):
if i % 10 == 0: # 每10次更新一次
update_progress(project_id, stage, message, i/len(items)*100)
```
## 📚 相关文档
- [错误处理指南](./ERROR_HANDLING_GUIDE.md)
- [配置管理指南](./CONFIGURATION_GUIDE.md)
- [API文档](./API_DOCUMENTATION.md)

View File

@@ -0,0 +1,267 @@
#!/usr/bin/env python3
"""
数据一致性检查脚本
检查数据库和文件系统之间的一致性
"""
import sys
import os
import json
import logging
from pathlib import Path
from datetime import datetime
# 添加项目根目录到Python路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from backend.core.database import SessionLocal
from backend.models.project import Project
from backend.models.clip import Clip
from backend.models.collection import Collection
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def check_project_consistency(db, project_id: str):
"""检查单个项目的数据一致性"""
logger.info(f"🔍 检查项目一致性: {project_id}")
issues = []
try:
# 检查数据库中的项目
project = db.query(Project).filter(Project.id == project_id).first()
if not project:
issues.append("项目在数据库中不存在")
return {"project_id": project_id, "issues": issues, "status": "error"}
# 检查项目目录
data_dir = project_root / "data"
project_dir = data_dir / "projects" / project_id
if not project_dir.exists():
issues.append("项目目录不存在")
return {"project_id": project_id, "issues": issues, "status": "error"}
# 检查切片一致性
clips_issues = check_clips_consistency(db, project_id, project_dir)
issues.extend(clips_issues)
# 检查合集一致性
collections_issues = check_collections_consistency(db, project_id, project_dir)
issues.extend(collections_issues)
# 检查文件路径
file_path_issues = check_file_paths_consistency(db, project_id, project_dir)
issues.extend(file_path_issues)
status = "warning" if issues else "ok"
return {
"project_id": project_id,
"issues": issues,
"status": status,
"clips_count": len(db.query(Clip).filter(Clip.project_id == project_id).all()),
"collections_count": len(db.query(Collection).filter(Collection.project_id == project_id).all())
}
except Exception as e:
logger.error(f"检查项目 {project_id} 时发生错误: {e}")
return {
"project_id": project_id,
"issues": [f"检查过程中发生错误: {str(e)}"],
"status": "error"
}
def check_clips_consistency(db, project_id: str, project_dir: Path):
"""检查切片数据一致性"""
issues = []
try:
# 获取数据库中的切片
db_clips = db.query(Clip).filter(Clip.project_id == project_id).all()
# 检查文件系统中的切片文件
clips_dir = project_dir / "output" / "clips"
fs_clips = list(clips_dir.glob("*.mp4")) if clips_dir.exists() else []
# 检查数据库中的切片文件是否存在
for clip in db_clips:
if clip.video_path:
file_path = project_root / "data" / clip.video_path
if not file_path.exists():
issues.append(f"切片文件不存在: {clip.video_path}")
# 检查文件系统中的切片是否在数据库中有记录
for clip_file in fs_clips:
clip_name = clip_file.name
found_in_db = any(clip.video_path and clip.video_path.endswith(clip_name) for clip in db_clips)
if not found_in_db:
issues.append(f"文件系统中的切片未在数据库中记录: {clip_name}")
# 检查重复数据
clips_metadata_file = project_dir / "clips_metadata.json"
if clips_metadata_file.exists():
issues.append("存在重复的切片元数据文件 (clips_metadata.json)")
except Exception as e:
issues.append(f"检查切片一致性时发生错误: {str(e)}")
return issues
def check_collections_consistency(db, project_id: str, project_dir: Path):
"""检查合集数据一致性"""
issues = []
try:
# 获取数据库中的合集
db_collections = db.query(Collection).filter(Collection.project_id == project_id).all()
# 检查文件系统中的合集文件
collections_dir = project_dir / "output" / "collections"
fs_collections = list(collections_dir.glob("*.mp4")) if collections_dir.exists() else []
# 检查数据库中的合集文件是否存在
for collection in db_collections:
if collection.video_path:
file_path = project_root / "data" / collection.video_path
if not file_path.exists():
issues.append(f"合集文件不存在: {collection.video_path}")
# 检查文件系统中的合集是否在数据库中有记录
for collection_file in fs_collections:
collection_name = collection_file.name
found_in_db = any(collection.video_path and collection.video_path.endswith(collection_name) for collection in db_collections)
if not found_in_db:
issues.append(f"文件系统中的合集未在数据库中记录: {collection_name}")
# 检查重复数据
collections_metadata_file = project_dir / "collections_metadata.json"
if collections_metadata_file.exists():
issues.append("存在重复的合集元数据文件 (collections_metadata.json)")
except Exception as e:
issues.append(f"检查合集一致性时发生错误: {str(e)}")
return issues
def check_file_paths_consistency(db, project_id: str, project_dir: Path):
"""检查文件路径一致性"""
issues = []
try:
# 检查项目文件路径
project = db.query(Project).filter(Project.id == project_id).first()
if project and project.video_path:
video_path = project_root / "data" / project.video_path
if not video_path.exists():
issues.append(f"项目视频文件不存在: {project.video_path}")
if project and project.subtitle_path:
subtitle_path = project_root / "data" / project.subtitle_path
if not subtitle_path.exists():
issues.append(f"项目字幕文件不存在: {project.subtitle_path}")
except Exception as e:
issues.append(f"检查文件路径一致性时发生错误: {str(e)}")
return issues
def generate_consistency_report(results):
"""生成一致性检查报告"""
report = {
"timestamp": datetime.now().isoformat(),
"total_projects": len(results),
"ok_projects": len([r for r in results if r["status"] == "ok"]),
"warning_projects": len([r for r in results if r["status"] == "warning"]),
"error_projects": len([r for r in results if r["status"] == "error"]),
"results": results
}
return report
def main():
"""主函数"""
logger.info("🔍 开始数据一致性检查...")
db = SessionLocal()
try:
# 获取所有项目
projects = db.query(Project).all()
if not projects:
logger.info("📭 没有找到项目")
return
logger.info(f"📊 找到 {len(projects)} 个项目,开始检查...")
results = []
for project in projects:
result = check_project_consistency(db, project.id)
results.append(result)
# 生成报告
report = generate_consistency_report(results)
# 显示检查结果
print("\n" + "=" * 80)
print("📊 数据一致性检查报告")
print("=" * 80)
print(f"检查时间: {report['timestamp']}")
print(f"总项目数: {report['total_projects']}")
print(f"✅ 正常: {report['ok_projects']}")
print(f"⚠️ 警告: {report['warning_projects']}")
print(f"❌ 错误: {report['error_projects']}")
print("\n📋 详细结果:")
print("-" * 80)
for result in results:
status_icon = {
"ok": "",
"warning": "⚠️ ",
"error": ""
}.get(result["status"], "")
print(f"{status_icon} 项目 {result['project_id'][:8]}... | "
f"切片: {result.get('clips_count', 0)} | "
f"合集: {result.get('collections_count', 0)}")
if result["issues"]:
for issue in result["issues"]:
print(f"{issue}")
# 保存报告
report_file = project_root / f"consistency_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(report_file, 'w', encoding='utf-8') as f:
json.dump(report, f, ensure_ascii=False, indent=2)
print(f"\n💾 详细报告已保存: {report_file}")
# 提供建议
if report['warning_projects'] > 0 or report['error_projects'] > 0:
print("\n🔧 建议:")
print("1. 运行数据迁移脚本修复问题")
print("2. 检查文件权限和路径配置")
print("3. 清理重复的元数据文件")
except Exception as e:
logger.error(f"❌ 检查过程中发生错误: {e}")
finally:
db.close()
logger.info("🎉 一致性检查完成!")
if __name__ == "__main__":
main()

383
scripts/migrate_config.py Normal file
View File

@@ -0,0 +1,383 @@
#!/usr/bin/env python3
"""
配置迁移脚本
将旧的分散配置系统迁移到新的统一配置系统
"""
import sys
import os
import json
import logging
from pathlib import Path
from datetime import datetime
# 添加项目根目录到Python路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from backend.core.unified_config import UnifiedConfig, config
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def analyze_old_configs():
"""分析旧的配置文件"""
logger.info("🔍 分析旧配置文件...")
old_configs = {}
# 检查data/settings.json
settings_file = project_root / "data" / "settings.json"
if settings_file.exists():
try:
with open(settings_file, 'r', encoding='utf-8') as f:
old_configs['settings.json'] = json.load(f)
logger.info(f"✅ 找到配置文件: {settings_file}")
except Exception as e:
logger.warning(f"⚠️ 读取配置文件失败: {e}")
# 检查.env文件
env_file = project_root / ".env"
if env_file.exists():
try:
with open(env_file, 'r', encoding='utf-8') as f:
env_content = f.read()
old_configs['.env'] = parse_env_file(env_content)
logger.info(f"✅ 找到环境变量文件: {env_file}")
except Exception as e:
logger.warning(f"⚠️ 读取环境变量文件失败: {e}")
# 检查backend/core/config.py中的默认值
old_configs['config.py_defaults'] = {
"database_url": "sqlite:///./data/autoclip.db",
"redis_url": "redis://localhost:6379/0",
"api_dashscope_api_key": "",
"api_model_name": "qwen-plus",
"processing_chunk_size": 5000,
"processing_min_score_threshold": 0.7,
"log_level": "INFO"
}
return old_configs
def parse_env_file(env_content: str) -> dict:
"""解析.env文件内容"""
env_vars = {}
for line in env_content.split('\n'):
line = line.strip()
if line and not line.startswith('#') and '=' in line:
key, value = line.split('=', 1)
env_vars[key.strip()] = value.strip().strip('"\'')
return env_vars
def migrate_configs(old_configs: dict, dry_run: bool = True):
"""迁移配置到新的统一配置系统"""
logger.info(f"🔄 开始配置迁移 (dry_run={dry_run})")
migration_log = {
"timestamp": datetime.now().isoformat(),
"dry_run": dry_run,
"migrated_settings": {},
"issues": []
}
try:
# 创建新的配置实例
new_config = UnifiedConfig()
# 迁移settings.json中的配置
if 'settings.json' in old_configs:
settings = old_configs['settings.json']
migrated_settings = migrate_settings_json(settings, new_config)
migration_log['migrated_settings']['settings.json'] = migrated_settings
# 迁移.env文件中的配置
if '.env' in old_configs:
env_vars = old_configs['.env']
migrated_env = migrate_env_vars(env_vars, new_config)
migration_log['migrated_settings']['.env'] = migrated_env
# 验证新配置
validation_result = new_config.validate_config()
if not validation_result['valid']:
migration_log['issues'].extend(validation_result['issues'])
if dry_run:
logger.info("🔍 模拟迁移完成")
return {
"success": True,
"dry_run": True,
"migration_log": migration_log,
"new_config_summary": new_config.get_config_summary()
}
# 实际迁移
if not migration_log['issues']:
# 备份旧配置
backup_old_configs(old_configs)
# 保存新配置
new_config.save_to_file()
logger.info("✅ 配置迁移完成")
return {
"success": True,
"migration_log": migration_log,
"new_config_summary": new_config.get_config_summary()
}
else:
logger.error("❌ 配置验证失败,无法迁移")
return {
"success": False,
"migration_log": migration_log
}
except Exception as e:
logger.error(f"❌ 配置迁移失败: {e}")
migration_log['issues'].append(f"迁移过程中发生错误: {str(e)}")
return {
"success": False,
"migration_log": migration_log
}
def migrate_settings_json(settings: dict, new_config: UnifiedConfig) -> dict:
"""迁移settings.json中的配置"""
migrated = {}
# API配置
if 'dashscope_api_key' in settings:
new_config.api.dashscope_api_key = settings['dashscope_api_key']
migrated['dashscope_api_key'] = 'migrated'
if 'model_name' in settings:
new_config.api.model_name = settings['model_name']
migrated['model_name'] = 'migrated'
# 处理配置
if 'chunk_size' in settings:
new_config.processing.chunk_size = settings['chunk_size']
migrated['chunk_size'] = 'migrated'
if 'min_score_threshold' in settings:
new_config.processing.min_score_threshold = settings['min_score_threshold']
migrated['min_score_threshold'] = 'migrated'
if 'max_clips_per_collection' in settings:
new_config.processing.max_clips_per_collection = settings['max_clips_per_collection']
migrated['max_clips_per_collection'] = 'migrated'
# 语音识别配置
if 'speech_recognition_method' in settings:
new_config.speech_recognition.method = settings['speech_recognition_method']
migrated['speech_recognition_method'] = 'migrated'
if 'speech_recognition_language' in settings:
new_config.speech_recognition.language = settings['speech_recognition_language']
migrated['speech_recognition_language'] = 'migrated'
# B站配置
if 'bilibili_auto_upload' in settings:
new_config.bilibili.auto_upload = settings['bilibili_auto_upload']
migrated['bilibili_auto_upload'] = 'migrated'
if 'bilibili_default_tid' in settings:
new_config.bilibili.default_tid = settings['bilibili_default_tid']
migrated['bilibili_default_tid'] = 'migrated'
return migrated
def migrate_env_vars(env_vars: dict, new_config: UnifiedConfig) -> dict:
"""迁移环境变量"""
migrated = {}
# 数据库配置
if 'DATABASE_URL' in env_vars:
new_config.database.url = env_vars['DATABASE_URL']
migrated['DATABASE_URL'] = 'migrated'
# Redis配置
if 'REDIS_URL' in env_vars:
new_config.redis.url = env_vars['REDIS_URL']
migrated['REDIS_URL'] = 'migrated'
# API配置
if 'DASHSCOPE_API_KEY' in env_vars:
new_config.api.dashscope_api_key = env_vars['DASHSCOPE_API_KEY']
migrated['DASHSCOPE_API_KEY'] = 'migrated'
if 'API_MODEL_NAME' in env_vars:
new_config.api.model_name = env_vars['API_MODEL_NAME']
migrated['API_MODEL_NAME'] = 'migrated'
# 处理配置
if 'PROCESSING_CHUNK_SIZE' in env_vars:
new_config.processing.chunk_size = int(env_vars['PROCESSING_CHUNK_SIZE'])
migrated['PROCESSING_CHUNK_SIZE'] = 'migrated'
if 'PROCESSING_MIN_SCORE_THRESHOLD' in env_vars:
new_config.processing.min_score_threshold = float(env_vars['PROCESSING_MIN_SCORE_THRESHOLD'])
migrated['PROCESSING_MIN_SCORE_THRESHOLD'] = 'migrated'
# 日志配置
if 'LOG_LEVEL' in env_vars:
new_config.logging.level = env_vars['LOG_LEVEL']
migrated['LOG_LEVEL'] = 'migrated'
if 'LOG_FILE' in env_vars:
new_config.logging.file = env_vars['LOG_FILE']
migrated['LOG_FILE'] = 'migrated'
return migrated
def backup_old_configs(old_configs: dict):
"""备份旧配置文件"""
backup_dir = project_root / f"config_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
backup_dir.mkdir(exist_ok=True)
logger.info(f"📦 创建配置备份: {backup_dir}")
# 备份settings.json
if 'settings.json' in old_configs:
settings_file = project_root / "data" / "settings.json"
if settings_file.exists():
backup_file = backup_dir / "settings.json"
with open(settings_file, 'r', encoding='utf-8') as src, \
open(backup_file, 'w', encoding='utf-8') as dst:
dst.write(src.read())
# 备份.env文件
env_file = project_root / ".env"
if env_file.exists():
backup_file = backup_dir / ".env"
with open(env_file, 'r', encoding='utf-8') as src, \
open(backup_file, 'w', encoding='utf-8') as dst:
dst.write(src.read())
# 保存迁移日志
migration_log_file = backup_dir / "migration_log.json"
with open(migration_log_file, 'w', encoding='utf-8') as f:
json.dump(old_configs, f, ensure_ascii=False, indent=2)
logger.info(f"✅ 配置备份完成: {backup_dir}")
def display_config_comparison(old_configs: dict, new_config_summary: dict):
"""显示配置对比"""
print("\n" + "=" * 80)
print("📊 配置对比")
print("=" * 80)
print("\n🔧 API配置:")
print(f" 模型名称: {new_config_summary['api']['model_name']}")
print(f" 最大Token: {new_config_summary['api']['max_tokens']}")
print(f" 超时时间: {new_config_summary['api']['timeout']}")
print(f" API密钥: {'已配置' if new_config_summary['api']['has_api_key'] else '未配置'}")
print("\n⚙️ 处理配置:")
print(f" 分块大小: {new_config_summary['processing']['chunk_size']}")
print(f" 最小评分阈值: {new_config_summary['processing']['min_score_threshold']}")
print(f" 最大切片数: {new_config_summary['processing']['max_clips_per_collection']}")
print("\n🗄️ 数据库配置:")
print(f" 数据库URL: {new_config_summary['database']['url']}")
print(f" Redis URL: {new_config_summary['redis']['url']}")
print("\n📁 路径配置:")
print(f" 数据目录: {new_config_summary['paths']['data_dir']}")
print(f" 上传目录: {new_config_summary['paths']['uploads_dir']}")
print(f" 输出目录: {new_config_summary['paths']['output_dir']}")
print("\n📝 日志配置:")
print(f" 日志级别: {new_config_summary['logging']['level']}")
print(f" 日志文件: {new_config_summary['logging']['file']}")
def main():
"""主函数"""
logger.info("🚀 开始配置迁移...")
# 分析旧配置
old_configs = analyze_old_configs()
if not old_configs:
logger.info("📭 没有找到需要迁移的配置文件")
return
print("\n📋 发现的配置文件:")
for config_name in old_configs.keys():
print(f"{config_name}")
# 询问是否继续
print("\n" + "=" * 60)
print("🔧 迁移选项:")
print("1. 模拟迁移 (dry run) - 查看迁移效果但不实际执行")
print("2. 执行迁移 - 实际迁移配置并备份旧文件")
print("3. 退出")
while True:
choice = input("\n请选择操作 (1/2/3): ").strip()
if choice in ['1', '2', '3']:
break
print("❌ 无效选择,请输入 1、2 或 3")
if choice == '3':
logger.info("👋 用户取消迁移")
return
dry_run = (choice == '1')
# 执行迁移
result = migrate_configs(old_configs, dry_run)
if result['success']:
if dry_run:
print("\n🔍 模拟迁移结果:")
else:
print("\n✅ 迁移完成:")
# 显示配置对比
if 'new_config_summary' in result:
display_config_comparison(old_configs, result['new_config_summary'])
# 显示迁移日志
migration_log = result['migration_log']
if migration_log['migrated_settings']:
print("\n📊 迁移统计:")
for config_name, migrated in migration_log['migrated_settings'].items():
print(f" {config_name}: {len(migrated)} 个设置项")
# 显示问题
if migration_log['issues']:
print("\n⚠️ 发现的问题:")
for issue in migration_log['issues']:
print(f"{issue}")
if not dry_run:
print(f"\n💾 备份位置: config_backup_*")
print("🔧 建议:")
print("1. 测试系统功能是否正常")
print("2. 确认无误后可以删除备份文件")
print("3. 检查新的配置文件格式")
else:
print("\n❌ 迁移失败:")
migration_log = result['migration_log']
if migration_log['issues']:
for issue in migration_log['issues']:
print(f"{issue}")
logger.info("🎉 配置迁移完成!")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,246 @@
#!/usr/bin/env python3
"""
数据存储优化迁移脚本
将双重存储模式迁移到优化存储模式(数据库存储元数据,文件系统存储实际文件)
"""
import sys
import os
import json
import logging
from pathlib import Path
from datetime import datetime
# 添加项目根目录到Python路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from backend.core.database import SessionLocal
from backend.services.optimized_storage_service import OptimizedStorageService
from backend.models.project import Project
from backend.models.clip import Clip
from backend.models.collection import Collection
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def analyze_current_storage():
"""分析当前存储状况"""
logger.info("🔍 分析当前存储状况...")
data_dir = project_root / "data"
projects_dir = data_dir / "projects"
if not projects_dir.exists():
logger.warning("项目目录不存在")
return {"projects": [], "total_size": 0}
projects_info = []
total_size = 0
for project_dir in projects_dir.iterdir():
if project_dir.is_dir() and not project_dir.name.startswith('.'):
project_id = project_dir.name
project_size = sum(f.stat().st_size for f in project_dir.rglob('*') if f.is_file())
# 检查是否有重复数据
has_duplicate_data = False
if (project_dir / "clips_metadata.json").exists():
has_duplicate_data = True
projects_info.append({
"project_id": project_id,
"size_mb": round(project_size / (1024 * 1024), 2),
"has_duplicate_data": has_duplicate_data,
"files_count": len(list(project_dir.rglob('*')))
})
total_size += project_size
logger.info(f"📊 分析完成: {len(projects_info)} 个项目, 总大小: {round(total_size / (1024 * 1024), 2)} MB")
return {
"projects": projects_info,
"total_size": total_size
}
def migrate_project_to_optimized_storage(db, project_id: str, dry_run: bool = True):
"""迁移单个项目到优化存储模式"""
logger.info(f"🔄 迁移项目: {project_id} (dry_run={dry_run})")
try:
# 获取项目信息
project = db.query(Project).filter(Project.id == project_id).first()
if not project:
logger.warning(f"项目 {project_id} 在数据库中不存在")
return {"success": False, "error": "项目不存在"}
# 检查项目目录
data_dir = project_root / "data"
old_project_dir = data_dir / "projects" / project_id
if not old_project_dir.exists():
logger.warning(f"项目目录不存在: {old_project_dir}")
return {"success": False, "error": "项目目录不存在"}
if dry_run:
logger.info(f"🔍 模拟迁移项目 {project_id}")
return {"success": True, "dry_run": True, "message": "模拟迁移成功"}
# 创建优化存储服务
storage_service = OptimizedStorageService(db, project_id)
# 执行迁移
migration_result = storage_service.migrate_from_old_storage(old_project_dir)
if migration_result["success"]:
logger.info(f"✅ 项目 {project_id} 迁移成功")
# 清理旧的双重存储文件
cleanup_old_duplicate_files(old_project_dir)
return {
"success": True,
"migrated_files": migration_result["migrated_files"],
"migrated_metadata": migration_result["migrated_metadata"]
}
else:
logger.error(f"❌ 项目 {project_id} 迁移失败: {migration_result['error']}")
return migration_result
except Exception as e:
logger.error(f"❌ 迁移项目 {project_id} 时发生错误: {e}")
return {"success": False, "error": str(e)}
def cleanup_old_duplicate_files(project_dir: Path):
"""清理旧的双重存储文件"""
try:
logger.info(f"🧹 清理项目 {project_dir.name} 的重复文件...")
# 删除重复的元数据文件
duplicate_files = [
"clips_metadata.json",
"collections_metadata.json",
"step1_outline.json",
"step2_timeline.json",
"step3_scoring.json",
"step4_titles.json",
"step5_collections.json"
]
cleaned_count = 0
for file_name in duplicate_files:
file_path = project_dir / file_name
if file_path.exists():
# 备份文件
backup_path = project_dir / f"{file_name}.backup"
file_path.rename(backup_path)
cleaned_count += 1
logger.info(f"📦 备份重复文件: {file_name}")
logger.info(f"✅ 清理完成,备份了 {cleaned_count} 个重复文件")
except Exception as e:
logger.error(f"❌ 清理重复文件失败: {e}")
def main():
"""主函数"""
logger.info("🚀 开始数据存储优化迁移...")
# 分析当前存储状况
storage_info = analyze_current_storage()
if not storage_info["projects"]:
logger.info("📭 没有找到需要迁移的项目")
return
# 显示分析结果
print("\n📊 当前存储状况分析:")
print("=" * 60)
for project in storage_info["projects"]:
status = "⚠️ 有重复数据" if project["has_duplicate_data"] else "✅ 正常"
print(f"项目 {project['project_id'][:8]}... | {project['size_mb']:>8.2f} MB | {project['files_count']:>4} 文件 | {status}")
print(f"\n总大小: {round(storage_info['total_size'] / (1024 * 1024), 2)} MB")
# 询问是否继续
print("\n" + "=" * 60)
print("🔧 迁移选项:")
print("1. 模拟迁移 (dry run) - 查看迁移效果但不实际执行")
print("2. 执行迁移 - 实际迁移数据并清理重复文件")
print("3. 退出")
while True:
choice = input("\n请选择操作 (1/2/3): ").strip()
if choice in ['1', '2', '3']:
break
print("❌ 无效选择,请输入 1、2 或 3")
if choice == '3':
logger.info("👋 用户取消迁移")
return
dry_run = (choice == '1')
if dry_run:
logger.info("🔍 开始模拟迁移...")
else:
logger.info("🚀 开始实际迁移...")
# 创建备份
backup_dir = project_root / f"migration_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
backup_dir.mkdir(exist_ok=True)
logger.info(f"📦 创建备份目录: {backup_dir}")
# 执行迁移
db = SessionLocal()
try:
success_count = 0
failed_count = 0
for project_info in storage_info["projects"]:
project_id = project_info["project_id"]
result = migrate_project_to_optimized_storage(db, project_id, dry_run)
if result["success"]:
success_count += 1
if dry_run:
logger.info(f"✅ 模拟迁移成功: {project_id}")
else:
logger.info(f"✅ 迁移成功: {project_id}")
else:
failed_count += 1
logger.error(f"❌ 迁移失败: {project_id} - {result.get('error', '未知错误')}")
# 显示迁移结果
print("\n" + "=" * 60)
print("📊 迁移结果:")
print(f"✅ 成功: {success_count}")
print(f"❌ 失败: {failed_count}")
print(f"📊 总计: {success_count + failed_count}")
if not dry_run and success_count > 0:
print(f"\n💾 备份位置: {backup_dir}")
print("🔧 建议:")
print("1. 测试系统功能是否正常")
print("2. 确认无误后可以删除备份文件")
print("3. 运行数据一致性检查")
except Exception as e:
logger.error(f"❌ 迁移过程中发生错误: {e}")
finally:
db.close()
logger.info("🎉 迁移完成!")
if __name__ == "__main__":
main()