feat: 新增验证码分数测试并优化浏览器打码流程

- 后台新增 /api/captcha/score-test,支持按当前打码配置直接测分并返回详细结果
- 管理后台新增当前打码分数测试入口,展示 score、耗时、action、hostname 等信息
- 优化 browser 与 personal 模式的 reCAPTCHA 执行、页面测分、代理兼容与 UA 策略
This commit is contained in:
genz27
2026-03-01 07:41:37 +08:00
parent e5ef238849
commit cb50ea0a5c
4 changed files with 1501 additions and 48 deletions

View File

@@ -1,10 +1,12 @@
"""Admin API routes"""
import asyncio
from fastapi import APIRouter, Depends, HTTPException, Header, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from typing import Optional, List
from typing import Optional, List, Dict, Any
import secrets
import time
import re
from curl_cffi.requests import AsyncSession
from ..core.auth import AuthManager
from ..core.database import Database
@@ -21,6 +23,187 @@ db: Database = None
# Store active admin session tokens (in production, use Redis or database)
active_admin_tokens = set()
SUPPORTED_API_CAPTCHA_METHODS = {"yescaptcha", "capmonster", "ezcaptcha", "capsolver"}
def _mask_token(token: Optional[str]) -> str:
if not token:
return ""
if len(token) <= 24:
return token
return f"{token[:18]}...{token[-8:]}"
def _guess_client_hints_from_user_agent(user_agent: str) -> Dict[str, str]:
"""根据 UA 补全常见的 sec-ch-* 头。"""
ua = (user_agent or "").strip()
if not ua:
return {}
headers: Dict[str, str] = {}
major_match = re.search(r"(?:Chrome|Chromium|Edg|EdgA|EdgiOS)/(\d+)", ua)
is_mobile = any(token in ua for token in ("Android", "iPhone", "iPad", "Mobile"))
headers["sec-ch-ua-mobile"] = "?1" if is_mobile else "?0"
if "Windows" in ua:
headers["sec-ch-ua-platform"] = '"Windows"'
elif "Macintosh" in ua or "Mac OS X" in ua:
headers["sec-ch-ua-platform"] = '"macOS"'
elif "Android" in ua:
headers["sec-ch-ua-platform"] = '"Android"'
elif "iPhone" in ua or "iPad" in ua:
headers["sec-ch-ua-platform"] = '"iOS"'
elif "Linux" in ua:
headers["sec-ch-ua-platform"] = '"Linux"'
if major_match:
major = major_match.group(1)
if "Edg/" in ua:
headers["sec-ch-ua"] = (
f'"Not:A-Brand";v="99", "Microsoft Edge";v="{major}", "Chromium";v="{major}"'
)
else:
headers["sec-ch-ua"] = (
f'"Not:A-Brand";v="99", "Google Chrome";v="{major}", "Chromium";v="{major}"'
)
return headers
def _guess_impersonate_from_user_agent(user_agent: str) -> str:
"""从 UA 选择可用的 curl_cffi 浏览器指纹版本。"""
ua = (user_agent or "").strip()
major_match = re.search(r"(?:Chrome|Chromium|Edg|EdgA|EdgiOS)/(\d+)", ua)
if not major_match:
return "chrome120"
try:
major = int(major_match.group(1))
except Exception:
return "chrome120"
if major >= 124:
return "chrome124"
if major >= 120:
return "chrome120"
return "chrome120"
def _build_proxy_map(proxy_url: str) -> Optional[Dict[str, str]]:
normalized = (proxy_url or "").strip()
if not normalized:
return None
return {"http": normalized, "https": normalized}
async def _resolve_score_test_verify_proxy(
captcha_method: str,
browser_proxy_enabled: bool,
browser_proxy_url: str
) -> tuple[Optional[Dict[str, str]], bool, str, str]:
"""
选择 score-test 的 verify 请求代理,优先与浏览器打码代理保持一致。
返回: (proxies, used, source, proxy_url)
"""
# 浏览器打码模式优先使用 browser_proxy确保与取 token 出口一致
if captcha_method in {"browser", "personal"} and browser_proxy_enabled and browser_proxy_url:
proxy_map = _build_proxy_map(browser_proxy_url)
if proxy_map:
return proxy_map, True, "captcha_browser_proxy", browser_proxy_url
# 退回请求代理配置
try:
if proxy_manager:
proxy_cfg = await proxy_manager.get_proxy_config()
if proxy_cfg and proxy_cfg.enabled and proxy_cfg.proxy_url:
proxy_map = _build_proxy_map(proxy_cfg.proxy_url)
if proxy_map:
return proxy_map, True, "request_proxy", proxy_cfg.proxy_url
except Exception:
pass
return None, False, "none", ""
async def _solve_recaptcha_with_api_service(
method: str,
website_url: str,
website_key: str,
action: str,
enterprise: bool = False
) -> Optional[str]:
"""使用当前配置的第三方打码服务获取 token。"""
if method == "yescaptcha":
client_key = config.yescaptcha_api_key
base_url = config.yescaptcha_base_url
task_type = "RecaptchaV3TaskProxylessM1"
elif method == "capmonster":
client_key = config.capmonster_api_key
base_url = config.capmonster_base_url
task_type = "RecaptchaV3TaskProxyless"
elif method == "ezcaptcha":
client_key = config.ezcaptcha_api_key
base_url = config.ezcaptcha_base_url
task_type = "ReCaptchaV3TaskProxylessS9"
elif method == "capsolver":
client_key = config.capsolver_api_key
base_url = config.capsolver_base_url
task_type = "ReCaptchaV3EnterpriseTaskProxyLess" if enterprise else "ReCaptchaV3TaskProxyLess"
else:
raise RuntimeError(f"不支持的打码方式: {method}")
if not client_key:
raise RuntimeError(f"{method} API Key 未配置")
task: Dict[str, Any] = {
"websiteURL": website_url,
"websiteKey": website_key,
"type": task_type,
"pageAction": action,
}
if enterprise and method == "capsolver":
task["isEnterprise"] = True
create_url = f"{base_url.rstrip('/')}/createTask"
get_url = f"{base_url.rstrip('/')}/getTaskResult"
async with AsyncSession() as session:
create_resp = await session.post(
create_url,
json={"clientKey": client_key, "task": task},
impersonate="chrome120",
timeout=30
)
create_json = create_resp.json()
task_id = create_json.get("taskId")
if not task_id:
error_desc = create_json.get("errorDescription") or create_json.get("errorMessage") or str(create_json)
raise RuntimeError(f"{method} createTask 失败: {error_desc}")
for _ in range(40):
poll_resp = await session.post(
get_url,
json={"clientKey": client_key, "taskId": task_id},
impersonate="chrome120",
timeout=30
)
poll_json = poll_resp.json()
if poll_json.get("status") == "ready":
solution = poll_json.get("solution", {}) or {}
token = solution.get("gRecaptchaResponse") or solution.get("token")
if token:
return token
raise RuntimeError(f"{method} 返回结果缺少 token: {poll_json}")
if poll_json.get("errorId") not in (None, 0):
error_desc = poll_json.get("errorDescription") or poll_json.get("errorMessage") or str(poll_json)
raise RuntimeError(f"{method} getTaskResult 失败: {error_desc}")
await asyncio.sleep(3)
raise RuntimeError(f"{method} 获取 token 超时")
def set_dependencies(tm: TokenManager, pm: ProxyManager, database: Database):
@@ -73,6 +256,14 @@ class ProxyTestRequest(BaseModel):
timeout_seconds: Optional[int] = 15
class CaptchaScoreTestRequest(BaseModel):
website_url: Optional[str] = "https://antcpt.com/score_detector/"
website_key: Optional[str] = "6LcR_okUAAAAAPYrPe-HK_0RULO1aZM15ENyM-Mf"
action: Optional[str] = "homepage"
verify_url: Optional[str] = "https://antcpt.com/score_detector/verify.php"
enterprise: Optional[bool] = False
class GenerationConfigRequest(BaseModel):
image_timeout: int
video_timeout: int
@@ -1036,6 +1227,282 @@ async def get_captcha_config(token: str = Depends(verify_admin_token)):
}
@router.post("/api/captcha/score-test")
async def test_captcha_score(
request: Optional[CaptchaScoreTestRequest] = None,
token: str = Depends(verify_admin_token)
):
"""使用当前打码方式获取 token并提交到 antcpt 校验分数。"""
req = request or CaptchaScoreTestRequest()
website_url = (req.website_url or "https://antcpt.com/score_detector/").strip()
website_key = (req.website_key or "6LcR_okUAAAAAPYrPe-HK_0RULO1aZM15ENyM-Mf").strip()
action = (req.action or "homepage").strip()
verify_url = (req.verify_url or "https://antcpt.com/score_detector/verify.php").strip()
enterprise = bool(req.enterprise)
started_at = time.time()
captcha_config = await db.get_captcha_config()
captcha_method = (captcha_config.captcha_method or config.captcha_method or "").strip().lower()
browser_proxy_enabled = bool(captcha_config.browser_proxy_enabled)
browser_proxy_url = captcha_config.browser_proxy_url or ""
token_value: Optional[str] = None
fingerprint: Optional[Dict[str, Any]] = None
token_elapsed_ms = 0
verify_elapsed_ms = 0
verify_http_status = None
verify_result: Dict[str, Any] = {}
verify_headers: Dict[str, str] = {}
verify_proxy_used = False
verify_proxy_source = "none"
verify_proxy_url = ""
verify_impersonate = "chrome120"
page_verify_only = captcha_method in {"browser", "personal"}
verify_mode = "browser_page" if page_verify_only else "server_post"
try:
token_start = time.time()
if captcha_method == "browser":
from ..services.browser_captcha import BrowserCaptchaService
service = await BrowserCaptchaService.get_instance(db)
score_payload, browser_id = await service.get_custom_score(
website_url=website_url,
website_key=website_key,
verify_url=verify_url,
action=action,
enterprise=enterprise
)
if isinstance(score_payload, dict):
token_value = score_payload.get("token")
verify_elapsed_ms = int(score_payload.get("verify_elapsed_ms") or 0)
verify_http_status = score_payload.get("verify_http_status")
verify_result = score_payload.get("verify_result") if isinstance(score_payload.get("verify_result"), dict) else {}
verify_mode = score_payload.get("verify_mode") or "browser_page"
score_token_elapsed = score_payload.get("token_elapsed_ms")
if isinstance(score_token_elapsed, (int, float)):
token_elapsed_ms = int(score_token_elapsed)
if token_value:
fingerprint = await service.get_fingerprint(browser_id)
verify_proxy_used = bool(browser_proxy_enabled and browser_proxy_url)
verify_proxy_source = "captcha_browser_proxy" if verify_proxy_used else "browser_direct"
verify_proxy_url = browser_proxy_url if verify_proxy_used else ""
elif captcha_method == "personal":
from ..services.browser_captcha_personal import BrowserCaptchaService
service = await BrowserCaptchaService.get_instance(db)
score_payload = await service.get_custom_score(
website_url=website_url,
website_key=website_key,
verify_url=verify_url,
action=action,
enterprise=enterprise
)
if isinstance(score_payload, dict):
token_value = score_payload.get("token")
verify_elapsed_ms = int(score_payload.get("verify_elapsed_ms") or 0)
verify_http_status = score_payload.get("verify_http_status")
verify_result = score_payload.get("verify_result") if isinstance(score_payload.get("verify_result"), dict) else {}
verify_mode = score_payload.get("verify_mode") or "browser_page"
score_token_elapsed = score_payload.get("token_elapsed_ms")
if isinstance(score_token_elapsed, (int, float)):
token_elapsed_ms = int(score_token_elapsed)
if token_value:
fingerprint = service.get_last_fingerprint()
verify_proxy_used = bool(browser_proxy_enabled and browser_proxy_url)
verify_proxy_source = "captcha_browser_proxy" if verify_proxy_used else "browser_direct"
verify_proxy_url = browser_proxy_url if verify_proxy_used else ""
elif captcha_method in SUPPORTED_API_CAPTCHA_METHODS:
token_value = await _solve_recaptcha_with_api_service(
method=captcha_method,
website_url=website_url,
website_key=website_key,
action=action,
enterprise=enterprise
)
else:
return {
"success": False,
"message": f"当前打码方式不支持分数测试: {captcha_method}",
"captcha_method": captcha_method,
"website_url": website_url,
"website_key": website_key,
"action": action,
"verify_url": verify_url,
"enterprise": enterprise,
"token_acquired": False,
"elapsed_ms": int((time.time() - started_at) * 1000)
}
if token_elapsed_ms <= 0:
token_elapsed_ms = int((time.time() - token_start) * 1000)
if not token_value:
return {
"success": False,
"message": "未获取到 reCAPTCHA token",
"captcha_method": captcha_method,
"website_url": website_url,
"website_key": website_key,
"action": action,
"verify_url": verify_url,
"enterprise": enterprise,
"token_acquired": False,
"token_elapsed_ms": token_elapsed_ms,
"browser_proxy_enabled": browser_proxy_enabled,
"browser_proxy_url": browser_proxy_url if browser_proxy_enabled else "",
"fingerprint": fingerprint,
"elapsed_ms": int((time.time() - started_at) * 1000)
}
if verify_mode == "server_post" and not page_verify_only:
verify_start = time.time()
verify_headers = {
"accept": "application/json, text/javascript, */*; q=0.01",
"content-type": "application/json",
"origin": "https://antcpt.com",
"referer": website_url,
"x-requested-with": "XMLHttpRequest",
}
if isinstance(fingerprint, dict):
ua = (fingerprint.get("user_agent") or "").strip()
lang = (fingerprint.get("accept_language") or "").strip()
sec_ch_ua = (fingerprint.get("sec_ch_ua") or "").strip()
sec_ch_ua_mobile = (fingerprint.get("sec_ch_ua_mobile") or "").strip()
sec_ch_ua_platform = (fingerprint.get("sec_ch_ua_platform") or "").strip()
if ua:
verify_headers["user-agent"] = ua
if lang:
verify_headers["accept-language"] = lang if "," in lang else f"{lang},zh;q=0.9"
if sec_ch_ua:
verify_headers["sec-ch-ua"] = sec_ch_ua
if sec_ch_ua_mobile:
verify_headers["sec-ch-ua-mobile"] = sec_ch_ua_mobile
if sec_ch_ua_platform:
verify_headers["sec-ch-ua-platform"] = sec_ch_ua_platform
if verify_headers.get("user-agent"):
for header_name, header_value in _guess_client_hints_from_user_agent(
verify_headers.get("user-agent", "")
).items():
if header_value and not verify_headers.get(header_name):
verify_headers[header_name] = header_value
verify_impersonate = _guess_impersonate_from_user_agent(verify_headers.get("user-agent", ""))
verify_proxies, verify_proxy_used, verify_proxy_source, verify_proxy_url = (
await _resolve_score_test_verify_proxy(
captcha_method=captcha_method,
browser_proxy_enabled=browser_proxy_enabled,
browser_proxy_url=browser_proxy_url
)
)
async with AsyncSession() as session:
verify_resp = await session.post(
verify_url,
json={"g-recaptcha-response": token_value},
headers=verify_headers,
proxies=verify_proxies,
impersonate=verify_impersonate,
timeout=30
)
verify_elapsed_ms = int((time.time() - verify_start) * 1000)
verify_http_status = verify_resp.status_code
try:
verify_result = verify_resp.json()
except Exception:
verify_result = {"raw": verify_resp.text}
else:
verify_headers = {
"origin": "https://antcpt.com",
"referer": website_url,
"x-requested-with": "XMLHttpRequest",
}
if isinstance(fingerprint, dict):
verify_headers.update({
"user-agent": fingerprint.get("user_agent", ""),
"accept-language": fingerprint.get("accept_language", ""),
"sec-ch-ua": fingerprint.get("sec_ch_ua", ""),
"sec-ch-ua-mobile": fingerprint.get("sec_ch_ua_mobile", ""),
"sec-ch-ua-platform": fingerprint.get("sec_ch_ua_platform", ""),
})
verify_success = bool(verify_result.get("success")) if isinstance(verify_result, dict) else False
score_value = verify_result.get("score") if isinstance(verify_result, dict) else None
return {
"success": verify_success,
"message": "分数校验成功" if verify_success else "分数校验未通过",
"captcha_method": captcha_method,
"website_url": website_url,
"website_key": website_key,
"action": action,
"verify_url": verify_url,
"enterprise": enterprise,
"token_acquired": True,
"token_preview": _mask_token(token_value),
"token_elapsed_ms": token_elapsed_ms,
"verify_elapsed_ms": verify_elapsed_ms,
"verify_http_status": verify_http_status,
"score": score_value,
"verify_result": verify_result,
"verify_request_meta": {
"mode": verify_mode,
"proxy_used": verify_proxy_used,
"user_agent": verify_headers.get("user-agent", ""),
"accept_language": verify_headers.get("accept-language", ""),
"sec_ch_ua": verify_headers.get("sec-ch-ua", ""),
"sec_ch_ua_mobile": verify_headers.get("sec-ch-ua-mobile", ""),
"sec_ch_ua_platform": verify_headers.get("sec-ch-ua-platform", ""),
"origin": verify_headers.get("origin", ""),
"referer": verify_headers.get("referer", ""),
"x_requested_with": verify_headers.get("x-requested-with", ""),
"proxy_source": verify_proxy_source,
"proxy_url": verify_proxy_url,
"impersonate": verify_impersonate,
},
"browser_proxy_enabled": browser_proxy_enabled,
"browser_proxy_url": browser_proxy_url if browser_proxy_enabled else "",
"fingerprint": fingerprint,
"elapsed_ms": int((time.time() - started_at) * 1000)
}
except Exception as e:
return {
"success": False,
"message": f"分数测试失败: {str(e)}",
"captcha_method": captcha_method,
"website_url": website_url,
"website_key": website_key,
"action": action,
"verify_url": verify_url,
"enterprise": enterprise,
"token_acquired": bool(token_value),
"token_preview": _mask_token(token_value),
"token_elapsed_ms": token_elapsed_ms,
"verify_elapsed_ms": verify_elapsed_ms,
"verify_http_status": verify_http_status,
"verify_result": verify_result,
"verify_request_meta": {
"mode": verify_mode,
"proxy_used": verify_proxy_used,
"user_agent": verify_headers.get("user-agent", ""),
"accept_language": verify_headers.get("accept-language", ""),
"sec_ch_ua": verify_headers.get("sec-ch-ua", ""),
"sec_ch_ua_mobile": verify_headers.get("sec-ch-ua-mobile", ""),
"sec_ch_ua_platform": verify_headers.get("sec-ch-ua-platform", ""),
"origin": verify_headers.get("origin", ""),
"referer": verify_headers.get("referer", ""),
"x_requested_with": verify_headers.get("x-requested-with", ""),
"proxy_source": verify_proxy_source,
"proxy_url": verify_proxy_url,
"impersonate": verify_impersonate,
},
"browser_proxy_enabled": browser_proxy_enabled,
"browser_proxy_url": browser_proxy_url if browser_proxy_enabled else "",
"fingerprint": fingerprint,
"elapsed_ms": int((time.time() - started_at) * 1000)
}
# ========== Plugin Configuration Endpoints ==========
@router.get("/api/plugin/config")