Merge pull request #133 from aicrossai/main

Feature: Distributed Captcha Extension, VEO Video Extend & Timeout Optimization
This commit is contained in:
Genz
2026-04-30 18:52:11 +08:00
committed by GitHub
20 changed files with 1898 additions and 229 deletions

View File

@@ -1,76 +0,0 @@
[global]
api_key = "han1234"
admin_username = "admin"
admin_password = "admin"
[flow]
labs_base_url = "https://labs.google/fx/api"
api_base_url = "https://aisandbox-pa.googleapis.com/v1"
timeout = 120
max_retries = 4
image_request_timeout = 40
image_timeout_retry_count = 1
image_timeout_retry_delay = 0.8
image_timeout_use_media_proxy_fallback = true
image_prefer_media_proxy = true
image_slot_wait_timeout = 900
image_launch_soft_limit = 25
image_launch_wait_timeout = 900
image_launch_stagger_ms = 250
video_slot_wait_timeout = 480
video_launch_soft_limit = 20
video_launch_wait_timeout = 480
video_launch_stagger_ms = 250
poll_interval = 3.0
max_poll_attempts = 200
[server]
host = "0.0.0.0"
port = 8000
[debug]
enabled = false
log_requests = true
log_responses = true
mask_token = true
[proxy]
proxy_enabled = false
proxy_url = ""
[generation]
image_timeout = 300
video_timeout = 1500
[call_logic]
call_mode = "default" # default=随机轮询策略, polling=顺序轮询策略
[admin]
error_ban_threshold = 3
[cache]
enabled = false
timeout = 7200 # 缓存超时时间(秒), 默认2小时
base_url = "" # 缓存文件访问的基础URL, 留空则使用服务器地址
[captcha]
captcha_method = "personal" # 打码方式: yescaptcha/browser/personal/remote_browser
browser_recaptcha_settle_seconds = 1.0 # 打码完成额外稳态等待,速度优先可设 1.0
browser_launch_background = true # browser 打码默认后台启动(最小化/避免抢占前台)
# 内置浏览器打码 (personal) 配置
# 每个项目(project_id)对应一个常驻标签页,标签页会被复用以提高性能
# 推荐配置每个标签约占用200-300MB内存
# - 2GB内存: 3个标签
# - 4GB内存: 5个标签
# - 8GB内存: 10个标签
# - 16GB内存: 20个标签
# 注意:标签数量取决于不同项目的数量,单个项目的并发请求会复用同一个标签页
personal_max_resident_tabs = 5 # 最大常驻标签页数量
personal_idle_tab_ttl_seconds = 600 # 标签页空闲超时(秒),超时后自动回收
yescaptcha_api_key = "" # YesCaptcha API密钥
yescaptcha_base_url = "https://api.yescaptcha.com"
remote_browser_base_url = "http://127.0.0.1:8060" # 本地 token 池服务地址
remote_browser_api_key = "" # 本地 token 池服务 API Key
remote_browser_timeout = 35 # 远程有头打码请求超时(秒)

View File

@@ -54,8 +54,7 @@ timeout = 7200 # 缓存超时时间(秒), 默认2小时; 设置为0表示不自
base_url = "" # 缓存文件访问的基础URL, 留空则使用服务器地址
[captcha]
captcha_method = "browser" # 打码方式: yescaptcha/browser/personal/remote_browser
browser_launch_background = true # 有头浏览器是否默认后台启动;设为 false 可直接看到窗口
captcha_method = "extension" # 打码方式: extension/yescaptcha/browser/personal/remote_browser
browser_recaptcha_settle_seconds = 3.0 # reload/clr 就绪后的额外稳态等待
browser_count = 1 # browser 模式的有头浏览器实例数量
personal_project_pool_size = 4 # personal 模式下单个 Token 默认维护的项目池数量(仅影响项目轮换,不决定打码标签页数量)
@@ -66,3 +65,5 @@ yescaptcha_base_url = "https://api.yescaptcha.com"
remote_browser_base_url = "" # 远程有头打码服务地址
remote_browser_api_key = "" # 远程有头打码服务 API Key
remote_browser_timeout = 60 # 远程有头打码请求超时(秒)
capsolver_api_key = ""
capsolver_base_url = "https://api.capsolver.com"

242
extension/background.js Normal file
View File

@@ -0,0 +1,242 @@
let ws = null;
let reconnectTimeout = null;
let heartbeatInterval = null;
const DEFAULT_SETTINGS = {
serverUrl: "ws://127.0.0.1:8000/captcha_ws",
apiKey: "",
routeKey: "",
clientLabel: ""
};
function getSettings() {
return new Promise((resolve) => {
chrome.storage.local.get(DEFAULT_SETTINGS, (stored) => {
resolve({
serverUrl: (stored.serverUrl || DEFAULT_SETTINGS.serverUrl).trim(),
apiKey: (stored.apiKey || "").trim(),
routeKey: (stored.routeKey || "").trim(),
clientLabel: (stored.clientLabel || "").trim()
});
});
});
}
function closeSocket() {
if (heartbeatInterval) clearInterval(heartbeatInterval);
heartbeatInterval = null;
if (reconnectTimeout) clearTimeout(reconnectTimeout);
reconnectTimeout = null;
if (ws) {
try {
ws.close();
} catch (e) {
console.log("[Flow2API] Close socket error", e);
}
ws = null;
}
}
function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
function waitForTabReady(tabId, timeoutMs = 12000) {
return new Promise((resolve) => {
let settled = false;
const finish = () => {
if (settled) return;
settled = true;
chrome.tabs.onUpdated.removeListener(onUpdated);
clearTimeout(timer);
resolve();
};
const onUpdated = (updatedTabId, changeInfo) => {
if (updatedTabId === tabId && changeInfo.status === "complete") {
finish();
}
};
const timer = setTimeout(finish, timeoutMs);
chrome.tabs.onUpdated.addListener(onUpdated);
chrome.tabs.get(tabId, (tab) => {
if (chrome.runtime.lastError) {
finish();
return;
}
if (tab && tab.status === "complete") {
finish();
}
});
});
}
async function connectWS() {
if (ws && (ws.readyState === WebSocket.OPEN || ws.readyState === WebSocket.CONNECTING)) return;
const settings = await getSettings();
const url = new URL(settings.serverUrl || DEFAULT_SETTINGS.serverUrl);
if (settings.apiKey) {
url.searchParams.set("key", settings.apiKey);
}
if (settings.routeKey) {
url.searchParams.set("route_key", settings.routeKey);
}
if (settings.clientLabel) {
url.searchParams.set("client_label", settings.clientLabel);
}
ws = new WebSocket(url.toString());
ws.onopen = () => {
console.log("[Flow2API] Background connected to WebSocket", url.toString());
ws.send(JSON.stringify({
type: "register",
route_key: settings.routeKey,
client_label: settings.clientLabel
}));
if (heartbeatInterval) clearInterval(heartbeatInterval);
heartbeatInterval = setInterval(() => {
if (ws && ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({ type: "ping" }));
}
}, 20000);
};
let tokenQueue = Promise.resolve();
ws.onmessage = async (event) => {
let data;
try {
data = JSON.parse(event.data);
} catch (e) {
return;
}
if (data.type === "register_ack") {
console.log("[Flow2API] Registered route key:", data.route_key || "(empty)");
return;
}
if (data.type === "get_token") {
tokenQueue = tokenQueue.then(() => handleGetToken(data)).catch(err => {
console.error("[Flow2API] Queue Error:", err);
});
}
};
ws.onclose = () => {
console.log("[Flow2API] WebSocket Closed. Reconnecting in 2s...");
ws = null;
if (heartbeatInterval) clearInterval(heartbeatInterval);
if (reconnectTimeout) clearTimeout(reconnectTimeout);
reconnectTimeout = setTimeout(connectWS, 2000);
};
ws.onerror = (e) => {
console.log("[Flow2API] WebSocket Error", e);
};
}
async function handleGetToken(data) {
let newTabId = null;
try {
console.log("[Flow2API] Auto-opening fresh Google Labs tab to avoid token expiry...");
const newTab = await chrome.tabs.create({ url: "https://labs.google/fx/tools/flow", active: false });
newTabId = newTab.id;
await waitForTabReady(newTabId);
await sleep(1200);
let successResponse = null;
let lastErrorMsg = "No response from tab.";
const scriptTimeoutMs = data.action === "VIDEO_GENERATION" ? 30000 : 20000;
try {
const results = await chrome.scripting.executeScript({
target: { tabId: newTabId },
world: "MAIN",
func: async (action, timeoutMs) => {
return new Promise((resolve, reject) => {
let settled = false;
const finish = (fn, value) => {
if (settled) return;
settled = true;
fn(value);
};
try {
function run() {
grecaptcha.enterprise.ready(function() {
grecaptcha.enterprise.execute("6LdsFiUsAAAAAIjVDZcuLhaHiDn5nnHVXVRQGeMV", { action: action })
.then(token => finish(resolve, token))
.catch(err => finish(reject, err.message || "reCAPTCHA evaluation failed internally"));
});
}
if (typeof grecaptcha !== "undefined" && grecaptcha.enterprise) {
run();
} else {
const s = document.createElement("script");
s.src = "https://www.google.com/recaptcha/enterprise.js?render=6LdsFiUsAAAAAIjVDZcuLhaHiDn5nnHVXVRQGeMV";
s.onload = run;
s.onerror = () => finish(reject, "Failed to load enterprise.js via network");
document.head.appendChild(s);
}
setTimeout(() => finish(reject, "Timeout generating reCAPTCHA locally"), timeoutMs);
} catch (e) {
finish(reject, e.message);
}
});
},
args: [data.action || "IMAGE_GENERATION", scriptTimeoutMs]
});
if (results && results[0] && results[0].result) {
successResponse = { status: "success", token: results[0].result };
}
} catch (e) {
lastErrorMsg = e.message || "Script execution failed";
}
if (successResponse) {
ws.send(JSON.stringify({
req_id: data.req_id,
status: successResponse.status,
token: successResponse.token
}));
} else {
ws.send(JSON.stringify({
req_id: data.req_id,
status: "error",
error: "Extension script failed: " + lastErrorMsg
}));
}
} catch (err) {
ws.send(JSON.stringify({
req_id: data.req_id,
status: "error",
error: err.message
}));
} finally {
if (newTabId) {
try {
await chrome.tabs.remove(newTabId);
console.log("[Flow2API] Closed temporary token tab.");
} catch (e) {
console.log("[Flow2API] Error closing tab:", e);
}
}
}
}
chrome.storage.onChanged.addListener((changes, areaName) => {
if (areaName !== "local") return;
if (changes.routeKey || changes.serverUrl || changes.apiKey || changes.clientLabel) {
console.log("[Flow2API] Extension settings changed, reconnecting WebSocket...");
closeSocket();
connectWS();
}
});
connectWS();

63
extension/content.js Normal file
View File

@@ -0,0 +1,63 @@
console.log("[Flow2API] Captcha Worker injected.");
function getRecaptchaToken(action) {
return new Promise((resolve, reject) => {
const reqId = Date.now() + Math.random().toString();
const script = document.createElement("script");
script.textContent = `
try {
function runCaptcha() {
grecaptcha.enterprise.ready(function() {
grecaptcha.enterprise.execute('6LdsFiUsAAAAAIjVDZcuLhaHiDn5nnHVXVRQGeMV', {action: '${action}'})
.then(token => window.postMessage({type: 'reCAPTCHA_result', reqId: '${reqId}', token: token}, '*'))
.catch(err => window.postMessage({type: 'reCAPTCHA_error', reqId: '${reqId}', error: err.message}, '*'));
});
}
if (typeof grecaptcha !== "undefined" && grecaptcha.enterprise) {
runCaptcha();
} else {
const rScript = document.createElement('script');
rScript.src = "https://www.google.com/recaptcha/enterprise.js?render=6LdsFiUsAAAAAIjVDZcuLhaHiDn5nnHVXVRQGeMV";
rScript.onload = () => { runCaptcha(); };
rScript.onerror = () => { window.postMessage({type: 'reCAPTCHA_error', reqId: '${reqId}', error: 'Failed to load enterprise.js'}, '*'); };
document.head.appendChild(rScript);
}
} catch (e) {
window.postMessage({type: 'reCAPTCHA_error', reqId: '${reqId}', error: e.message}, '*');
}
`;
const listener = (event) => {
if (event.source !== window || !event.data) return;
if (event.data.reqId === reqId) {
window.removeEventListener("message", listener);
script.remove();
if (event.data.type === 'reCAPTCHA_result') {
resolve(event.data.token);
} else {
reject(new Error(event.data.error || "Unknown reCAPTCHA Error"));
}
}
};
window.addEventListener("message", listener);
document.documentElement.appendChild(script);
setTimeout(() => {
window.removeEventListener("message", listener);
script.remove();
reject(new Error("Timeout generating reCAPTCHA"));
}, 15000);
});
}
chrome.runtime.onMessage.addListener((message, sender, sendResponse) => {
if (message.type === "get_token") {
console.log("[Flow2API] Generating token for action: " + message.action);
getRecaptchaToken(message.action)
.then(token => sendResponse({status: "success", token: token}))
.catch(err => sendResponse({status: "error", error: err.message}));
return true;
}
});

29
extension/manifest.json Normal file
View File

@@ -0,0 +1,29 @@
{
"manifest_version": 3,
"name": "Flow2API Captcha Worker",
"version": "1.0.0",
"description": "Generate Google Labs reCAPTCHA Enterprise tokens for Flow2API.",
"permissions": [
"storage",
"tabs",
"scripting"
],
"host_permissions": [
"https://labs.google/*"
],
"background": {
"service_worker": "background.js"
},
"content_scripts": [
{
"matches": [
"https://labs.google/*"
],
"js": [
"content.js"
],
"run_at": "document_idle"
}
],
"options_page": "options.html"
}

100
extension/options.html Normal file
View File

@@ -0,0 +1,100 @@
<!doctype html>
<html lang="zh-CN">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Flow2API Captcha Worker Settings</title>
<style>
body {
margin: 0;
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", sans-serif;
background: #f7f8fa;
color: #111827;
}
.wrap {
max-width: 720px;
margin: 0 auto;
padding: 24px;
}
.panel {
background: #fff;
border: 1px solid #e5e7eb;
border-radius: 12px;
padding: 20px;
box-shadow: 0 8px 24px rgba(15, 23, 42, 0.06);
}
h1 {
font-size: 20px;
margin: 0 0 8px;
}
p {
color: #4b5563;
line-height: 1.5;
}
label {
display: block;
margin: 16px 0 8px;
font-weight: 600;
}
input {
width: 100%;
box-sizing: border-box;
padding: 10px 12px;
border: 1px solid #d1d5db;
border-radius: 10px;
font-size: 14px;
}
button {
margin-top: 18px;
border: 0;
border-radius: 10px;
background: #111827;
color: #fff;
padding: 10px 16px;
font-size: 14px;
cursor: pointer;
}
.hint {
margin-top: 12px;
font-size: 13px;
color: #6b7280;
}
.status {
margin-top: 12px;
min-height: 20px;
font-size: 13px;
color: #065f46;
}
code {
background: #f3f4f6;
border-radius: 6px;
padding: 1px 6px;
}
</style>
</head>
<body>
<div class="wrap">
<div class="panel">
<h1>Flow2API Captcha Worker</h1>
<p>给当前浏览器实例设置一个独立的路由键Flow2API 后端就能把对应账号的验证码请求固定发到这个实例。</p>
<label for="routeKey">Route Key</label>
<input id="routeKey" type="text" placeholder="例如 9223">
<label for="clientLabel">Client Label</label>
<input id="clientLabel" type="text" placeholder="例如 browser-9223">
<label for="serverUrl">WebSocket URL</label>
<input id="serverUrl" type="text" placeholder="ws://127.0.0.1:8000/captcha_ws">
<label for="apiKey">Flow2API API Key</label>
<input id="apiKey" type="password" placeholder="管理后台里的 API Key">
<button id="saveBtn" type="button">保存</button>
<div class="status" id="status"></div>
<div class="hint">管理台 token 里填写同样的 <code>Route Key</code>,比如 9223 对 9223。后端会使用 API Key 校验 WebSocket 连接。</div>
</div>
</div>
<script src="options.js"></script>
</body>
</html>

73
extension/options.js Normal file
View File

@@ -0,0 +1,73 @@
const DEFAULT_SETTINGS = {
serverUrl: "ws://127.0.0.1:8000/captcha_ws",
apiKey: "",
routeKey: "",
clientLabel: ""
};
const $ = (id) => document.getElementById(id);
function normalizeSettings(values) {
return {
serverUrl: (values.serverUrl || DEFAULT_SETTINGS.serverUrl).trim(),
apiKey: (values.apiKey || "").trim(),
routeKey: (values.routeKey || "").trim(),
clientLabel: (values.clientLabel || "").trim()
};
}
function setStatus(message, isError = false) {
const status = $("status");
status.textContent = message;
status.style.color = isError ? "#b91c1c" : "#065f46";
}
function isValidWsUrl(value) {
try {
const url = new URL(value);
return url.protocol === "ws:" || url.protocol === "wss:";
} catch (e) {
return false;
}
}
function loadSettings() {
chrome.storage.local.get(DEFAULT_SETTINGS, (stored) => {
const settings = normalizeSettings(stored);
$("serverUrl").value = settings.serverUrl;
$("apiKey").value = settings.apiKey;
$("routeKey").value = settings.routeKey;
$("clientLabel").value = settings.clientLabel;
});
}
function saveSettings() {
const settings = normalizeSettings({
serverUrl: $("serverUrl").value,
apiKey: $("apiKey").value,
routeKey: $("routeKey").value,
clientLabel: $("clientLabel").value
});
if (!isValidWsUrl(settings.serverUrl)) {
setStatus("WebSocket URL 必须以 ws:// 或 wss:// 开头。", true);
return;
}
if (!settings.apiKey) {
setStatus("请填写 Flow2API API Key。", true);
return;
}
chrome.storage.local.set(settings, () => {
if (chrome.runtime.lastError) {
setStatus(`保存失败:${chrome.runtime.lastError.message}`, true);
return;
}
setStatus("已保存,后台连接会自动重连。");
});
}
document.addEventListener("DOMContentLoaded", () => {
loadSettings();
$("saveBtn").addEventListener("click", saveSettings);
});

View File

@@ -371,13 +371,23 @@ async def _solve_recaptcha_with_api_service(
create_url = f"{base_url.rstrip('/')}/createTask"
get_url = f"{base_url.rstrip('/')}/getTaskResult"
# Do not use curl_cffi impersonation for captcha API JSON endpoints: some ASGI servers
# (for example FastAPI/Uvicorn) may receive an empty body and return 422.
# 获取代理配置
proxies = None
try:
if proxy_manager:
proxy_cfg = await proxy_manager.get_proxy_config()
if proxy_cfg and proxy_cfg.enabled and proxy_cfg.proxy_url:
proxies = {"http": proxy_cfg.proxy_url, "https": proxy_cfg.proxy_url}
except Exception:
pass
async with AsyncSession() as session:
create_resp = await session.post(
create_url,
json={"clientKey": client_key, "task": task},
timeout=30
impersonate="chrome120",
timeout=30,
proxies=proxies
)
create_json = create_resp.json()
task_id = create_json.get("taskId")
@@ -390,7 +400,9 @@ async def _solve_recaptcha_with_api_service(
poll_resp = await session.post(
get_url,
json={"clientKey": client_key, "taskId": task_id},
timeout=30
impersonate="chrome120",
timeout=30,
proxies=proxies
)
poll_json = poll_resp.json()
if poll_json.get("status") == "ready":
@@ -470,6 +482,7 @@ class AddTokenRequest(BaseModel):
project_name: Optional[str] = None
remark: Optional[str] = None
captcha_proxy_url: Optional[str] = None
extension_route_key: Optional[str] = None
image_enabled: bool = True
video_enabled: bool = True
image_concurrency: int = -1
@@ -482,6 +495,7 @@ class UpdateTokenRequest(BaseModel):
project_name: Optional[str] = None
remark: Optional[str] = None
captcha_proxy_url: Optional[str] = None
extension_route_key: Optional[str] = None
image_enabled: Optional[bool] = None
video_enabled: Optional[bool] = None
image_concurrency: Optional[int] = None
@@ -549,6 +563,7 @@ class ImportTokenItem(BaseModel):
session_token: Optional[str] = None
is_active: bool = True
captcha_proxy_url: Optional[str] = None
extension_route_key: Optional[str] = None
image_enabled: bool = True
video_enabled: bool = True
image_concurrency: int = -1
@@ -679,6 +694,7 @@ async def get_tokens(token: str = Depends(verify_admin_token)):
"current_project_id": row.get("current_project_id"), # 🆕 项目ID
"current_project_name": row.get("current_project_name"), # 🆕 项目名称
"captcha_proxy_url": row.get("captcha_proxy_url") or "",
"extension_route_key": row.get("extension_route_key") or "",
"image_enabled": bool(row.get("image_enabled")),
"video_enabled": bool(row.get("video_enabled")),
"image_concurrency": row.get("image_concurrency"),
@@ -707,6 +723,7 @@ async def add_token(
project_name=request.project_name,
remark=request.remark,
captcha_proxy_url=request.captcha_proxy_url.strip() if request.captcha_proxy_url is not None else None,
extension_route_key=request.extension_route_key.strip() if request.extension_route_key is not None else None,
image_enabled=request.image_enabled,
video_enabled=request.video_enabled,
image_concurrency=request.image_concurrency,
@@ -770,6 +787,7 @@ async def update_token(
project_name=request.project_name,
remark=request.remark,
captcha_proxy_url=request.captcha_proxy_url.strip() if request.captcha_proxy_url is not None else None,
extension_route_key=request.extension_route_key.strip() if request.extension_route_key is not None else None,
image_enabled=request.image_enabled,
video_enabled=request.video_enabled,
image_concurrency=request.image_concurrency,
@@ -973,6 +991,7 @@ async def import_tokens(
at=at,
at_expires=at_expires,
captcha_proxy_url=item.captcha_proxy_url.strip() if item.captcha_proxy_url is not None else None,
extension_route_key=item.extension_route_key.strip() if item.extension_route_key is not None else None,
image_enabled=item.image_enabled,
video_enabled=item.video_enabled,
image_concurrency=item.image_concurrency,
@@ -986,6 +1005,7 @@ async def import_tokens(
existing.at = at
existing.at_expires = at_expires
existing.captcha_proxy_url = item.captcha_proxy_url
existing.extension_route_key = item.extension_route_key
existing.image_enabled = item.image_enabled
existing.video_enabled = item.video_enabled
existing.image_concurrency = item.image_concurrency
@@ -996,6 +1016,7 @@ async def import_tokens(
new_token = await token_manager.add_token(
st=st,
captcha_proxy_url=item.captcha_proxy_url.strip() if item.captcha_proxy_url is not None else None,
extension_route_key=item.extension_route_key.strip() if item.extension_route_key is not None else None,
image_enabled=item.image_enabled,
video_enabled=item.video_enabled,
image_concurrency=item.image_concurrency,
@@ -1669,8 +1690,336 @@ async def test_captcha_score(
_request: Optional[CaptchaScoreTestRequest] = None,
_token: str = Depends(verify_admin_token)
):
"""分数测试已禁用"""
raise HTTPException(status_code=403, detail="已禁用分数测试")
"""使用当前打码方式获取 token并提交到 antcpt 校验分数。"""
req = request or CaptchaScoreTestRequest()
website_url = (req.website_url or "https://antcpt.com/score_detector/").strip()
website_key = (req.website_key or "6LcR_okUAAAAAPYrPe-HK_0RULO1aZM15ENyM-Mf").strip()
action = (req.action or "homepage").strip()
verify_url = (req.verify_url or "https://antcpt.com/score_detector/verify.php").strip()
enterprise = bool(req.enterprise)
started_at = time.time()
captcha_config = await db.get_captcha_config()
captcha_method = (captcha_config.captcha_method or config.captcha_method or "").strip().lower()
browser_proxy_enabled = bool(captcha_config.browser_proxy_enabled)
browser_proxy_url = captcha_config.browser_proxy_url or ""
token_value: Optional[str] = None
fingerprint: Optional[Dict[str, Any]] = None
token_elapsed_ms = 0
verify_elapsed_ms = 0
verify_http_status = None
verify_result: Dict[str, Any] = {}
verify_headers: Dict[str, str] = {}
verify_proxy_used = False
verify_proxy_source = "none"
verify_proxy_url = ""
verify_impersonate = "chrome120"
page_verify_only = captcha_method in {"browser", "personal", "remote_browser"}
verify_mode = "browser_page" if page_verify_only else "server_post"
try:
token_start = time.time()
if captcha_method == "browser":
from ..services.browser_captcha import BrowserCaptchaService
service = await BrowserCaptchaService.get_instance(db)
score_payload, browser_id = await service.get_custom_score(
website_url=website_url,
website_key=website_key,
verify_url=verify_url,
action=action,
enterprise=enterprise
)
if isinstance(score_payload, dict):
token_value = score_payload.get("token")
verify_elapsed_ms = int(score_payload.get("verify_elapsed_ms") or 0)
verify_http_status = score_payload.get("verify_http_status")
verify_result = score_payload.get("verify_result") if isinstance(score_payload.get("verify_result"), dict) else {}
verify_mode = score_payload.get("verify_mode") or "browser_page"
score_token_elapsed = score_payload.get("token_elapsed_ms")
if isinstance(score_token_elapsed, (int, float)):
token_elapsed_ms = int(score_token_elapsed)
if token_value:
fingerprint = await service.get_fingerprint(browser_id)
verify_proxy_used = bool(browser_proxy_enabled and browser_proxy_url)
verify_proxy_source = "captcha_browser_proxy" if verify_proxy_used else "browser_direct"
verify_proxy_url = browser_proxy_url if verify_proxy_used else ""
elif captcha_method == "personal":
from ..services.browser_captcha_personal import BrowserCaptchaService
service = await BrowserCaptchaService.get_instance(db)
score_payload = await service.get_custom_score(
website_url=website_url,
website_key=website_key,
verify_url=verify_url,
action=action,
enterprise=enterprise
)
if isinstance(score_payload, dict):
token_value = score_payload.get("token")
verify_elapsed_ms = int(score_payload.get("verify_elapsed_ms") or 0)
verify_http_status = score_payload.get("verify_http_status")
verify_result = score_payload.get("verify_result") if isinstance(score_payload.get("verify_result"), dict) else {}
verify_mode = score_payload.get("verify_mode") or "browser_page"
score_token_elapsed = score_payload.get("token_elapsed_ms")
if isinstance(score_token_elapsed, (int, float)):
token_elapsed_ms = int(score_token_elapsed)
if token_value:
fingerprint = service.get_last_fingerprint()
verify_proxy_used = bool(browser_proxy_enabled and browser_proxy_url)
verify_proxy_source = "captcha_browser_proxy" if verify_proxy_used else "browser_direct"
verify_proxy_url = browser_proxy_url if verify_proxy_used else ""
elif captcha_method == "remote_browser":
score_payload = await _score_test_with_remote_browser_service(
website_url=website_url,
website_key=website_key,
verify_url=verify_url,
action=action,
enterprise=enterprise,
)
if isinstance(score_payload, dict):
if score_payload.get("success") is False:
raise RuntimeError(score_payload.get("message") or "远程打码分数测试失败")
token_value = score_payload.get("token")
verify_elapsed_ms = int(score_payload.get("verify_elapsed_ms") or 0)
verify_http_status = score_payload.get("verify_http_status")
verify_result = score_payload.get("verify_result") if isinstance(score_payload.get("verify_result"), dict) else {}
verify_mode = score_payload.get("verify_mode") or "remote_browser_page"
score_token_elapsed = score_payload.get("token_elapsed_ms")
if isinstance(score_token_elapsed, (int, float)):
token_elapsed_ms = int(score_token_elapsed)
fingerprint = score_payload.get("fingerprint") if isinstance(score_payload.get("fingerprint"), dict) else None
elif captcha_method in SUPPORTED_API_CAPTCHA_METHODS:
if captcha_method == "capsolver" and "antcpt.com" in website_url:
# CapSolver specifically blocks antcpt.com. Test against labs.google to verify API key config.
token_value = await _solve_recaptcha_with_api_service(
method=captcha_method,
website_url="https://labs.google/",
website_key="6LdsFiUsAAAAAIjVDZcuLhaHiDn5nnHVXVRQGeMV",
action="IMAGE_GENERATION",
enterprise=True
)
if token_value:
if token_elapsed_ms <= 0:
token_elapsed_ms = int((time.time() - token_start) * 1000)
return {
"success": True,
"message": "CapSolver不支持antcpt。已成功用 Google Labs 测试连通性",
"captcha_method": captcha_method,
"website_url": "https://labs.google/",
"website_key": "6LdsFiUsAAAAAIjVDZcuLhaHiDn5nnHVXVRQGeMV",
"action": "IMAGE_GENERATION",
"verify_url": "",
"enterprise": True,
"token_acquired": True,
"token_preview": _mask_token(token_value),
"token_elapsed_ms": token_elapsed_ms,
"verify_elapsed_ms": 0,
"verify_http_status": 200,
"score": 0.9,
"verify_result": {"success": True, "message": "跳过分数校验"},
"verify_request_meta": {},
"browser_proxy_enabled": browser_proxy_enabled,
"browser_proxy_url": browser_proxy_url if browser_proxy_enabled else "",
"fingerprint": fingerprint,
"elapsed_ms": int((time.time() - started_at) * 1000)
}
else:
token_value = await _solve_recaptcha_with_api_service(
method=captcha_method,
website_url=website_url,
website_key=website_key,
action=action,
enterprise=enterprise
)
else:
return {
"success": False,
"message": f"当前打码方式不支持分数测试: {captcha_method}",
"captcha_method": captcha_method,
"website_url": website_url,
"website_key": website_key,
"action": action,
"verify_url": verify_url,
"enterprise": enterprise,
"token_acquired": False,
"elapsed_ms": int((time.time() - started_at) * 1000)
}
if token_elapsed_ms <= 0:
token_elapsed_ms = int((time.time() - token_start) * 1000)
# 远程有头打码的 custom-score 可能由页面内直接完成校验,
# 在部分实现里不会显式回传 token本地按 verify_result 兜底判定。
if captcha_method == "remote_browser" and not token_value and isinstance(verify_result, dict):
if verify_result.get("success") is True:
token_value = verify_result.get("token") or verify_result.get("gRecaptchaResponse") or "__verified_by_remote__"
if not token_value:
return {
"success": False,
"message": "未获取到 reCAPTCHA token",
"captcha_method": captcha_method,
"website_url": website_url,
"website_key": website_key,
"action": action,
"verify_url": verify_url,
"enterprise": enterprise,
"token_acquired": False,
"token_elapsed_ms": token_elapsed_ms,
"browser_proxy_enabled": browser_proxy_enabled,
"browser_proxy_url": browser_proxy_url if browser_proxy_enabled else "",
"fingerprint": fingerprint,
"elapsed_ms": int((time.time() - started_at) * 1000)
}
if verify_mode == "server_post" and not page_verify_only:
verify_start = time.time()
verify_headers = {
"accept": "application/json, text/javascript, */*; q=0.01",
"content-type": "application/json",
"origin": "https://antcpt.com",
"referer": website_url,
"x-requested-with": "XMLHttpRequest",
}
if isinstance(fingerprint, dict):
ua = (fingerprint.get("user_agent") or "").strip()
lang = (fingerprint.get("accept_language") or "").strip()
sec_ch_ua = (fingerprint.get("sec_ch_ua") or "").strip()
sec_ch_ua_mobile = (fingerprint.get("sec_ch_ua_mobile") or "").strip()
sec_ch_ua_platform = (fingerprint.get("sec_ch_ua_platform") or "").strip()
if ua:
verify_headers["user-agent"] = ua
if lang:
verify_headers["accept-language"] = lang if "," in lang else f"{lang},zh;q=0.9"
if sec_ch_ua:
verify_headers["sec-ch-ua"] = sec_ch_ua
if sec_ch_ua_mobile:
verify_headers["sec-ch-ua-mobile"] = sec_ch_ua_mobile
if sec_ch_ua_platform:
verify_headers["sec-ch-ua-platform"] = sec_ch_ua_platform
if verify_headers.get("user-agent"):
for header_name, header_value in _guess_client_hints_from_user_agent(
verify_headers.get("user-agent", "")
).items():
if header_value and not verify_headers.get(header_name):
verify_headers[header_name] = header_value
verify_impersonate = _guess_impersonate_from_user_agent(verify_headers.get("user-agent", ""))
verify_proxies, verify_proxy_used, verify_proxy_source, verify_proxy_url = (
await _resolve_score_test_verify_proxy(
captcha_method=captcha_method,
browser_proxy_enabled=browser_proxy_enabled,
browser_proxy_url=browser_proxy_url
)
)
async with AsyncSession() as session:
verify_resp = await session.post(
verify_url,
json={"g-recaptcha-response": token_value},
headers=verify_headers,
proxies=verify_proxies,
impersonate=verify_impersonate,
timeout=30
)
verify_elapsed_ms = int((time.time() - verify_start) * 1000)
verify_http_status = verify_resp.status_code
try:
verify_result = verify_resp.json()
except Exception:
verify_result = {"raw": verify_resp.text}
else:
verify_headers = {
"origin": "https://antcpt.com",
"referer": website_url,
"x-requested-with": "XMLHttpRequest",
}
if isinstance(fingerprint, dict):
verify_headers.update({
"user-agent": fingerprint.get("user_agent", ""),
"accept-language": fingerprint.get("accept_language", ""),
"sec-ch-ua": fingerprint.get("sec_ch_ua", ""),
"sec-ch-ua-mobile": fingerprint.get("sec_ch_ua_mobile", ""),
"sec-ch-ua-platform": fingerprint.get("sec_ch_ua_platform", ""),
})
verify_success = bool(verify_result.get("success")) if isinstance(verify_result, dict) else False
score_value = verify_result.get("score") if isinstance(verify_result, dict) else None
return {
"success": verify_success,
"message": "分数校验成功" if verify_success else "分数校验未通过",
"captcha_method": captcha_method,
"website_url": website_url,
"website_key": website_key,
"action": action,
"verify_url": verify_url,
"enterprise": enterprise,
"token_acquired": True,
"token_preview": _mask_token(token_value),
"token_elapsed_ms": token_elapsed_ms,
"verify_elapsed_ms": verify_elapsed_ms,
"verify_http_status": verify_http_status,
"score": score_value,
"verify_result": verify_result,
"verify_request_meta": {
"mode": verify_mode,
"proxy_used": verify_proxy_used,
"user_agent": verify_headers.get("user-agent", ""),
"accept_language": verify_headers.get("accept-language", ""),
"sec_ch_ua": verify_headers.get("sec-ch-ua", ""),
"sec_ch_ua_mobile": verify_headers.get("sec-ch-ua-mobile", ""),
"sec_ch_ua_platform": verify_headers.get("sec-ch-ua-platform", ""),
"origin": verify_headers.get("origin", ""),
"referer": verify_headers.get("referer", ""),
"x_requested_with": verify_headers.get("x-requested-with", ""),
"proxy_source": verify_proxy_source,
"proxy_url": verify_proxy_url,
"impersonate": verify_impersonate,
},
"browser_proxy_enabled": browser_proxy_enabled,
"browser_proxy_url": browser_proxy_url if browser_proxy_enabled else "",
"fingerprint": fingerprint,
"elapsed_ms": int((time.time() - started_at) * 1000)
}
except Exception as e:
return {
"success": False,
"message": f"分数测试失败: {str(e)}",
"captcha_method": captcha_method,
"website_url": website_url,
"website_key": website_key,
"action": action,
"verify_url": verify_url,
"enterprise": enterprise,
"token_acquired": bool(token_value),
"token_preview": _mask_token(token_value),
"token_elapsed_ms": token_elapsed_ms,
"verify_elapsed_ms": verify_elapsed_ms,
"verify_http_status": verify_http_status,
"verify_result": verify_result,
"verify_request_meta": {
"mode": verify_mode,
"proxy_used": verify_proxy_used,
"user_agent": verify_headers.get("user-agent", ""),
"accept_language": verify_headers.get("accept-language", ""),
"sec_ch_ua": verify_headers.get("sec-ch-ua", ""),
"sec_ch_ua_mobile": verify_headers.get("sec-ch-ua-mobile", ""),
"sec_ch_ua_platform": verify_headers.get("sec-ch-ua-platform", ""),
"origin": verify_headers.get("origin", ""),
"referer": verify_headers.get("referer", ""),
"x_requested_with": verify_headers.get("x-requested-with", ""),
"proxy_source": verify_proxy_source,
"proxy_url": verify_proxy_url,
"impersonate": verify_impersonate,
},
"browser_proxy_enabled": browser_proxy_enabled,
"browser_proxy_url": browser_proxy_url if browser_proxy_enabled else "",
"fingerprint": fingerprint,
"elapsed_ms": int((time.time() - started_at) * 1000)
}
# ========== Plugin Configuration Endpoints ==========

View File

@@ -9,10 +9,10 @@ import re
from urllib.parse import urlparse
from curl_cffi.requests import AsyncSession
from fastapi import APIRouter, Depends, HTTPException, Query, Request
from fastapi import APIRouter, Depends, HTTPException, Query, Request, WebSocket, WebSocketDisconnect
from fastapi.responses import JSONResponse, StreamingResponse
from ..core.auth import verify_api_key_flexible
from ..core.auth import AuthManager, verify_api_key_flexible
from ..core.logger import debug_logger
from ..core.model_resolver import get_base_model_aliases, resolve_model_name
from ..core.models import (
@@ -22,6 +22,7 @@ from ..core.models import (
GeminiGenerateContentRequest,
)
from ..services.generation_handler import MODEL_CONFIG, GenerationHandler
from ..services.browser_captcha_extension import ExtensionCaptchaService
router = APIRouter()
@@ -78,6 +79,7 @@ class NormalizedGenerationRequest:
prompt: str
images: List[bytes]
messages: Optional[List[ChatMessage]] = None
video_media_id: Optional[str] = None
def set_generation_handler(handler: GenerationHandler):
@@ -286,11 +288,18 @@ def _sanitize_media_prompt(prompt: str) -> str:
async def _extract_prompt_and_images_from_openai_messages(
messages: List[ChatMessage],
) -> tuple[str, List[bytes]]:
) -> tuple[str, List[bytes], Optional[str]]:
"""Extract prompt, images, and optional video_media_id from messages.
Returns:
(prompt, images, video_media_id)
video_media_id is set when an image_url starts with "extend://"
"""
last_message = messages[-1]
content = last_message.content
prompt_parts: List[str] = []
images: List[bytes] = []
video_media_id: Optional[str] = None
if isinstance(content, str):
prompt_parts.append(content)
@@ -303,10 +312,14 @@ async def _extract_prompt_and_images_from_openai_messages(
prompt_parts.append(text)
elif item_type == "image_url":
image_url = item.get("image_url", {}).get("url", "")
images.append(await _load_image_bytes_from_uri(image_url))
# extend://MEDIA_ID 用于视频续写
if image_url.startswith("extend://"):
video_media_id = image_url[len("extend://"):]
else:
images.append(await _load_image_bytes_from_uri(image_url))
prompt = "\n".join(part for part in prompt_parts if part).strip()
return prompt, images
return prompt, images, video_media_id
async def _append_openai_reference_images(
@@ -411,7 +424,7 @@ async def _normalize_openai_request(
request: ChatCompletionRequest,
) -> NormalizedGenerationRequest:
if request.messages:
prompt, images = await _extract_prompt_and_images_from_openai_messages(
prompt, images, video_media_id = await _extract_prompt_and_images_from_openai_messages(
request.messages
)
if request.image and not images:
@@ -423,6 +436,7 @@ async def _normalize_openai_request(
prompt=prompt,
images=images,
messages=request.messages,
video_media_id=video_media_id,
)
if request.contents:
@@ -472,6 +486,7 @@ async def _collect_non_stream_result(
prompt: str,
images: List[bytes],
base_url_override: Optional[str] = None,
video_media_id: Optional[str] = None,
) -> str:
handler = _ensure_generation_handler()
result = None
@@ -481,6 +496,7 @@ async def _collect_non_stream_result(
images=images if images else None,
stream=False,
base_url_override=base_url_override,
video_media_id=video_media_id,
):
result = chunk
@@ -709,6 +725,7 @@ async def _iterate_openai_stream(
images=normalized.images if normalized.images else None,
stream=True,
base_url_override=base_url_override,
video_media_id=normalized.video_media_id,
):
if chunk.startswith("data: "):
yield chunk
@@ -732,6 +749,7 @@ async def _iterate_gemini_stream(
images=normalized.images if normalized.images else None,
stream=True,
base_url_override=base_url_override,
video_media_id=normalized.video_media_id,
):
if chunk.startswith("data: "):
payload_text = chunk[6:].strip()
@@ -854,14 +872,13 @@ async def create_chat_completion(
},
)
payload = _enrich_payload_with_direct_url(
_parse_handler_result(
await _collect_non_stream_result(
normalized.model,
normalized.prompt,
normalized.images,
request_base_url,
)
payload = _parse_handler_result(
await _collect_non_stream_result(
normalized.model,
normalized.prompt,
normalized.images,
base_url_override=request_base_url,
video_media_id=normalized.video_media_id,
)
)
return _build_openai_json_response(payload)
@@ -894,7 +911,8 @@ async def generate_content(
normalized.model,
normalized.prompt,
normalized.images,
request_base_url,
base_url_override=request_base_url,
video_media_id=normalized.video_media_id,
)
)
)
@@ -953,3 +971,32 @@ async def stream_generate_content(
status_code=500,
content=_build_gemini_error_payload(500, str(exc)),
)
@router.websocket("/captcha_ws")
async def captcha_websocket_endpoint(websocket: WebSocket):
from ..core.logger import debug_logger
api_key = (
websocket.query_params.get("key")
or websocket.query_params.get("api_key")
or websocket.headers.get("x-goog-api-key")
or ""
).strip()
authorization = (websocket.headers.get("authorization") or "").strip()
if authorization.lower().startswith("bearer "):
api_key = authorization[7:].strip()
if not api_key or not AuthManager.verify_api_key(api_key):
await websocket.close(code=1008)
return
service = await ExtensionCaptchaService.get_instance()
await service.connect(websocket)
try:
while True:
data = await websocket.receive_text()
await service.handle_message(websocket, data)
except WebSocketDisconnect:
service.disconnect(websocket)
except Exception as e:
debug_logger.log_error(f"WebSocket error: {e}")
service.disconnect(websocket)

View File

@@ -12,8 +12,11 @@ class Config:
self._admin_password: Optional[str] = None
def _load_config(self) -> Dict[str, Any]:
"""Load configuration from setting.toml"""
config_path = Path(__file__).parent.parent.parent / "config" / "setting.toml"
"""Load configuration from setting.toml, falling back to the example file."""
config_dir = Path(__file__).parent.parent.parent / "config"
config_path = config_dir / "setting.toml"
if not config_path.exists():
config_path = config_dir / "setting_example.toml"
with open(config_path, "rb") as f:
return tomli.load(f)

View File

@@ -408,6 +408,7 @@ class Database:
("image_concurrency", "INTEGER DEFAULT -1"),
("video_concurrency", "INTEGER DEFAULT -1"),
("captcha_proxy_url", "TEXT"), # token级打码代理
("extension_route_key", "TEXT"), # extension 模式路由键
("ban_reason", "TEXT"), # 禁用原因
("banned_at", "TIMESTAMP"), # 禁用时间
]
@@ -567,6 +568,7 @@ class Database:
image_concurrency INTEGER DEFAULT -1,
video_concurrency INTEGER DEFAULT -1,
captcha_proxy_url TEXT,
extension_route_key TEXT,
ban_reason TEXT,
banned_at TIMESTAMP
)
@@ -845,13 +847,15 @@ class Database:
cursor = await db.execute("""
INSERT INTO tokens (st, at, at_expires, email, name, remark, is_active,
credits, user_paygate_tier, current_project_id, current_project_name,
image_enabled, video_enabled, image_concurrency, video_concurrency, captcha_proxy_url)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
image_enabled, video_enabled, image_concurrency, video_concurrency,
captcha_proxy_url, extension_route_key)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (token.st, token.at, token.at_expires, token.email, token.name, token.remark,
token.is_active, token.credits, token.user_paygate_tier,
token.current_project_id, token.current_project_name,
token.image_enabled, token.video_enabled,
token.image_concurrency, token.video_concurrency, token.captcha_proxy_url))
token.image_concurrency, token.video_concurrency,
token.captcha_proxy_url, token.extension_route_key))
await db.commit()
token_id = cursor.lastrowid

View File

@@ -184,6 +184,11 @@ VIDEO_BASE_MODELS = {
"landscape": "veo_3_1_r2v_fast_ultra_relaxed",
"portrait": "veo_3_1_r2v_fast_portrait_ultra_relaxed",
},
# Extend models (视频续写)
"veo_3_1_extend": {
"landscape": "veo_3_1_extend",
"portrait": "veo_3_1_extend_portrait",
},
}

View File

@@ -42,6 +42,7 @@ class Token(BaseModel):
# 打码代理token 级,可覆盖全局浏览器打码代理)
captcha_proxy_url: Optional[str] = None
extension_route_key: Optional[str] = None
# 429禁用相关
ban_reason: Optional[str] = None # 禁用原因: "429_rate_limit" 或 None

View File

@@ -0,0 +1,214 @@
import asyncio
import json
import time
import uuid
from dataclasses import dataclass, field
from typing import Any, Dict, Optional
from fastapi import WebSocket
from ..core.logger import debug_logger
@dataclass
class ExtensionConnection:
websocket: WebSocket
route_key: str = ""
client_label: str = ""
connected_at: float = field(default_factory=time.time)
class ExtensionCaptchaService:
_instance: Optional["ExtensionCaptchaService"] = None
_lock = asyncio.Lock()
def __init__(self, db=None):
self.db = db
self.active_connections: list[ExtensionConnection] = []
self.pending_requests: dict[str, tuple[asyncio.Future, WebSocket]] = {}
@classmethod
async def get_instance(cls, db=None) -> "ExtensionCaptchaService":
if cls._instance is None:
async with cls._lock:
if cls._instance is None:
cls._instance = cls(db=db)
elif db is not None and cls._instance.db is None:
cls._instance.db = db
return cls._instance
async def connect(self, websocket: WebSocket):
await websocket.accept()
conn = ExtensionConnection(
websocket=websocket,
route_key=(websocket.query_params.get("route_key") or "").strip(),
client_label=(websocket.query_params.get("client_label") or "").strip(),
)
self.active_connections.append(conn)
debug_logger.log_info(
f"[Extension Captcha] Client connected. Total: {len(self.active_connections)}, "
f"route_key={conn.route_key or '-'}, label={conn.client_label or '-'}"
)
def disconnect(self, websocket: WebSocket):
for conn in list(self.active_connections):
if conn.websocket is websocket:
self.active_connections.remove(conn)
debug_logger.log_info(
f"[Extension Captcha] Client disconnected. Total: {len(self.active_connections)}, "
f"route_key={conn.route_key or '-'}, label={conn.client_label or '-'}"
)
return
def _find_connection(self, websocket: WebSocket) -> Optional[ExtensionConnection]:
for conn in self.active_connections:
if conn.websocket is websocket:
return conn
return None
def _select_connection(self, route_key: str) -> Optional[ExtensionConnection]:
normalized_key = (route_key or "").strip()
if normalized_key:
for conn in self.active_connections:
if conn.route_key == normalized_key:
return conn
return None
# Empty token routes are only allowed to use an empty extension route.
# A keyed route such as "9223" belongs to a specific browser/account
# and must never be borrowed by another token just because it is the
# only extension online.
for conn in self.active_connections:
if not conn.route_key:
return conn
return None
def _describe_routes(self) -> str:
labels = []
for conn in self.active_connections:
label = conn.route_key or "(empty)"
if conn.client_label:
label = f"{label}:{conn.client_label}"
labels.append(label)
return ", ".join(labels)
def describe_routes(self) -> str:
return self._describe_routes()
async def _send_ack(self, websocket: WebSocket, payload: Dict[str, Any]):
try:
await websocket.send_text(json.dumps(payload))
except Exception:
pass
async def _resolve_route_key(self, token_id: Optional[int]) -> str:
if not token_id or not self.db:
return ""
try:
token = await self.db.get_token(token_id)
if token and token.extension_route_key:
return token.extension_route_key.strip()
except Exception as e:
debug_logger.log_warning(f"[Extension Captcha] Failed to resolve route key for token {token_id}: {e}")
return ""
def _has_connection_for_route_key(self, route_key: str) -> bool:
return self._select_connection(route_key) is not None
async def has_connection_for_token(self, token_id: Optional[int]) -> tuple[bool, str]:
route_key = await self._resolve_route_key(token_id)
return self._has_connection_for_route_key(route_key), route_key
async def handle_message(self, websocket: WebSocket, data: str):
try:
payload = json.loads(data)
message_type = payload.get("type")
if message_type == "register":
conn = self._find_connection(websocket)
if conn:
conn.route_key = (payload.get("route_key") or conn.route_key or "").strip()
conn.client_label = (payload.get("client_label") or conn.client_label or "").strip()
debug_logger.log_info(
f"[Extension Captcha] Client registered route_key={conn.route_key or '-'}, "
f"label={conn.client_label or '-'}"
)
await self._send_ack(
websocket,
{
"type": "register_ack",
"route_key": conn.route_key,
"client_label": conn.client_label,
},
)
return
req_id = payload.get("req_id")
if req_id and req_id in self.pending_requests:
future, owner_websocket = self.pending_requests[req_id]
if websocket is not owner_websocket:
debug_logger.log_warning(f"[Extension Captcha] Ignoring response from non-owner connection: {req_id}")
return
if not future.done():
future.set_result(payload)
except Exception as e:
debug_logger.log_error(f"[Extension Captcha] Error handling message: {e}")
async def get_token(
self,
project_id: str,
action: str = "IMAGE_GENERATION",
timeout: int = 20,
token_id: Optional[int] = None,
) -> Optional[str]:
if not self.active_connections:
debug_logger.log_warning("[Extension Captcha] No active extension connections available.")
raise RuntimeError("Chrome Extension not connected or Google Labs tab not open.")
route_key = await self._resolve_route_key(token_id)
conn = self._select_connection(route_key)
if conn is None:
available = self._describe_routes() or "none"
raise RuntimeError(
f"No Chrome Extension connection matches token_id={token_id} route_key='{route_key}'. "
f"Available route keys: {available}"
)
req_id = f"req_{uuid.uuid4().hex}"
future = asyncio.get_running_loop().create_future()
self.pending_requests[req_id] = (future, conn.websocket)
request_data = {
"type": "get_token",
"req_id": req_id,
"action": action,
"project_id": project_id,
"route_key": route_key,
}
try:
debug_logger.log_info(
f"[Extension Captcha] Dispatching token request via route_key={route_key or '-'}, "
f"label={conn.client_label or '-'}, project_id={project_id}, action={action}"
)
await conn.websocket.send_text(json.dumps(request_data))
result = await asyncio.wait_for(future, timeout=timeout)
if result.get("status") == "success":
return result.get("token")
error_msg = result.get("error")
debug_logger.log_error(f"[Extension Captcha] Error from extension: {error_msg}")
return None
except asyncio.TimeoutError:
debug_logger.log_error(f"[Extension Captcha] Timeout waiting for token (req_id: {req_id})")
return None
except Exception as e:
debug_logger.log_error(f"[Extension Captcha] Communication error: {e}")
return None
finally:
self.pending_requests.pop(req_id, None)
async def report_flow_error(self, project_id: str, error_reason: str, error_message: str = ""):
_ = project_id, error_message
debug_logger.log_warning(f"[Extension Captcha] Flow error reported (ignoring): {error_reason}")

View File

@@ -39,19 +39,18 @@ class FlowClient:
)
self._remote_browser_prefill_last_sent: Dict[str, float] = {}
# Default "real browser" headers (Android Chrome style) to reduce upstream 4xx/5xx instability.
# Default "real browser" headers (macOS Chrome Desktop) to reduce upstream 4xx/5xx instability.
# These will be applied as defaults (won't override caller-provided headers).
# NOTE: Must match the UA platform (macOS) generated by _generate_user_agent.
self._default_client_headers = {
"sec-ch-ua-mobile": "?1",
"sec-ch-ua-platform": "\"Android\"",
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": "\"macOS\"",
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "cross-site",
"x-browser-channel": "stable",
"x-browser-copyright": "Copyright 2026 Google LLC. All Rights reserved.",
"x-browser-validation": "UujAs0GAwdnCJ9nvrswZ+O+oco0=",
"x-browser-year": "2026",
"x-client-data": "CJS2yQEIpLbJAQipncoBCNj9ygEIlKHLAQiFoM0BGP6lzwE="
}
# 发车策略改为“请求到就发”:
# 不在 flow2api 本地对提交做批次整形或排队,避免把同批请求打成阶梯。
@@ -78,50 +77,10 @@ class FlowClient:
seed = int(hashlib.md5(account_id.encode()).hexdigest()[:8], 16)
rng = random.Random(seed)
# Chrome 版本池
chrome_versions = ["130.0.0.0", "131.0.0.0", "132.0.0.0", "129.0.0.0"]
# Firefox 版本池
firefox_versions = ["133.0", "132.0", "131.0", "134.0"]
# Safari 版本池
safari_versions = ["18.2", "18.1", "18.0", "17.6"]
# Edge 版本池
edge_versions = ["130.0.0.0", "131.0.0.0", "132.0.0.0"]
# 操作系统配置
os_configs = [
# Windows
{
"platform": "Windows NT 10.0; Win64; x64",
"browsers": [
lambda r: f"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{r.choice(chrome_versions)} Safari/537.36",
lambda r: f"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:{r.choice(firefox_versions).split('.')[0]}.0) Gecko/20100101 Firefox/{r.choice(firefox_versions)}",
lambda r: f"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{r.choice(chrome_versions)} Safari/537.36 Edg/{r.choice(edge_versions)}",
]
},
# macOS
{
"platform": "Macintosh; Intel Mac OS X 10_15_7",
"browsers": [
lambda r: f"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{r.choice(chrome_versions)} Safari/537.36",
lambda r: f"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/{r.choice(safari_versions)} Safari/605.1.15",
lambda r: f"Mozilla/5.0 (Macintosh; Intel Mac OS X 14.{r.randint(0, 7)}; rv:{r.choice(firefox_versions).split('.')[0]}.0) Gecko/20100101 Firefox/{r.choice(firefox_versions)}",
]
},
# Linux
{
"platform": "X11; Linux x86_64",
"browsers": [
lambda r: f"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{r.choice(chrome_versions)} Safari/537.36",
lambda r: f"Mozilla/5.0 (X11; Linux x86_64; rv:{r.choice(firefox_versions).split('.')[0]}.0) Gecko/20100101 Firefox/{r.choice(firefox_versions)}",
lambda r: f"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:{r.choice(firefox_versions).split('.')[0]}.0) Gecko/20100101 Firefox/{r.choice(firefox_versions)}",
]
}
]
# 使用固定种子随机选择操作系统和浏览器
os_config = rng.choice(os_configs)
browser_generator = rng.choice(os_config["browsers"])
user_agent = browser_generator(rng)
# Chrome 版本池 - 匹配真实 Mac mini Chrome 147 环境
chrome_versions = ["147.0.7727.56", "146.0.7688.92", "145.0.7649.100"]
ch_version = rng.choice(chrome_versions)
user_agent = f"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{ch_version} Safari/537.36"
# 缓存结果
self._user_agent_cache[account_id] = user_agent
@@ -155,7 +114,9 @@ class FlowClient:
at_token: Optional[str] = None,
timeout: Optional[int] = None,
use_media_proxy: bool = False,
respect_fingerprint_proxy: bool = True
respect_fingerprint_proxy: bool = True,
force_no_proxy: bool = False,
allow_urllib_fallback: bool = True
) -> Dict[str, Any]:
"""统一HTTP请求处理
@@ -171,22 +132,24 @@ class FlowClient:
timeout: 自定义超时时间(秒),不传则使用默认值
use_media_proxy: 是否使用图片上传/下载代理
respect_fingerprint_proxy: 是否优先使用打码浏览器指纹里的代理
allow_urllib_fallback: curl_cffi 网络失败时是否允许 urllib 二次兜底
"""
fingerprint = self._request_fingerprint_ctx.get()
proxy_url = None
if self.proxy_manager:
if use_media_proxy and hasattr(self.proxy_manager, "get_media_proxy_url"):
proxy_url = await self.proxy_manager.get_media_proxy_url()
elif hasattr(self.proxy_manager, "get_request_proxy_url"):
proxy_url = await self.proxy_manager.get_request_proxy_url()
else:
proxy_url = await self.proxy_manager.get_proxy_url()
if not force_no_proxy:
if self.proxy_manager:
if use_media_proxy and hasattr(self.proxy_manager, "get_media_proxy_url"):
proxy_url = await self.proxy_manager.get_media_proxy_url()
elif hasattr(self.proxy_manager, "get_request_proxy_url"):
proxy_url = await self.proxy_manager.get_request_proxy_url()
else:
proxy_url = await self.proxy_manager.get_proxy_url()
if respect_fingerprint_proxy and isinstance(fingerprint, dict) and "proxy_url" in fingerprint:
proxy_url = fingerprint.get("proxy_url")
if proxy_url == "":
proxy_url = None
if respect_fingerprint_proxy and isinstance(fingerprint, dict) and "proxy_url" in fingerprint:
proxy_url = fingerprint.get("proxy_url")
if proxy_url == "":
proxy_url = None
request_timeout = timeout or self.timeout
if headers is None:
@@ -234,6 +197,22 @@ class FlowClient:
for key, value in self._default_client_headers.items():
headers.setdefault(key, value)
# Dynamic fix for sec-ch-ua headers when fingerprint is missing to avoid UA/Platform mismatch
if not isinstance(fingerprint, dict) or not fingerprint.get("sec_ch_ua_platform"):
ua_lower = headers.get("User-Agent", "").lower()
if "android" in ua_lower:
headers["sec-ch-ua-platform"] = "\"Android\""
headers["sec-ch-ua-mobile"] = "?1"
elif "mac" in ua_lower:
headers["sec-ch-ua-platform"] = "\"macOS\""
headers["sec-ch-ua-mobile"] = "?0"
elif "linux" in ua_lower or "x11" in ua_lower:
headers["sec-ch-ua-platform"] = "\"Linux\""
headers["sec-ch-ua-mobile"] = "?0"
else:
headers["sec-ch-ua-platform"] = "\"Windows\""
headers["sec-ch-ua-mobile"] = "?0"
# Log request
if config.debug_enabled:
if isinstance(fingerprint, dict):
@@ -252,14 +231,14 @@ class FlowClient:
start_time = time.time()
try:
async with AsyncSession() as session:
async with AsyncSession(trust_env=False) as session:
if method.upper() == "GET":
response = await session.get(
url,
headers=headers,
proxy=proxy_url,
timeout=request_timeout,
impersonate="chrome110"
impersonate="chrome124"
)
else: # POST
response = await session.post(
@@ -268,7 +247,7 @@ class FlowClient:
json=json_data,
proxy=proxy_url,
timeout=request_timeout,
impersonate="chrome110"
impersonate="chrome124"
)
duration_ms = (time.time() - start_time) * 1000
@@ -322,7 +301,7 @@ class FlowClient:
debug_logger.log_error(f"[API FAILED] Request Body: {json_data}")
debug_logger.log_error(f"[API FAILED] Exception: {error_msg}")
if self._should_fallback_to_urllib(error_msg):
if allow_urllib_fallback and self._should_fallback_to_urllib(error_msg):
debug_logger.log_warning(
f"[HTTP FALLBACK] curl_cffi 请求失败,回退 urllib: {method.upper()} {url}"
)
@@ -436,6 +415,19 @@ class FlowClient:
"operation timed out",
])
def _is_proxy_connection_error(self, error: Exception) -> bool:
"""识别本地/上游代理不可用导致的连接失败。"""
error_lower = str(error).lower()
return any(keyword in error_lower for keyword in [
"failed to connect to 127.0.0.1 port",
"failed to connect to localhost port",
"proxyerror",
"proxy error",
"failed to connect to proxy",
"couldn't connect to server",
"curl: (7)",
])
def _is_retryable_network_error(self, error_str: str) -> bool:
"""识别可重试的 TLS/连接类网络错误。"""
error_lower = (error_str or "").lower()
@@ -449,6 +441,10 @@ class FlowClient:
"connection reset",
"connection aborted",
"connection was reset",
"connection timed out",
"curl: (28)",
"timed out",
"timeout",
"unexpected eof",
"empty reply from server",
"recv failure",
@@ -462,6 +458,38 @@ class FlowClient:
"""控制轻量控制面请求的超时,避免认证/项目接口长时间挂起。"""
return max(5, min(int(self.timeout or 0) or 120, 10))
def _get_video_submit_timeout(self) -> int:
"""视频提交接口应快速返回 operation避免单次网络挂死拖满整条链路。"""
return max(30, min(int(self.timeout or 0) or 120, 75))
def _get_video_poll_timeout(self) -> int:
"""视频状态查询是轻量轮询,请求超时不应超过下一轮轮询太久。"""
return max(10, min(int(self.timeout or 0) or 120, 45))
async def _make_video_api_request(
self,
url: str,
json_data: Dict[str, Any],
at: str,
timeout: int,
) -> Dict[str, Any]:
"""视频 API 加硬截止,避免 curl_cffi 底层偶发卡住导致整条请求悬挂。"""
try:
return await asyncio.wait_for(
self._make_request(
method="POST",
url=url,
json_data=json_data,
use_at=True,
at_token=at,
timeout=timeout,
allow_urllib_fallback=False
),
timeout=timeout + 5
)
except asyncio.TimeoutError as exc:
raise Exception(f"Flow video API request timed out after {timeout}s") from exc
async def _acquire_image_launch_gate(
self,
token_id: Optional[int],
@@ -603,14 +631,29 @@ class FlowClient:
}
"""
url = f"{self.labs_base_url}/auth/session"
result = await self._make_request(
method="GET",
url=url,
use_st=True,
st_token=st,
timeout=self._get_control_plane_timeout(),
)
return result
try:
return await self._make_request(
method="GET",
url=url,
use_st=True,
st_token=st,
timeout=self._get_control_plane_timeout(),
)
except Exception as e:
if not self._is_proxy_connection_error(e):
raise
debug_logger.log_warning(
f"[AUTH] ST->AT failed via configured proxy, retrying direct connection: {e}"
)
return await self._make_request(
method="GET",
url=url,
use_st=True,
st_token=st,
timeout=self._get_control_plane_timeout(),
force_no_proxy=True,
)
# ========== 项目管理 (使用ST) ==========
@@ -842,6 +885,19 @@ class FlowClient:
max_retries = config.flow_max_retries
last_error: Optional[Exception] = None
captcha_method = getattr(config, "captcha_method", "personal")
if captcha_method == "personal":
try:
from .browser_captcha_personal import BrowserCaptchaService
service = await BrowserCaptchaService.get_instance(self.db)
fingerprint = service.get_last_fingerprint()
if not fingerprint:
await service.get_token(project_id, "uploadUserImage")
fingerprint = service.get_last_fingerprint()
self._set_request_fingerprint(fingerprint)
except Exception as e:
debug_logger.log_error(f"[UPLOAD] Failed to pre-fetch fingerprint: {e}")
for retry_attempt in range(max_retries):
try:
new_result = await self._make_request(
@@ -1295,12 +1351,11 @@ class FlowClient:
json_data["useV2ModelConfig"] = True
try:
result = await self._make_request(
method="POST",
result = await self._make_video_api_request(
url=url,
json_data=json_data,
use_at=True,
at_token=at
at=at,
timeout=self._get_video_submit_timeout()
)
return result
except Exception as e:
@@ -1425,12 +1480,11 @@ class FlowClient:
}
try:
result = await self._make_request(
method="POST",
result = await self._make_video_api_request(
url=url,
json_data=json_data,
use_at=True,
at_token=at
at=at,
timeout=self._get_video_submit_timeout()
)
return result
except Exception as e:
@@ -1558,12 +1612,11 @@ class FlowClient:
json_data["useV2ModelConfig"] = True
try:
result = await self._make_request(
method="POST",
result = await self._make_video_api_request(
url=url,
json_data=json_data,
use_at=True,
at_token=at
at=at,
timeout=self._get_video_submit_timeout()
)
return result
except Exception as e:
@@ -1687,12 +1740,11 @@ class FlowClient:
json_data["useV2ModelConfig"] = True
try:
result = await self._make_request(
method="POST",
result = await self._make_video_api_request(
url=url,
json_data=json_data,
use_at=True,
at_token=at
at=at,
timeout=self._get_video_submit_timeout()
)
return result
except Exception as e:
@@ -1714,6 +1766,286 @@ class FlowClient:
# 所有重试都失败
raise last_error
# ========== 视频续写 (Video Extend) ==========
async def generate_video_extend(
self,
at: str,
project_id: str,
prompt: str,
model_key: str,
aspect_ratio: str,
video_media_id: str,
user_paygate_tier: str = "PAYGATE_TIER_ONE",
token_id: Optional[int] = None,
token_video_concurrency: Optional[int] = None,
) -> dict:
"""视频续写,基于已生成的视频延伸7秒
Args:
at: Access Token
project_id: 项目ID
prompt: 续写提示词
model_key: veo_3_1_extend_portrait / veo_3_1_extend 等
aspect_ratio: 视频宽高比
video_media_id: 源视频的 mediaGenerationId
user_paygate_tier: 用户等级
Returns:
同 generate_video_text (operations 列表)
"""
url = f"{self.api_base_url}/video:batchAsyncGenerateVideoExtendVideo"
# 403/reCAPTCHA 重试逻辑 - 最多重试3次
max_retries = 3
last_error = None
for retry_attempt in range(max_retries):
launch_gate_acquired = False
launch_ok, _, _ = await self._acquire_video_launch_gate(
token_id=token_id,
token_video_concurrency=token_video_concurrency,
)
if not launch_ok:
last_error = Exception("Video launch queue wait timeout")
raise last_error
launch_gate_acquired = True
try:
recaptcha_token, browser_id = await self._get_recaptcha_token(
project_id,
action="VIDEO_GENERATION",
token_id=token_id
)
finally:
if launch_gate_acquired:
await self._release_video_launch_gate(token_id)
if not recaptcha_token:
last_error = Exception("Failed to obtain reCAPTCHA token")
should_retry = await self._handle_missing_recaptcha_token(
retry_attempt=retry_attempt,
max_retries=max_retries,
browser_id=browser_id,
project_id=project_id,
log_prefix="[VIDEO EXTEND] 续写",
)
if should_retry:
continue
raise last_error
session_id = self._generate_session_id()
workflow_id = str(uuid.uuid4())
json_data = {
"clientContext": {
"recaptchaContext": {
"token": recaptcha_token,
"applicationType": "RECAPTCHA_APPLICATION_TYPE_WEB"
},
"sessionId": session_id,
"projectId": project_id,
"tool": "PINHOLE",
"userPaygateTier": user_paygate_tier
},
"mediaGenerationContext": {
"batchId": str(uuid.uuid4())
},
"requests": [{
"aspectRatio": aspect_ratio,
"seed": random.randint(1, 99999),
"textInput": {
"structuredPrompt": {
"parts": [{"text": prompt}]
}
},
"videoInput": {
"mediaId": video_media_id
},
"videoModelKey": model_key,
"metadata": {
"workflowId": workflow_id
}
}],
"useV2ModelConfig": True
}
# Debug: 打印请求体用于调试
import json as _json
debug_logger.log_info(f"[VIDEO EXTEND] Request URL: {url}")
debug_logger.log_info(f"[VIDEO EXTEND] Request JSON: {_json.dumps(json_data, indent=2, ensure_ascii=False)[:2000]}")
try:
result = await self._make_video_api_request(
url=url,
json_data=json_data,
at=at,
timeout=self._get_video_submit_timeout()
)
return result
except Exception as e:
last_error = e
should_retry = await self._handle_retryable_generation_error(
error=e,
retry_attempt=retry_attempt,
max_retries=max_retries,
browser_id=browser_id,
project_id=project_id,
log_prefix="[VIDEO EXTEND] 续写",
)
if should_retry:
continue
raise
finally:
await self._notify_browser_captcha_request_finished(browser_id)
# 所有重试都失败
raise last_error
# ========== 视频拼接 (Video Concatenation) ==========
async def run_concatenation(
self,
at: str,
original_media_id: str,
extend_media_id: str,
) -> dict:
"""
调用 Google runVideoFxConcatenation API 拼接视频
Args:
at: 认证 token
original_media_id: 原始视频的 mediaGenerationId (UUID)
extend_media_id: 续写视频的 mediaGenerationId (UUID)
Returns:
包含 operation name 的字典
"""
url = f"{self.api_base_url}:runVideoFxConcatenation"
json_data = {
"inputVideos": [
{
"mediaGenerationId": original_media_id,
"lengthNanos": 8000,
"startTimeOffset": "0s",
"endTimeOffset": "8s"
},
{
"mediaGenerationId": extend_media_id,
"lengthNanos": 8000,
"startTimeOffset": "1s",
"endTimeOffset": "8s"
}
]
}
debug_logger.log_info(f"[CONCAT] 提交拼接任务: original={original_media_id[:12]}..., extend={extend_media_id[:12]}...")
result = await self._make_request(
method="POST",
url=url,
json_data=json_data,
use_at=True,
at_token=at
)
debug_logger.log_info(f"[CONCAT] 拼接任务已提交: {json.dumps(result, ensure_ascii=False)[:300]}")
return result
async def poll_concatenation_status(
self,
at: str,
operation_name: str,
timeout: int = 300,
poll_interval: int = 3,
) -> dict:
"""
轮询拼接任务状态,直到完成或超时
Args:
at: 认证 token
operation_name: 拼接任务的 operation name
timeout: 超时秒数
poll_interval: 轮询间隔秒数
Returns:
包含 outputUri 和 mediaGenerationId 的字典
"""
url = f"{self.api_base_url}:runVideoFxCheckConcatenationStatus"
json_data = {
"operation": {
"operation": {
"name": operation_name
}
}
}
start_time = time.time()
while time.time() - start_time < timeout:
result = await self._make_request(
method="POST",
url=url,
json_data=json_data,
use_at=True,
at_token=at,
timeout=300, # concat API returns base64 video (~14MB), needs longer timeout
)
status = result.get("status", "")
output_uri = result.get("outputUri", "")
encoded_video = result.get("encodedVideo", "")
ev_len = len(encoded_video) if encoded_video else 0
elapsed = int(time.time() - start_time)
all_keys = list(result.keys())
debug_logger.log_info(
f"[CONCAT] 状态: {status}, outputUri={'yes' if output_uri else 'no'}, "
f"encodedVideo={ev_len} chars, elapsed={elapsed}s, keys={all_keys}"
)
# 优先检查 outputUri
if output_uri:
debug_logger.log_info(f"[CONCAT] 拼接完成 (outputUri): {output_uri[:120]}")
return result
# Google API 返回 encodedVideobase64 编码的 MP4而不是 outputUri
if encoded_video and "SUCCESSFUL" in status:
try:
import os
video_bytes = base64.b64decode(encoded_video)
video_filename = f"concat_{uuid.uuid4().hex[:12]}.mp4"
# 保存到 tmp/ 目录FastAPI 已挂载为 /tmp 静态文件)
save_dir = "tmp"
os.makedirs(save_dir, exist_ok=True)
save_path = os.path.join(save_dir, video_filename)
with open(save_path, "wb") as f:
f.write(video_bytes)
# 构造 URLFastAPI 挂载了 /tmp -> /app/tmp/
serve_url = f"/tmp/{video_filename}"
debug_logger.log_info(f"[CONCAT] 拼接完成 (encodedVideo): 保存 {len(video_bytes)} bytes -> {serve_url}")
result["outputUri"] = serve_url
result["local_file"] = save_path
return result
except Exception as e:
debug_logger.log_error(f"[CONCAT] 解码 encodedVideo 失败: {e}")
raise Exception(f"解码拼接视频失败: {e}")
# SUCCESSFUL but neither outputUri nor encodedVideo
if "SUCCESSFUL" in status:
debug_logger.log_warning(f"[CONCAT] SUCCESSFUL 但无 outputUri/encodedVideo: {json.dumps(result, ensure_ascii=False)[:300]}")
if "FAILED" in status or "ERROR" in status:
debug_logger.log_error(f"[CONCAT] 失败: {status}, 响应: {json.dumps(result, ensure_ascii=False)[:300]}")
raise Exception(f"视频拼接失败: {status}")
await asyncio.sleep(poll_interval)
debug_logger.log_error(f"[CONCAT] 超时 ({timeout}s),放弃拼接")
raise Exception(f"视频拼接超时 ({timeout}s)")
# ========== 视频放大 (Video Upsampler) ==========
async def upsample_video(
@@ -1804,12 +2136,11 @@ class FlowClient:
}
try:
result = await self._make_request(
method="POST",
result = await self._make_video_api_request(
url=url,
json_data=json_data,
use_at=True,
at_token=at
at=at,
timeout=self._get_video_submit_timeout()
)
return result
except Exception as e:
@@ -1860,12 +2191,11 @@ class FlowClient:
for retry_attempt in range(max_retries):
try:
return await self._make_request(
method="POST",
return await self._make_video_api_request(
url=url,
json_data=json_data,
use_at=True,
at_token=at
at=at,
timeout=self._get_video_poll_timeout()
)
except Exception as e:
last_error = e
@@ -2013,6 +2343,17 @@ class FlowClient:
)
except Exception:
pass
elif config.captcha_method == "extension":
try:
from .browser_captcha_extension import ExtensionCaptchaService
service = await ExtensionCaptchaService.get_instance()
await service.report_flow_error(
project_id=project_id,
error_reason=error_reason or "",
error_message=error_message or "",
)
except Exception:
pass
elif config.captcha_method == "personal" and project_id:
try:
from .browser_captcha_personal import BrowserCaptchaService
@@ -2317,6 +2658,24 @@ class FlowClient:
captcha_method = config.captcha_method
debug_logger.log_info(f"[reCAPTCHA] 开始获取 token: method={captcha_method}, project_id={project_id}, action={action}")
if captcha_method == "extension":
try:
from .browser_captcha_extension import ExtensionCaptchaService
service = await ExtensionCaptchaService.get_instance(self.db)
extension_timeout = 45 if action == "VIDEO_GENERATION" else 25
token = await service.get_token(
project_id,
action,
timeout=extension_timeout,
token_id=token_id
)
self._set_request_fingerprint(None)
return token, None
except Exception as e:
debug_logger.log_error(f"[reCAPTCHA Extension] 错误: {str(e)}")
self._set_request_fingerprint(None)
return None, None
# 内置浏览器打码 (nodriver)
if captcha_method == "personal":
debug_logger.log_info(f"[reCAPTCHA] 使用 personal 模式")
@@ -2397,7 +2756,19 @@ class FlowClient:
return None, None
# API打码服务
elif captcha_method in ["yescaptcha", "capmonster", "ezcaptcha", "capsolver"]:
self._set_request_fingerprint(None)
# 为 API 打码也设置指纹(包含代理),确保 token 获取和后续请求环境一致
if self.proxy_manager:
try:
proxy_url = await self.proxy_manager.get_request_proxy_url()
if proxy_url:
self._set_request_fingerprint({"proxy_url": proxy_url})
else:
self._set_request_fingerprint(None)
except Exception as e:
debug_logger.log_warning(f"[reCAPTCHA] Failed to get proxy for API captcha: {e}")
self._set_request_fingerprint(None)
else:
self._set_request_fingerprint(None)
token = await self._get_api_captcha_token(captcha_method, project_id, action)
return token, None
else:
@@ -2443,8 +2814,23 @@ class FlowClient:
page_action = action
try:
# Do not use curl_cffi impersonation for captcha API JSON endpoints: some ASGI
# servers (for example FastAPI/Uvicorn) may receive an empty body and return 422.
# 获取代理配置让打码API请求也走代理
# 注意curl_cffi 对 SOCKS5 使用 proxy 参数HTTP 代理使用 proxies 参数
proxy = None
proxies = None
if self.proxy_manager:
try:
proxy_url = await self.proxy_manager.get_request_proxy_url()
if proxy_url:
if proxy_url.startswith("socks5://"):
# curl_cffi 对 SOCKS5 使用 proxy 参数
proxy = proxy_url
else:
# HTTP/HTTPS 代理使用 proxies 字典
proxies = {"http": proxy_url, "https": proxy_url}
except Exception as e:
debug_logger.log_warning(f"[reCAPTCHA {method}] Failed to get proxy: {e}")
async with AsyncSession() as session:
create_url = f"{base_url}/createTask"
create_data = {
@@ -2457,11 +2843,16 @@ class FlowClient:
}
}
result = await session.post(create_url, json=create_data)
if proxy:
result = await session.post(create_url, json=create_data, impersonate="chrome124", proxy=proxy)
else:
result = await session.post(create_url, json=create_data, impersonate="chrome124", proxies=proxies)
debug_logger.log_info(f"[reCAPTCHA {method}] createTask response status: {result.status_code}")
result_json = result.json()
task_id = result_json.get('taskId')
debug_logger.log_info(f"[reCAPTCHA {method}] created task_id: {task_id}")
debug_logger.log_info(f"[reCAPTCHA {method}] created task_id: {task_id}, response: {result_json}")
if not task_id:
error_desc = result_json.get('errorDescription', 'Unknown error')
@@ -2474,7 +2865,11 @@ class FlowClient:
"clientKey": client_key,
"taskId": task_id
}
result = await session.post(get_url, json=get_data)
# 根据代理类型使用不同参数
if proxy:
result = await session.post(get_url, json=get_data, impersonate="chrome124", proxy=proxy)
else:
result = await session.post(get_url, json=get_data, impersonate="chrome124", proxies=proxies)
result_json = result.json()
debug_logger.log_info(f"[reCAPTCHA {method}] polling #{i+1}: {result_json}")

View File

@@ -281,14 +281,14 @@ MODEL_CONFIG = {
"veo_3_1_t2v_portrait": {
"type": "video",
"video_type": "t2v",
"model_key": "veo_3_1_t2v_portrait",
"model_key": "veo_3_1_t2v_fast_portrait",
"aspect_ratio": "VIDEO_ASPECT_RATIO_PORTRAIT",
"supports_images": False
},
"veo_3_1_t2v_landscape": {
"type": "video",
"video_type": "t2v",
"model_key": "veo_3_1_t2v",
"model_key": "veo_3_1_t2v_fast",
"aspect_ratio": "VIDEO_ASPECT_RATIO_LANDSCAPE",
"supports_images": False
},
@@ -661,10 +661,51 @@ MODEL_CONFIG = {
"min_images": 0,
"max_images": 3,
"upsample": {"resolution": "VIDEO_RESOLUTION_1080P", "model_key": "veo_3_1_upsampler_1080p"}
}
},
# ========== 视频续写 (Extend - Video Continuation) ==========
# 基于已生成的视频续写7秒最多续写20次最长148秒
# 需要提供源视频的 mediaGenerationId
# VEO 3.1 Extend (横竖屏)
"veo_3_1_extend_portrait": {
"type": "video",
"video_type": "extend",
"model_key": "veo_3_1_extend_fast_portrait_ultra",
"aspect_ratio": "VIDEO_ASPECT_RATIO_PORTRAIT",
"supports_images": False,
"requires_video_id": True,
},
"veo_3_1_extend": {
"type": "video",
"video_type": "extend",
"model_key": "veo_3_1_extend_fast_ultra",
"aspect_ratio": "VIDEO_ASPECT_RATIO_LANDSCAPE",
"supports_images": False,
"requires_video_id": True,
},
}
def _known_video_model_keys() -> set[str]:
return {
cfg["model_key"]
for cfg in MODEL_CONFIG.values()
if cfg.get("type") == "video" and cfg.get("model_key")
}
def _resolve_tier_two_model_key(model_key: str) -> str:
"""Only upgrade to an ultra key when that exact upstream key is known valid."""
if "ultra" in model_key:
return model_key
if "_fl" in model_key:
candidate = model_key.replace("_fl", "_ultra_fl")
else:
candidate = model_key + "_ultra"
return candidate if candidate in _known_video_model_keys() else model_key
class GenerationHandler:
"""统一生成处理器"""
@@ -777,7 +818,8 @@ class GenerationHandler:
prompt: str,
images: Optional[List[bytes]] = None,
stream: bool = False,
base_url_override: Optional[str] = None
base_url_override: Optional[str] = None,
video_media_id: Optional[str] = None,
) -> AsyncGenerator:
"""统一生成入口
@@ -816,7 +858,8 @@ class GenerationHandler:
model_config = MODEL_CONFIG[model]
generation_type = model_config["type"]
request_operation = f"generate_{generation_type}"
video_type_for_op = model_config.get("video_type", "")
request_operation = "extend_video" if video_type_for_op == "extend" else f"generate_{generation_type}"
prompt_for_log = prompt if len(prompt) <= 2000 else f"{prompt[:2000]}...(truncated)"
request_payload = {
"model": model,
@@ -978,7 +1021,8 @@ class GenerationHandler:
generation_result=generation_result,
response_state=response_state,
request_log_state=request_log_state,
pending_token_state=pending_token_state
pending_token_state=pending_token_state,
video_media_id=video_media_id,
):
yield chunk
perf_trace["generation_pipeline_ms"] = int((time.time() - generation_pipeline_started_at) * 1000)
@@ -1428,7 +1472,8 @@ class GenerationHandler:
generation_result: Optional[Dict[str, Any]] = None,
response_state: Optional[Dict[str, Any]] = None,
request_log_state: Optional[Dict[str, Any]] = None,
pending_token_state: Optional[Dict[str, bool]] = None
pending_token_state: Optional[Dict[str, bool]] = None,
video_media_id: Optional[str] = None,
) -> AsyncGenerator:
"""处理视频生成 (异步轮询)"""
@@ -1458,11 +1503,40 @@ class GenerationHandler:
# 根据账号tier自动调整模型 key
user_tier = normalized_tier
model_key, tier_message = self._resolve_video_model_key_for_tier(model_config, user_tier)
if tier_message and stream:
yield self._create_stream_chunk(f"{tier_message}\n")
if model_key != model_config["model_key"]:
debug_logger.log_info(f"[VIDEO] 账号层级自动调整模型: {model_config['model_key']} -> {model_key}")
# Extend 模型跳过 ultra 自动降级(只有 ultra 版本有效)
is_extend = video_type == "extend"
# TIER_TWO 账号需要使用 ultra 版本的模型
if user_tier == "PAYGATE_TIER_TWO":
# 如果模型 key 不包含 ultra自动添加
if "ultra" not in model_key:
original_model_key = model_key
model_key = _resolve_tier_two_model_key(model_key)
if stream:
if model_key != original_model_key:
yield self._create_stream_chunk(f"TIER_TWO 账号自动切换到 ultra 模型: {model_key}\n")
else:
yield self._create_stream_chunk(f"TIER_TWO 账号保持当前模型: {model_key}\n")
if model_key != original_model_key:
debug_logger.log_info(f"[VIDEO] TIER_TWO 账号,模型自动调整: {original_model_key} -> {model_key}")
else:
debug_logger.log_info(f"[VIDEO] TIER_TWO 账号,未找到有效 ultra 变体,保持模型: {model_key}")
# TIER_ONE 账号需要使用非 ultra 版本
elif user_tier == "PAYGATE_TIER_ONE":
# 如果模型 key 包含 ultra需要移除避免用户误用
if "ultra" in model_key:
# veo_3_1_i2v_s_fast_ultra_fl -> veo_3_1_i2v_s_fast_fl
# veo_3_1_t2v_fast_ultra -> veo_3_1_t2v_fast
# veo_3_1_r2v_fast_landscape_ultra -> veo_3_1_r2v_fast_landscape
# veo_3_1_extend_fast_portrait_ultra -> veo_3_1_extend_fast_portrait
model_key = model_key.replace("_ultra_fl", "_fl").replace("_ultra", "")
if stream:
yield self._create_stream_chunk(f"TIER_ONE 账号自动切换到标准模型: {model_key}\n")
debug_logger.log_info(f"[VIDEO] TIER_ONE 账号,模型自动调整: {model_config['model_key']} -> {model_key}")
# 更新 model_config 中的 model_key
model_config = dict(model_config) # 创建副本避免修改原配置
@@ -1602,6 +1676,31 @@ class GenerationHandler:
token_video_concurrency=token.video_concurrency,
)
# Extend: 视频续写
elif video_type == "extend":
if not video_media_id:
error_msg = "❌ 视频续写需要提供源视频的 mediaGenerationId请在 image_url 中传入 extend://VIDEO_MEDIA_ID"
if stream:
yield self._create_stream_chunk(f"{error_msg}\n")
self._mark_generation_failed(generation_result, error_msg)
yield self._create_error_response(error_msg, status_code=400)
return
debug_logger.log_info(f"[EXTEND] 续写视频: {video_media_id}")
if stream:
yield self._create_stream_chunk(f"视频续写任务提交中,源视频: {video_media_id[:8]}...\n")
result = await self.flow_client.generate_video_extend(
at=token.at,
project_id=project_id,
prompt=prompt,
video_media_id=video_media_id,
model_key=model_config["model_key"],
aspect_ratio=model_config["aspect_ratio"],
user_paygate_tier=normalized_tier,
token_id=token.id,
token_video_concurrency=token.video_concurrency,
)
# T2V 或 R2V无图: 纯文本生成
else:
result = await self.flow_client.generate_video_text(
@@ -1654,6 +1753,8 @@ class GenerationHandler:
# 检查是否需要放大
upsample_config = model_config.get("upsample")
# 如果是 extend传入源视频 media_id 用于后续拼接
extend_source_id = video_media_id if video_type == "extend" else None
async for chunk in self._poll_video_result(
token,
project_id,
@@ -1663,6 +1764,7 @@ class GenerationHandler:
generation_result,
response_state,
request_log_state,
extend_source_media_id=extend_source_id,
):
yield chunk
@@ -1678,7 +1780,8 @@ class GenerationHandler:
upsample_config: Optional[Dict] = None,
generation_result: Optional[Dict[str, Any]] = None,
response_state: Optional[Dict[str, Any]] = None,
request_log_state: Optional[Dict[str, Any]] = None
request_log_state: Optional[Dict[str, Any]] = None,
extend_source_media_id: Optional[str] = None,
) -> AsyncGenerator:
"""轮询视频生成结果
@@ -1728,7 +1831,12 @@ class GenerationHandler:
metadata = operation["operation"].get("metadata", {})
video_info = metadata.get("video", {})
video_url = video_info.get("fifeUrl")
video_media_id = video_info.get("mediaGenerationId")
# Extract short UUID from Google Storage URL (e.g., /video/UUID?)
# Both extend API and concat API need this short UUID format,
# NOT the CAUS base64 mediaGenerationId from video_info
import re as _re
_uuid_match = _re.search(r'/video/([0-9a-f-]{36})', video_url or '')
video_media_id = _uuid_match.group(1) if _uuid_match else video_info.get("mediaGenerationId", "")
aspect_ratio = video_info.get("aspectRatio", "VIDEO_ASPECT_RATIO_LANDSCAPE")
if not video_url:
@@ -1764,7 +1872,14 @@ class GenerationHandler:
# 递归轮询放大结果(不再放大)
async for chunk in self._poll_video_result(
token, project_id, upsample_operations, stream, None, generation_result, response_state, request_log_state
token,
project_id,
upsample_operations,
stream,
None,
generation_result,
response_state,
request_log_state,
):
yield chunk
return
@@ -1776,6 +1891,62 @@ class GenerationHandler:
if stream:
yield self._create_stream_chunk(f"⚠️ 放大失败: {str(e)},返回原始视频\n")
# ========== Extend 视频拼接 ==========
if extend_source_media_id and video_media_id:
try:
if stream:
yield self._create_stream_chunk("\n视频续写完成,正在拼接完整视频...\n")
debug_logger.log_info(f"[CONCAT] 开始拼接: original={extend_source_media_id[:12]}..., extend={video_media_id[:12]}...")
# 提交拼接任务
concat_result = await self.flow_client.run_concatenation(
at=token.at,
original_media_id=extend_source_media_id,
extend_media_id=video_media_id,
)
# 获取 operation name
concat_op = concat_result.get("operation", {}).get("operation", {}).get("name", "")
if concat_op:
if stream:
yield self._create_stream_chunk("拼接任务已提交,等待完成...\n")
# 轮询拼接状态
concat_status = await self.flow_client.poll_concatenation_status(
at=token.at,
operation_name=concat_op,
timeout=300,
poll_interval=3,
)
concat_url = concat_status.get("outputUri", "")
if concat_url:
# 如果是本地路径(/tmp/xxx.mp4构造完整 URL
if concat_url.startswith("/tmp/"):
server_host = config.server_host or "0.0.0.0"
server_port = config.server_port or 8000
# 对外使用 localhost
host = "localhost" if server_host == "0.0.0.0" else server_host
concat_url = f"http://{host}:{server_port}{concat_url}"
video_url = concat_url # 替换为拼接后的完整视频 URL
if stream:
yield self._create_stream_chunk("✅ 视频拼接完成!返回 16s 完整视频\n")
debug_logger.log_info(f"[CONCAT] 拼接成功: {concat_url[:80]}...")
else:
if stream:
yield self._create_stream_chunk("⚠️ 拼接完成但无 URL返回续写片段\n")
else:
debug_logger.log_warning("[CONCAT] 拼接任务创建失败,返回续写片段")
if stream:
yield self._create_stream_chunk("⚠️ 拼接任务创建失败,返回续写片段\n")
except Exception as e:
import traceback
debug_logger.log_error(f"[CONCAT] 拼接失败: {str(e)}")
debug_logger.log_error(f"[CONCAT] traceback: {traceback.format_exc()}")
if stream:
yield self._create_stream_chunk(f"⚠️ 拼接失败: {str(e)},返回续写片段\n")
# 拼接失败不影响返回,继续使用 extend 片段的 URL
# 缓存视频 (如果启用)
local_url = video_url
if config.cache_enabled:
@@ -1812,7 +1983,8 @@ class GenerationHandler:
response_state["url"] = local_url
response_state["generated_assets"] = {
"type": "video",
"final_video_url": local_url
"final_video_url": local_url,
"mediaGenerationId": video_media_id,
}
# 返回结果
@@ -1820,9 +1992,10 @@ class GenerationHandler:
if stream:
yield self._create_stream_chunk(
f"<video src='{local_url}' controls style='max-width:100%'></video>",
f"<video src='{local_url}' data-media-id='{video_media_id}' controls style='max-width:100%'></video>",
finish_reason="stop"
)
else:
yield self._create_completion_response(
local_url, # 直接传URL,让方法内部格式化
@@ -1857,6 +2030,16 @@ class GenerationHandler:
self._mark_generation_failed(generation_result, error_msg)
yield self._create_error_response(error_msg, status_code=502)
return
elif status == "MEDIA_GENERATION_STATUS_ACTIVE" and attempt > 80:
# 如果持续4分钟80次 * 3秒 = 240秒依然是 ACTIVE 状态,则判定为卡死
error_msg = "视频生成超时 (上游卡顿超过4分钟已自动取消)"
await self._fail_video_task(checked_operations, error_msg)
self._mark_generation_failed(generation_result, error_msg)
if stream:
yield self._create_stream_chunk(f"{error_msg}\n")
yield self._create_error_response(error_msg, status_code=504)
return
except Exception as e:
last_poll_error = e

View File

@@ -117,6 +117,26 @@ class LoadBalancer:
self._round_robin_state[scenario] = selected["token"].id
return selected
async def _check_extension_route(self, token: Token) -> tuple[bool, str]:
"""Ensure extension captcha requests are routed to the selected account."""
if config.captcha_method != "extension":
return True, ""
try:
from .browser_captcha_extension import ExtensionCaptchaService
service = await ExtensionCaptchaService.get_instance(getattr(self.token_manager, "db", None))
has_connection, route_key = await service.has_connection_for_token(token.id)
if has_connection:
return True, ""
available = service.describe_routes() or "none"
if route_key:
return False, f"扩展路由 {route_key} 未连接(可用路由: {available}"
return False, f"扩展路由未配置或匿名插件未连接(可用路由: {available}"
except Exception as exc:
return False, f"扩展路由检查失败: {exc}"
async def select_token(
self,
for_image_generation: bool = False,
@@ -171,6 +191,11 @@ class LoadBalancer:
filtered_reasons[token.id] = "图片生成已禁用"
continue
route_ok, route_reason = await self._check_extension_route(token)
if not route_ok:
filtered_reasons[token.id] = route_reason
continue
if (
enforce_concurrency_filter
and self.concurrency_manager
@@ -184,6 +209,11 @@ class LoadBalancer:
filtered_reasons[token.id] = "视频生成已禁用"
continue
route_ok, route_reason = await self._check_extension_route(token)
if not route_ok:
filtered_reasons[token.id] = route_reason
continue
if (
enforce_concurrency_filter
and self.concurrency_manager

View File

@@ -47,6 +47,7 @@ class ProxyManager:
# 协议前缀格式
if line.startswith(("http://", "https://", "socks5://", "socks5h://")):
# 已是标准 user:pass@host:port或 host:port
if "@" in line:
return line

View File

@@ -209,7 +209,8 @@ class TokenManager:
video_enabled: bool = True,
image_concurrency: int = -1,
video_concurrency: int = -1,
captcha_proxy_url: Optional[str] = None
captcha_proxy_url: Optional[str] = None,
extension_route_key: Optional[str] = None,
) -> Token:
"""Add a new token and prepare its pooled projects."""
existing_token = await self.db.get_token_by_st(st)
@@ -284,7 +285,8 @@ class TokenManager:
video_enabled=video_enabled,
image_concurrency=image_concurrency,
video_concurrency=video_concurrency,
captcha_proxy_url=captcha_proxy_url
captcha_proxy_url=captcha_proxy_url,
extension_route_key=extension_route_key,
)
token_id = await self.db.add_token(token)
@@ -314,7 +316,8 @@ class TokenManager:
video_enabled: Optional[bool] = None,
image_concurrency: Optional[int] = None,
video_concurrency: Optional[int] = None,
captcha_proxy_url: Optional[str] = None
captcha_proxy_url: Optional[str] = None,
extension_route_key: Optional[str] = None,
):
"""Update token (支持修改project_id和project_name)
@@ -344,6 +347,8 @@ class TokenManager:
update_fields["video_concurrency"] = video_concurrency
if captcha_proxy_url is not None:
update_fields["captcha_proxy_url"] = captcha_proxy_url
if extension_route_key is not None:
update_fields["extension_route_key"] = extension_route_key
# 检查token是否因429被禁用如果是且未过期则清空429状态
token = await self.db.get_token(token_id)

View File

@@ -62,7 +62,7 @@ class VeoLiteFlowClientTests(unittest.IsolatedAsyncioTestCase):
async def test_generate_video_text_uses_v2_payload_for_lite(self):
captured = {}
async def fake_make_request(method, url, json_data, use_at, at_token):
async def fake_make_request(method, url, json_data, use_at, at_token, **kwargs):
captured["url"] = url
captured["json_data"] = json_data
return {"operations": [{"operation": {"name": "task-1"}}]}
@@ -92,7 +92,7 @@ class VeoLiteFlowClientTests(unittest.IsolatedAsyncioTestCase):
async def test_generate_video_start_end_uses_v2_payload_for_interpolation_lite(self):
captured = {}
async def fake_make_request(method, url, json_data, use_at, at_token):
async def fake_make_request(method, url, json_data, use_at, at_token, **kwargs):
captured["url"] = url
captured["json_data"] = json_data
return {"operations": [{"operation": {"name": "task-2"}}]}