chore: normalize line endings for issue fixes

This commit is contained in:
Genz
2026-04-02 14:10:00 +08:00
parent 9b83d4595c
commit 5d18293f1e
5 changed files with 7181 additions and 7181 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -1,126 +1,126 @@
"""FastAPI application initialization"""
from fastapi import FastAPI
from fastapi.responses import HTMLResponse, FileResponse
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
from pathlib import Path
from .core.config import config
from .core.database import Database
from .services.flow_client import FlowClient
from .services.proxy_manager import ProxyManager
from .services.token_manager import TokenManager
from .services.load_balancer import LoadBalancer
from .services.concurrency_manager import ConcurrencyManager
from .services.generation_handler import GenerationHandler
from .api import routes, admin
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan manager"""
# Startup
print("=" * 60)
print("Flow2API Starting...")
print("=" * 60)
# Get config from setting.toml
config_dict = config.get_raw_config()
# Check if database exists (determine if first startup)
is_first_startup = not db.db_exists()
# Initialize database tables structure
await db.init_db()
# Handle database initialization based on startup type
if is_first_startup:
print("🎉 First startup detected. Initializing database and configuration from setting.toml...")
await db.init_config_from_toml(config_dict, is_first_startup=True)
print("✓ Database and configuration initialized successfully.")
else:
print("🔄 Existing database detected. Checking for missing tables and columns...")
await db.check_and_migrate_db(config_dict)
print("✓ Database migration check completed.")
"""FastAPI application initialization"""
from fastapi import FastAPI
from fastapi.responses import HTMLResponse, FileResponse
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
from pathlib import Path
from .core.config import config
from .core.database import Database
from .services.flow_client import FlowClient
from .services.proxy_manager import ProxyManager
from .services.token_manager import TokenManager
from .services.load_balancer import LoadBalancer
from .services.concurrency_manager import ConcurrencyManager
from .services.generation_handler import GenerationHandler
from .api import routes, admin
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan manager"""
# Startup
print("=" * 60)
print("Flow2API Starting...")
print("=" * 60)
# Get config from setting.toml
config_dict = config.get_raw_config()
# Check if database exists (determine if first startup)
is_first_startup = not db.db_exists()
# Initialize database tables structure
await db.init_db()
# Handle database initialization based on startup type
if is_first_startup:
print("🎉 First startup detected. Initializing database and configuration from setting.toml...")
await db.init_config_from_toml(config_dict, is_first_startup=True)
print("✓ Database and configuration initialized successfully.")
else:
print("🔄 Existing database detected. Checking for missing tables and columns...")
await db.check_and_migrate_db(config_dict)
print("✓ Database migration check completed.")
# 启动时统一把数据库配置同步到内存,避免 personal/brower 相关运行时配置遗漏。
await db.reload_config_to_memory()
generation_handler.file_cache.set_timeout(config.cache_timeout)
cache_cleanup_enabled = await generation_handler.file_cache.refresh_cleanup_task()
captcha_config = await db.get_captcha_config()
# 尽量在浏览器服务启动前就拿到 token 快照,后续并发管理和预热共用。
tokens = await token_manager.get_all_tokens()
# Initialize browser captcha service if needed
browser_service = None
if captcha_config.captcha_method == "personal":
from .services.browser_captcha_personal import BrowserCaptchaService
browser_service = await BrowserCaptchaService.get_instance(db)
print("✓ Browser captcha service initialized (nodriver mode)")
warmup_limit = max(1, int(config.personal_max_resident_tabs or 1))
warmup_project_ids = await token_manager.get_personal_warmup_project_ids(
tokens=tokens,
limit=warmup_limit,
)
warmed_slots = []
warmup_error = None
try:
warmed_slots = await browser_service.warmup_resident_tabs(
warmup_project_ids,
limit=warmup_limit,
)
except Exception as e:
warmup_error = e
print(
"⚠ Browser captcha resident warmup failed: "
f"{type(e).__name__}: {e}"
)
if warmed_slots:
print(
f"✓ Browser captcha shared resident tabs warmed "
f"({len(warmed_slots)} slot(s), limit={warmup_limit})"
)
elif warmup_error is not None:
print("⚠ Browser captcha resident warmup skipped for this startup")
elif tokens:
print("⚠ Browser captcha resident warmup skipped: no tab warmed successfully")
else:
# 没有任何可用 token 时,打开登录窗口供用户手动操作
await browser_service.open_login_window()
print("⚠ No active token found, opened login window for manual setup")
elif captcha_config.captcha_method == "browser":
from .services.browser_captcha import BrowserCaptchaService
browser_service = await BrowserCaptchaService.get_instance(db)
await browser_service.warmup_browser_slots()
print("? Browser captcha service initialized (headed mode)")
# Initialize concurrency manager
await concurrency_manager.initialize(tokens)
if config.captcha_method == "remote_browser":
try:
warmed_projects = await flow_client.prefill_remote_browser_for_tokens(tokens, action="IMAGE_GENERATION")
print(f"✓ Remote browser pool prefill started for {warmed_projects} project(s)")
except Exception as e:
print(f"⚠ Remote browser pool prefill failed: {e}")
# 尽量在浏览器服务启动前就拿到 token 快照,后续并发管理和预热共用。
tokens = await token_manager.get_all_tokens()
# Initialize browser captcha service if needed
browser_service = None
if captcha_config.captcha_method == "personal":
from .services.browser_captcha_personal import BrowserCaptchaService
browser_service = await BrowserCaptchaService.get_instance(db)
print("✓ Browser captcha service initialized (nodriver mode)")
warmup_limit = max(1, int(config.personal_max_resident_tabs or 1))
warmup_project_ids = await token_manager.get_personal_warmup_project_ids(
tokens=tokens,
limit=warmup_limit,
)
warmed_slots = []
warmup_error = None
try:
warmed_slots = await browser_service.warmup_resident_tabs(
warmup_project_ids,
limit=warmup_limit,
)
except Exception as e:
warmup_error = e
print(
"⚠ Browser captcha resident warmup failed: "
f"{type(e).__name__}: {e}"
)
if warmed_slots:
print(
f"✓ Browser captcha shared resident tabs warmed "
f"({len(warmed_slots)} slot(s), limit={warmup_limit})"
)
elif warmup_error is not None:
print("⚠ Browser captcha resident warmup skipped for this startup")
elif tokens:
print("⚠ Browser captcha resident warmup skipped: no tab warmed successfully")
else:
# 没有任何可用 token 时,打开登录窗口供用户手动操作
await browser_service.open_login_window()
print("⚠ No active token found, opened login window for manual setup")
elif captcha_config.captcha_method == "browser":
from .services.browser_captcha import BrowserCaptchaService
browser_service = await BrowserCaptchaService.get_instance(db)
await browser_service.warmup_browser_slots()
print("? Browser captcha service initialized (headed mode)")
# Initialize concurrency manager
await concurrency_manager.initialize(tokens)
if config.captcha_method == "remote_browser":
try:
warmed_projects = await flow_client.prefill_remote_browser_for_tokens(tokens, action="IMAGE_GENERATION")
print(f"✓ Remote browser pool prefill started for {warmed_projects} project(s)")
except Exception as e:
print(f"⚠ Remote browser pool prefill failed: {e}")
# Start 429 auto-unban task
import asyncio
async def auto_unban_task():
"""定时任务每小时检查并解禁429被禁用的token"""
while True:
try:
await asyncio.sleep(3600) # 每小时执行一次
await token_manager.auto_unban_429_tokens()
except Exception as e:
print(f"❌ Auto-unban task error: {e}")
auto_unban_task_handle = asyncio.create_task(auto_unban_task())
async def auto_unban_task():
"""定时任务每小时检查并解禁429被禁用的token"""
while True:
try:
await asyncio.sleep(3600) # 每小时执行一次
await token_manager.auto_unban_429_tokens()
except Exception as e:
print(f"❌ Auto-unban task error: {e}")
auto_unban_task_handle = asyncio.create_task(auto_unban_task())
print(f"✓ Database initialized")
print(f"✓ Total tokens: {len(tokens)}")
print(f"✓ Cache: {'Enabled' if config.cache_enabled else 'Disabled'} (timeout: {config.cache_timeout}s)")
@@ -129,110 +129,110 @@ async def lifespan(app: FastAPI):
else:
print("✓ File cache cleanup task disabled (timeout <= 0)")
print(f"✓ 429 auto-unban task started (runs every hour)")
print(f"✓ Server running on http://{config.server_host}:{config.server_port}")
print("=" * 60)
yield
# Shutdown
print("Flow2API Shutting down...")
# Stop file cache cleanup task
await generation_handler.file_cache.stop_cleanup_task()
# Stop auto-unban task
auto_unban_task_handle.cancel()
try:
await auto_unban_task_handle
except asyncio.CancelledError:
pass
# Close browser if initialized
if browser_service:
await browser_service.close()
print("✓ Browser captcha service closed")
print("✓ File cache cleanup task stopped")
print("✓ 429 auto-unban task stopped")
# Initialize components
db = Database()
proxy_manager = ProxyManager(db)
flow_client = FlowClient(proxy_manager, db)
token_manager = TokenManager(db, flow_client)
concurrency_manager = ConcurrencyManager()
load_balancer = LoadBalancer(token_manager, concurrency_manager)
generation_handler = GenerationHandler(
flow_client,
token_manager,
load_balancer,
db,
concurrency_manager,
proxy_manager # 添加 proxy_manager 参数
)
# Set dependencies
routes.set_generation_handler(generation_handler)
admin.set_dependencies(token_manager, proxy_manager, db, concurrency_manager)
# Create FastAPI app
app = FastAPI(
title="Flow2API",
description="OpenAI-compatible API for Google VideoFX (Veo)",
version="1.0.0",
lifespan=lifespan
)
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Include routers
app.include_router(routes.router)
app.include_router(admin.router)
# Static files - serve tmp directory for cached files
tmp_dir = Path(__file__).parent.parent / "tmp"
tmp_dir.mkdir(exist_ok=True)
app.mount("/tmp", StaticFiles(directory=str(tmp_dir)), name="tmp")
# HTML routes for frontend
static_path = Path(__file__).parent.parent / "static"
@app.get("/", response_class=HTMLResponse)
async def index():
"""Redirect to login page"""
login_file = static_path / "login.html"
if login_file.exists():
return FileResponse(str(login_file))
return HTMLResponse(content="<h1>Flow2API</h1><p>Frontend not found</p>", status_code=404)
@app.get("/login", response_class=HTMLResponse)
async def login_page():
"""Login page"""
login_file = static_path / "login.html"
if login_file.exists():
return FileResponse(str(login_file))
return HTMLResponse(content="<h1>Login Page Not Found</h1>", status_code=404)
@app.get("/manage", response_class=HTMLResponse)
async def manage_page():
"""Management console page"""
manage_file = static_path / "manage.html"
if manage_file.exists():
return FileResponse(str(manage_file))
return HTMLResponse(content="<h1>Management Page Not Found</h1>", status_code=404)
@app.get("/test", response_class=HTMLResponse)
async def test_page():
"""Model testing page"""
test_file = static_path / "test.html"
if test_file.exists():
return FileResponse(str(test_file))
return HTMLResponse(content="<h1>Test Page Not Found</h1>", status_code=404)
print(f"✓ Server running on http://{config.server_host}:{config.server_port}")
print("=" * 60)
yield
# Shutdown
print("Flow2API Shutting down...")
# Stop file cache cleanup task
await generation_handler.file_cache.stop_cleanup_task()
# Stop auto-unban task
auto_unban_task_handle.cancel()
try:
await auto_unban_task_handle
except asyncio.CancelledError:
pass
# Close browser if initialized
if browser_service:
await browser_service.close()
print("✓ Browser captcha service closed")
print("✓ File cache cleanup task stopped")
print("✓ 429 auto-unban task stopped")
# Initialize components
db = Database()
proxy_manager = ProxyManager(db)
flow_client = FlowClient(proxy_manager, db)
token_manager = TokenManager(db, flow_client)
concurrency_manager = ConcurrencyManager()
load_balancer = LoadBalancer(token_manager, concurrency_manager)
generation_handler = GenerationHandler(
flow_client,
token_manager,
load_balancer,
db,
concurrency_manager,
proxy_manager # 添加 proxy_manager 参数
)
# Set dependencies
routes.set_generation_handler(generation_handler)
admin.set_dependencies(token_manager, proxy_manager, db, concurrency_manager)
# Create FastAPI app
app = FastAPI(
title="Flow2API",
description="OpenAI-compatible API for Google VideoFX (Veo)",
version="1.0.0",
lifespan=lifespan
)
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Include routers
app.include_router(routes.router)
app.include_router(admin.router)
# Static files - serve tmp directory for cached files
tmp_dir = Path(__file__).parent.parent / "tmp"
tmp_dir.mkdir(exist_ok=True)
app.mount("/tmp", StaticFiles(directory=str(tmp_dir)), name="tmp")
# HTML routes for frontend
static_path = Path(__file__).parent.parent / "static"
@app.get("/", response_class=HTMLResponse)
async def index():
"""Redirect to login page"""
login_file = static_path / "login.html"
if login_file.exists():
return FileResponse(str(login_file))
return HTMLResponse(content="<h1>Flow2API</h1><p>Frontend not found</p>", status_code=404)
@app.get("/login", response_class=HTMLResponse)
async def login_page():
"""Login page"""
login_file = static_path / "login.html"
if login_file.exists():
return FileResponse(str(login_file))
return HTMLResponse(content="<h1>Login Page Not Found</h1>", status_code=404)
@app.get("/manage", response_class=HTMLResponse)
async def manage_page():
"""Management console page"""
manage_file = static_path / "manage.html"
if manage_file.exists():
return FileResponse(str(manage_file))
return HTMLResponse(content="<h1>Management Page Not Found</h1>", status_code=404)
@app.get("/test", response_class=HTMLResponse)
async def test_page():
"""Model testing page"""
test_file = static_path / "test.html"
if test_file.exists():
return FileResponse(str(test_file))
return HTMLResponse(content="<h1>Test Page Not Found</h1>", status_code=404)

File diff suppressed because it is too large Load Diff

View File

@@ -1,182 +1,182 @@
"""File caching service"""
import os
import asyncio
import hashlib
import time
import mimetypes
from pathlib import Path
from typing import Optional, Dict, Any
from datetime import datetime, timedelta
from urllib.parse import urlparse
from curl_cffi.requests import AsyncSession
from ..core.config import config
from ..core.logger import debug_logger
class FileCache:
"""File caching service for videos"""
def __init__(
self,
cache_dir: str = "tmp",
default_timeout: int = 7200,
proxy_manager=None,
flow_client=None,
):
"""
Initialize file cache
Args:
cache_dir: Cache directory path
default_timeout: Default cache timeout in seconds (default: 2 hours)
proxy_manager: ProxyManager instance for downloading files
"""
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(exist_ok=True)
self.default_timeout = max(0, int(default_timeout))
self.proxy_manager = proxy_manager
self.flow_client = flow_client
self._cleanup_task = None
self._download_locks: Dict[str, asyncio.Lock] = {}
def _is_cleanup_disabled(self) -> bool:
return self.default_timeout <= 0
def _get_request_fingerprint(self) -> Optional[Dict[str, Any]]:
"""读取当前请求链路里绑定的浏览器指纹。"""
if not self.flow_client or not hasattr(self.flow_client, "get_request_fingerprint"):
return None
try:
fingerprint = self.flow_client.get_request_fingerprint()
if isinstance(fingerprint, dict) and fingerprint:
return fingerprint
except Exception as e:
debug_logger.log_warning(f"Get request fingerprint failed: {str(e)}")
return None
async def _resolve_download_proxy(
self,
media_type: str,
fingerprint: Optional[Dict[str, Any]] = None,
) -> Optional[str]:
"""根据媒体类型解析下载代理地址。"""
if isinstance(fingerprint, dict):
fingerprint_proxy = str(fingerprint.get("proxy_url") or "").strip()
if fingerprint_proxy:
return fingerprint_proxy
if not self.proxy_manager:
return None
try:
# 媒体下载(图片/视频)优先使用独立的上传/下载代理
if media_type in ("image", "video") and hasattr(self.proxy_manager, "get_media_proxy_url"):
return await self.proxy_manager.get_media_proxy_url()
# 其他下载走请求代理
if hasattr(self.proxy_manager, "get_request_proxy_url"):
return await self.proxy_manager.get_request_proxy_url()
# 向后兼容旧实现
if hasattr(self.proxy_manager, "get_proxy_url"):
return await self.proxy_manager.get_proxy_url()
except Exception as e:
debug_logger.log_warning(f"Resolve download proxy failed: {str(e)}")
return None
def _guess_extension(self, url: str, media_type: str) -> str:
"""尽量保留原始扩展名,未知时回退到默认值。"""
path = urlparse(url).path or ""
guessed, _ = mimetypes.guess_type(path)
suffix = Path(path).suffix.lower()
if media_type == "video":
if suffix in {".mp4", ".mov", ".webm", ".mkv", ".m4v"}:
return suffix
if guessed == "video/webm":
return ".webm"
if guessed == "video/quicktime":
return ".mov"
return ".mp4"
if media_type == "image":
if suffix in {".png", ".jpg", ".jpeg", ".webp", ".gif", ".avif", ".bmp"}:
return suffix
if guessed == "image/png":
return ".png"
if guessed == "image/webp":
return ".webp"
if guessed == "image/gif":
return ".gif"
if guessed == "image/avif":
return ".avif"
if guessed == "image/bmp":
return ".bmp"
return ".jpg"
return suffix
def _build_download_headers(
self,
media_type: str,
fingerprint: Optional[Dict[str, Any]] = None,
) -> Dict[str, str]:
"""构建媒体下载请求头,优先复用当前打码浏览器指纹。"""
headers = {
"Accept": (
"image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8"
if media_type == "image"
else "*/*"
),
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Accept-Encoding": "gzip, deflate, br",
"Connection": "keep-alive",
"Referer": "https://labs.google/",
"Sec-Fetch-Site": "same-origin",
"Sec-Fetch-Mode": "cors",
}
if media_type == "image":
headers["Sec-Fetch-Dest"] = "image"
else:
headers["Sec-Fetch-Dest"] = "video"
if isinstance(fingerprint, dict):
if fingerprint.get("user_agent"):
headers["User-Agent"] = str(fingerprint["user_agent"])
if fingerprint.get("accept_language"):
headers["Accept-Language"] = str(fingerprint["accept_language"])
if fingerprint.get("sec_ch_ua"):
headers["sec-ch-ua"] = str(fingerprint["sec_ch_ua"])
if fingerprint.get("sec_ch_ua_mobile"):
headers["sec-ch-ua-mobile"] = str(fingerprint["sec_ch_ua_mobile"])
if fingerprint.get("sec_ch_ua_platform"):
headers["sec-ch-ua-platform"] = str(fingerprint["sec_ch_ua_platform"])
headers.setdefault(
"User-Agent",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
)
return headers
def _write_cached_content(self, file_path: Path, content: bytes):
"""先写临时文件,再原子替换,避免并发读到半截文件。"""
temp_path = file_path.with_suffix(f"{file_path.suffix}.part")
try:
with open(temp_path, "wb") as f:
f.write(content)
temp_path.replace(file_path)
finally:
if temp_path.exists():
try:
temp_path.unlink()
except Exception:
pass
"""File caching service"""
import os
import asyncio
import hashlib
import time
import mimetypes
from pathlib import Path
from typing import Optional, Dict, Any
from datetime import datetime, timedelta
from urllib.parse import urlparse
from curl_cffi.requests import AsyncSession
from ..core.config import config
from ..core.logger import debug_logger
class FileCache:
"""File caching service for videos"""
def __init__(
self,
cache_dir: str = "tmp",
default_timeout: int = 7200,
proxy_manager=None,
flow_client=None,
):
"""
Initialize file cache
Args:
cache_dir: Cache directory path
default_timeout: Default cache timeout in seconds (default: 2 hours)
proxy_manager: ProxyManager instance for downloading files
"""
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(exist_ok=True)
self.default_timeout = max(0, int(default_timeout))
self.proxy_manager = proxy_manager
self.flow_client = flow_client
self._cleanup_task = None
self._download_locks: Dict[str, asyncio.Lock] = {}
def _is_cleanup_disabled(self) -> bool:
return self.default_timeout <= 0
def _get_request_fingerprint(self) -> Optional[Dict[str, Any]]:
"""读取当前请求链路里绑定的浏览器指纹。"""
if not self.flow_client or not hasattr(self.flow_client, "get_request_fingerprint"):
return None
try:
fingerprint = self.flow_client.get_request_fingerprint()
if isinstance(fingerprint, dict) and fingerprint:
return fingerprint
except Exception as e:
debug_logger.log_warning(f"Get request fingerprint failed: {str(e)}")
return None
async def _resolve_download_proxy(
self,
media_type: str,
fingerprint: Optional[Dict[str, Any]] = None,
) -> Optional[str]:
"""根据媒体类型解析下载代理地址。"""
if isinstance(fingerprint, dict):
fingerprint_proxy = str(fingerprint.get("proxy_url") or "").strip()
if fingerprint_proxy:
return fingerprint_proxy
if not self.proxy_manager:
return None
try:
# 媒体下载(图片/视频)优先使用独立的上传/下载代理
if media_type in ("image", "video") and hasattr(self.proxy_manager, "get_media_proxy_url"):
return await self.proxy_manager.get_media_proxy_url()
# 其他下载走请求代理
if hasattr(self.proxy_manager, "get_request_proxy_url"):
return await self.proxy_manager.get_request_proxy_url()
# 向后兼容旧实现
if hasattr(self.proxy_manager, "get_proxy_url"):
return await self.proxy_manager.get_proxy_url()
except Exception as e:
debug_logger.log_warning(f"Resolve download proxy failed: {str(e)}")
return None
def _guess_extension(self, url: str, media_type: str) -> str:
"""尽量保留原始扩展名,未知时回退到默认值。"""
path = urlparse(url).path or ""
guessed, _ = mimetypes.guess_type(path)
suffix = Path(path).suffix.lower()
if media_type == "video":
if suffix in {".mp4", ".mov", ".webm", ".mkv", ".m4v"}:
return suffix
if guessed == "video/webm":
return ".webm"
if guessed == "video/quicktime":
return ".mov"
return ".mp4"
if media_type == "image":
if suffix in {".png", ".jpg", ".jpeg", ".webp", ".gif", ".avif", ".bmp"}:
return suffix
if guessed == "image/png":
return ".png"
if guessed == "image/webp":
return ".webp"
if guessed == "image/gif":
return ".gif"
if guessed == "image/avif":
return ".avif"
if guessed == "image/bmp":
return ".bmp"
return ".jpg"
return suffix
def _build_download_headers(
self,
media_type: str,
fingerprint: Optional[Dict[str, Any]] = None,
) -> Dict[str, str]:
"""构建媒体下载请求头,优先复用当前打码浏览器指纹。"""
headers = {
"Accept": (
"image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8"
if media_type == "image"
else "*/*"
),
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Accept-Encoding": "gzip, deflate, br",
"Connection": "keep-alive",
"Referer": "https://labs.google/",
"Sec-Fetch-Site": "same-origin",
"Sec-Fetch-Mode": "cors",
}
if media_type == "image":
headers["Sec-Fetch-Dest"] = "image"
else:
headers["Sec-Fetch-Dest"] = "video"
if isinstance(fingerprint, dict):
if fingerprint.get("user_agent"):
headers["User-Agent"] = str(fingerprint["user_agent"])
if fingerprint.get("accept_language"):
headers["Accept-Language"] = str(fingerprint["accept_language"])
if fingerprint.get("sec_ch_ua"):
headers["sec-ch-ua"] = str(fingerprint["sec_ch_ua"])
if fingerprint.get("sec_ch_ua_mobile"):
headers["sec-ch-ua-mobile"] = str(fingerprint["sec_ch_ua_mobile"])
if fingerprint.get("sec_ch_ua_platform"):
headers["sec-ch-ua-platform"] = str(fingerprint["sec_ch_ua_platform"])
headers.setdefault(
"User-Agent",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
)
return headers
def _write_cached_content(self, file_path: Path, content: bytes):
"""先写临时文件,再原子替换,避免并发读到半截文件。"""
temp_path = file_path.with_suffix(f"{file_path.suffix}.part")
try:
with open(temp_path, "wb") as f:
f.write(content)
temp_path.replace(file_path)
finally:
if temp_path.exists():
try:
temp_path.unlink()
except Exception:
pass
async def start_cleanup_task(self):
"""Start background cleanup task"""
if self._is_cleanup_disabled():
@@ -191,8 +191,8 @@ class FileCache:
"""Stop background cleanup task"""
if self._cleanup_task:
self._cleanup_task.cancel()
try:
await self._cleanup_task
try:
await self._cleanup_task
except asyncio.CancelledError:
pass
self._cleanup_task = None
@@ -203,22 +203,22 @@ class FileCache:
await self.stop_cleanup_task()
return False
return await self.start_cleanup_task()
async def _cleanup_loop(self):
"""Background task to clean up expired files"""
while True:
try:
await asyncio.sleep(300) # Check every 5 minutes
await self._cleanup_expired_files()
except asyncio.CancelledError:
break
except Exception as e:
debug_logger.log_error(
error_message=f"Cleanup task error: {str(e)}",
status_code=0,
response_text=""
)
async def _cleanup_loop(self):
"""Background task to clean up expired files"""
while True:
try:
await asyncio.sleep(300) # Check every 5 minutes
await self._cleanup_expired_files()
except asyncio.CancelledError:
break
except Exception as e:
debug_logger.log_error(
error_message=f"Cleanup task error: {str(e)}",
status_code=0,
response_text=""
)
async def _cleanup_expired_files(self):
"""Remove expired cache files"""
try:
@@ -242,273 +242,273 @@ class FileCache:
removed_count += 1
except Exception:
pass
if removed_count > 0:
debug_logger.log_info(f"Cleanup: removed {removed_count} expired cache files")
except Exception as e:
debug_logger.log_error(
error_message=f"Failed to cleanup expired files: {str(e)}",
status_code=0,
response_text=""
)
def _generate_cache_filename(self, url: str, media_type: str) -> str:
"""Generate unique filename for cached file"""
# Use URL hash as filename
url_hash = hashlib.md5(url.encode()).hexdigest()
ext = self._guess_extension(url, media_type)
return f"{url_hash}{ext}"
def _normalize_cache_error(self, error: Exception) -> str:
"""整理缓存错误,避免将底层命令异常直接暴露给用户。"""
if isinstance(error, FileNotFoundError):
missing_name = Path(getattr(error, "filename", "") or "curl").name or "curl"
return f"本机未安装 {missing_name}"
message = str(error or "").strip()
if not message:
return "未知错误"
if message.startswith("Failed to cache file:"):
message = message.split(":", 1)[1].strip() or "未知错误"
return message
async def download_and_cache(self, url: str, media_type: str) -> str:
"""
Download file from URL and cache it locally
Args:
url: File URL to download
media_type: 'image' or 'video'
Returns:
Local cache filename
"""
filename = self._generate_cache_filename(url, media_type)
file_path = self.cache_dir / filename
download_lock = self._download_locks.setdefault(filename, asyncio.Lock())
async with download_lock:
# Check if already cached and not expired
if file_path.exists():
if self._is_cleanup_disabled():
return filename
file_age = time.time() - file_path.stat().st_mtime
if file_age < self.default_timeout:
debug_logger.log_info(f"Cache hit: {filename}")
return filename
try:
file_path.unlink()
except Exception:
pass
# Download file
debug_logger.log_info(f"Downloading file from: {url}")
fingerprint = self._get_request_fingerprint()
proxy_url = await self._resolve_download_proxy(media_type, fingerprint=fingerprint)
headers = self._build_download_headers(media_type, fingerprint=fingerprint)
# Try method 1: curl_cffi with browser impersonation
try:
async with AsyncSession() as session:
response = await session.get(
url,
timeout=60,
proxy=proxy_url,
headers=headers,
impersonate="chrome120",
verify=False
)
if response.status_code == 200 and response.content:
self._write_cached_content(file_path, response.content)
debug_logger.log_info(
f"File cached (curl_cffi): {filename} ({len(response.content)} bytes)"
)
return filename
debug_logger.log_warning(
f"curl_cffi failed with HTTP {response.status_code}, trying wget..."
)
except Exception as e:
debug_logger.log_warning(f"curl_cffi failed: {str(e)}, trying wget...")
# Try method 2: wget command
try:
import subprocess
wget_cmd = [
"wget",
"-q",
"-O", str(file_path),
"--timeout=60",
"--tries=3",
f"--user-agent={headers.get('User-Agent', '')}",
f"--header=Accept: {headers.get('Accept', '*/*')}",
f"--header=Accept-Language: {headers.get('Accept-Language', 'zh-CN,zh;q=0.9,en;q=0.8')}",
f"--header=Connection: {headers.get('Connection', 'keep-alive')}",
f"--header=Referer: {headers.get('Referer', 'https://labs.google/')}",
]
if "sec-ch-ua" in headers:
wget_cmd.append(f"--header=sec-ch-ua: {headers['sec-ch-ua']}")
if "sec-ch-ua-mobile" in headers:
wget_cmd.append(f"--header=sec-ch-ua-mobile: {headers['sec-ch-ua-mobile']}")
if "sec-ch-ua-platform" in headers:
wget_cmd.append(f"--header=sec-ch-ua-platform: {headers['sec-ch-ua-platform']}")
if proxy_url:
env = os.environ.copy()
env["http_proxy"] = proxy_url
env["https_proxy"] = proxy_url
else:
env = None
wget_cmd.append(url)
result = subprocess.run(wget_cmd, capture_output=True, timeout=90, env=env)
if result.returncode == 0 and file_path.exists():
file_size = file_path.stat().st_size
if file_size > 0:
debug_logger.log_info(f"File cached (wget): {filename} ({file_size} bytes)")
return filename
raise Exception("Downloaded file is empty")
error_msg = result.stderr.decode("utf-8", errors="ignore") if result.stderr else "Unknown error"
debug_logger.log_warning(f"wget failed: {error_msg}, trying curl...")
except FileNotFoundError:
debug_logger.log_warning("wget not found, trying curl...")
except Exception as e:
debug_logger.log_warning(f"wget failed: {str(e)}, trying curl...")
# Try method 3: system curl command
try:
import subprocess
curl_cmd = [
"curl",
"-L",
"-s",
"-o", str(file_path),
"--max-time", "60",
"-H", f"Accept: {headers.get('Accept', '*/*')}",
"-H", f"Accept-Language: {headers.get('Accept-Language', 'zh-CN,zh;q=0.9,en;q=0.8')}",
"-H", f"Connection: {headers.get('Connection', 'keep-alive')}",
"-H", f"Referer: {headers.get('Referer', 'https://labs.google/')}",
"-A", headers.get("User-Agent", ""),
]
if "sec-ch-ua" in headers:
curl_cmd.extend(["-H", f"sec-ch-ua: {headers['sec-ch-ua']}"])
if "sec-ch-ua-mobile" in headers:
curl_cmd.extend(["-H", f"sec-ch-ua-mobile: {headers['sec-ch-ua-mobile']}"])
if "sec-ch-ua-platform" in headers:
curl_cmd.extend(["-H", f"sec-ch-ua-platform: {headers['sec-ch-ua-platform']}"])
if proxy_url:
curl_cmd.extend(["-x", proxy_url])
curl_cmd.append(url)
result = subprocess.run(curl_cmd, capture_output=True, timeout=90)
if result.returncode == 0 and file_path.exists():
file_size = file_path.stat().st_size
if file_size > 0:
debug_logger.log_info(f"File cached (curl): {filename} ({file_size} bytes)")
return filename
raise Exception("Downloaded file is empty")
error_msg = result.stderr.decode("utf-8", errors="ignore") if result.stderr else "Unknown error"
raise Exception(f"curl command failed: {error_msg}")
except FileNotFoundError as e:
normalized_error = self._normalize_cache_error(e)
debug_logger.log_error(
error_message=f"Failed to download file: {str(e)}",
status_code=0,
response_text=str(e)
)
raise Exception(normalized_error) from e
except Exception as e:
normalized_error = self._normalize_cache_error(e)
debug_logger.log_error(
error_message=f"Failed to download file: {str(e)}",
status_code=0,
response_text=str(e)
)
raise Exception(normalized_error) from e
async def cache_base64_image(self, base64_data: str, resolution: str = "") -> str:
"""
Cache base64 encoded image data to local file
Args:
base64_data: Base64 encoded image data (without data:image/... prefix)
resolution: Resolution info for filename (e.g., "4K", "2K")
Returns:
Local cache filename
"""
import base64
import uuid
# Generate unique filename
unique_id = hashlib.md5(f"{uuid.uuid4()}{time.time()}".encode()).hexdigest()
suffix = f"_{resolution}" if resolution else ""
filename = f"{unique_id}{suffix}.jpg"
file_path = self.cache_dir / filename
try:
# Decode base64 and save to file
image_data = base64.b64decode(base64_data)
with open(file_path, 'wb') as f:
f.write(image_data)
debug_logger.log_info(f"Base64 image cached: {filename} ({len(image_data)} bytes)")
return filename
except Exception as e:
debug_logger.log_error(
error_message=f"Failed to cache base64 image: {str(e)}",
status_code=0,
response_text=""
)
raise Exception(f"Failed to cache base64 image: {str(e)}")
def get_cache_path(self, filename: str) -> Path:
"""Get full path to cached file"""
return self.cache_dir / filename
def set_timeout(self, timeout: int):
"""Set cache timeout in seconds"""
self.default_timeout = max(0, int(timeout))
debug_logger.log_info(f"Cache timeout updated to {timeout} seconds")
def get_timeout(self) -> int:
"""Get current cache timeout"""
return self.default_timeout
async def clear_all(self):
"""Clear all cached files"""
try:
removed_count = 0
for file_path in self.cache_dir.iterdir():
if file_path.is_file():
try:
file_path.unlink()
removed_count += 1
except Exception:
pass
debug_logger.log_info(f"Cache cleared: removed {removed_count} files")
return removed_count
except Exception as e:
debug_logger.log_error(
error_message=f"Failed to clear cache: {str(e)}",
status_code=0,
response_text=""
)
raise
if removed_count > 0:
debug_logger.log_info(f"Cleanup: removed {removed_count} expired cache files")
except Exception as e:
debug_logger.log_error(
error_message=f"Failed to cleanup expired files: {str(e)}",
status_code=0,
response_text=""
)
def _generate_cache_filename(self, url: str, media_type: str) -> str:
"""Generate unique filename for cached file"""
# Use URL hash as filename
url_hash = hashlib.md5(url.encode()).hexdigest()
ext = self._guess_extension(url, media_type)
return f"{url_hash}{ext}"
def _normalize_cache_error(self, error: Exception) -> str:
"""整理缓存错误,避免将底层命令异常直接暴露给用户。"""
if isinstance(error, FileNotFoundError):
missing_name = Path(getattr(error, "filename", "") or "curl").name or "curl"
return f"本机未安装 {missing_name}"
message = str(error or "").strip()
if not message:
return "未知错误"
if message.startswith("Failed to cache file:"):
message = message.split(":", 1)[1].strip() or "未知错误"
return message
async def download_and_cache(self, url: str, media_type: str) -> str:
"""
Download file from URL and cache it locally
Args:
url: File URL to download
media_type: 'image' or 'video'
Returns:
Local cache filename
"""
filename = self._generate_cache_filename(url, media_type)
file_path = self.cache_dir / filename
download_lock = self._download_locks.setdefault(filename, asyncio.Lock())
async with download_lock:
# Check if already cached and not expired
if file_path.exists():
if self._is_cleanup_disabled():
return filename
file_age = time.time() - file_path.stat().st_mtime
if file_age < self.default_timeout:
debug_logger.log_info(f"Cache hit: {filename}")
return filename
try:
file_path.unlink()
except Exception:
pass
# Download file
debug_logger.log_info(f"Downloading file from: {url}")
fingerprint = self._get_request_fingerprint()
proxy_url = await self._resolve_download_proxy(media_type, fingerprint=fingerprint)
headers = self._build_download_headers(media_type, fingerprint=fingerprint)
# Try method 1: curl_cffi with browser impersonation
try:
async with AsyncSession() as session:
response = await session.get(
url,
timeout=60,
proxy=proxy_url,
headers=headers,
impersonate="chrome120",
verify=False
)
if response.status_code == 200 and response.content:
self._write_cached_content(file_path, response.content)
debug_logger.log_info(
f"File cached (curl_cffi): {filename} ({len(response.content)} bytes)"
)
return filename
debug_logger.log_warning(
f"curl_cffi failed with HTTP {response.status_code}, trying wget..."
)
except Exception as e:
debug_logger.log_warning(f"curl_cffi failed: {str(e)}, trying wget...")
# Try method 2: wget command
try:
import subprocess
wget_cmd = [
"wget",
"-q",
"-O", str(file_path),
"--timeout=60",
"--tries=3",
f"--user-agent={headers.get('User-Agent', '')}",
f"--header=Accept: {headers.get('Accept', '*/*')}",
f"--header=Accept-Language: {headers.get('Accept-Language', 'zh-CN,zh;q=0.9,en;q=0.8')}",
f"--header=Connection: {headers.get('Connection', 'keep-alive')}",
f"--header=Referer: {headers.get('Referer', 'https://labs.google/')}",
]
if "sec-ch-ua" in headers:
wget_cmd.append(f"--header=sec-ch-ua: {headers['sec-ch-ua']}")
if "sec-ch-ua-mobile" in headers:
wget_cmd.append(f"--header=sec-ch-ua-mobile: {headers['sec-ch-ua-mobile']}")
if "sec-ch-ua-platform" in headers:
wget_cmd.append(f"--header=sec-ch-ua-platform: {headers['sec-ch-ua-platform']}")
if proxy_url:
env = os.environ.copy()
env["http_proxy"] = proxy_url
env["https_proxy"] = proxy_url
else:
env = None
wget_cmd.append(url)
result = subprocess.run(wget_cmd, capture_output=True, timeout=90, env=env)
if result.returncode == 0 and file_path.exists():
file_size = file_path.stat().st_size
if file_size > 0:
debug_logger.log_info(f"File cached (wget): {filename} ({file_size} bytes)")
return filename
raise Exception("Downloaded file is empty")
error_msg = result.stderr.decode("utf-8", errors="ignore") if result.stderr else "Unknown error"
debug_logger.log_warning(f"wget failed: {error_msg}, trying curl...")
except FileNotFoundError:
debug_logger.log_warning("wget not found, trying curl...")
except Exception as e:
debug_logger.log_warning(f"wget failed: {str(e)}, trying curl...")
# Try method 3: system curl command
try:
import subprocess
curl_cmd = [
"curl",
"-L",
"-s",
"-o", str(file_path),
"--max-time", "60",
"-H", f"Accept: {headers.get('Accept', '*/*')}",
"-H", f"Accept-Language: {headers.get('Accept-Language', 'zh-CN,zh;q=0.9,en;q=0.8')}",
"-H", f"Connection: {headers.get('Connection', 'keep-alive')}",
"-H", f"Referer: {headers.get('Referer', 'https://labs.google/')}",
"-A", headers.get("User-Agent", ""),
]
if "sec-ch-ua" in headers:
curl_cmd.extend(["-H", f"sec-ch-ua: {headers['sec-ch-ua']}"])
if "sec-ch-ua-mobile" in headers:
curl_cmd.extend(["-H", f"sec-ch-ua-mobile: {headers['sec-ch-ua-mobile']}"])
if "sec-ch-ua-platform" in headers:
curl_cmd.extend(["-H", f"sec-ch-ua-platform: {headers['sec-ch-ua-platform']}"])
if proxy_url:
curl_cmd.extend(["-x", proxy_url])
curl_cmd.append(url)
result = subprocess.run(curl_cmd, capture_output=True, timeout=90)
if result.returncode == 0 and file_path.exists():
file_size = file_path.stat().st_size
if file_size > 0:
debug_logger.log_info(f"File cached (curl): {filename} ({file_size} bytes)")
return filename
raise Exception("Downloaded file is empty")
error_msg = result.stderr.decode("utf-8", errors="ignore") if result.stderr else "Unknown error"
raise Exception(f"curl command failed: {error_msg}")
except FileNotFoundError as e:
normalized_error = self._normalize_cache_error(e)
debug_logger.log_error(
error_message=f"Failed to download file: {str(e)}",
status_code=0,
response_text=str(e)
)
raise Exception(normalized_error) from e
except Exception as e:
normalized_error = self._normalize_cache_error(e)
debug_logger.log_error(
error_message=f"Failed to download file: {str(e)}",
status_code=0,
response_text=str(e)
)
raise Exception(normalized_error) from e
async def cache_base64_image(self, base64_data: str, resolution: str = "") -> str:
"""
Cache base64 encoded image data to local file
Args:
base64_data: Base64 encoded image data (without data:image/... prefix)
resolution: Resolution info for filename (e.g., "4K", "2K")
Returns:
Local cache filename
"""
import base64
import uuid
# Generate unique filename
unique_id = hashlib.md5(f"{uuid.uuid4()}{time.time()}".encode()).hexdigest()
suffix = f"_{resolution}" if resolution else ""
filename = f"{unique_id}{suffix}.jpg"
file_path = self.cache_dir / filename
try:
# Decode base64 and save to file
image_data = base64.b64decode(base64_data)
with open(file_path, 'wb') as f:
f.write(image_data)
debug_logger.log_info(f"Base64 image cached: {filename} ({len(image_data)} bytes)")
return filename
except Exception as e:
debug_logger.log_error(
error_message=f"Failed to cache base64 image: {str(e)}",
status_code=0,
response_text=""
)
raise Exception(f"Failed to cache base64 image: {str(e)}")
def get_cache_path(self, filename: str) -> Path:
"""Get full path to cached file"""
return self.cache_dir / filename
def set_timeout(self, timeout: int):
"""Set cache timeout in seconds"""
self.default_timeout = max(0, int(timeout))
debug_logger.log_info(f"Cache timeout updated to {timeout} seconds")
def get_timeout(self) -> int:
"""Get current cache timeout"""
return self.default_timeout
async def clear_all(self):
"""Clear all cached files"""
try:
removed_count = 0
for file_path in self.cache_dir.iterdir():
if file_path.is_file():
try:
file_path.unlink()
removed_count += 1
except Exception:
pass
debug_logger.log_info(f"Cache cleared: removed {removed_count} files")
return removed_count
except Exception as e:
debug_logger.log_error(
error_message=f"Failed to clear cache: {str(e)}",
status_code=0,
response_text=""
)
raise

File diff suppressed because it is too large Load Diff