feat: 为 Personal 模式添加完整代理支持

- 支持 HTTP/HTTPS/SOCKS5 代理
- 支持带认证的代理(通过 Chrome 扩展)
- 代理配置优先级: 验证码代理 > 全局代理
- 自动清理临时扩展目录
- 完善代理相关日志

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
genz27
2026-03-30 17:12:59 +08:00
parent 0d614870af
commit 896e4cbb60
6 changed files with 168 additions and 37 deletions

View File

@@ -213,13 +213,14 @@ LABS_URL = "https://labs.google/fx/tools/flow"
# 代理解析工具函数
# ==========================================
def parse_proxy_url(proxy_url: str) -> Optional[Dict[str, str]]:
"""解析代理URL"""
"""解析代理URL(支持 socks5h://Playwright 中按 socks5 处理)"""
if not proxy_url: return None
if not re.match(r'^(http|https|socks5)://', proxy_url): proxy_url = f"http://{proxy_url}"
match = re.match(r'^(socks5|http|https)://(?:([^:]+):([^@]+)@)?([^:]+):(\d+)$', proxy_url)
if not re.match(r'^(http|https|socks5h?|socks5)://', proxy_url): proxy_url = f"http://{proxy_url}"
match = re.match(r'^(socks5h?|socks5|http|https)://(?:([^:]+):([^@]+)@)?([^:]+):(\d+)$', proxy_url)
if match:
protocol, username, password, host, port = match.groups()
proxy_config = {'server': f'{protocol}://{host}:{port}'}
browser_protocol = "socks5" if protocol.startswith("socks5") else protocol
proxy_config = {'server': f'{browser_protocol}://{host}:{port}'}
if username and password:
proxy_config['username'] = username
proxy_config['password'] = password
@@ -229,8 +230,8 @@ def parse_proxy_url(proxy_url: str) -> Optional[Dict[str, str]]:
def normalize_browser_proxy_url(proxy_url: str) -> tuple[Optional[str], Optional[str]]:
"""将浏览器代理标准化为 Playwright/Chromium 可接受的格式。
Chromium 不支持带账号密码的 socks5 代理认证。
对于 `socks5://user:pass@host:port`,自动降级为 `http://user:pass@host:port`
Chromium 不支持带账号密码的 socks5/socks5h 代理认证。
对于 `socks5(h)://user:pass@host:port`,自动降级为 `http://user:pass@host:port`
方便兼容同时提供 HTTP/SOCKS5 双入口的代理服务商。
Returns:
@@ -240,27 +241,30 @@ def normalize_browser_proxy_url(proxy_url: str) -> tuple[Optional[str], Optional
return None, None
proxy_url = proxy_url.strip()
match = re.match(r'^(socks5|http|https)://(?:([^:]+):([^@]+)@)?([^:]+):(\d+)$', proxy_url)
match = re.match(r'^(socks5h?|socks5|http|https)://(?:([^:]+):([^@]+)@)?([^:]+):(\d+)$', proxy_url)
if not match:
if not re.match(r'^(http|https|socks5)://', proxy_url):
if not re.match(r'^(http|https|socks5h?|socks5)://', proxy_url):
proxy_url = f"http://{proxy_url}"
return proxy_url, None
protocol, username, password, host, port = match.groups()
if protocol == "socks5" and username and password:
if protocol.startswith("socks5") and username and password:
normalized = f"http://{username}:{password}@{host}:{port}"
warning = (
"检测到带认证的 SOCKS5 代理。"
f"检测到带认证的 {protocol.upper()} 代理。"
"Chromium 不支持 socks5 用户名密码认证,"
f"已自动改用 HTTP 代理启动浏览器: http://{host}:{port}"
)
return normalized, warning
if protocol == "socks5h":
proxy_url = f"socks5://{host}:{port}"
return proxy_url, None
def validate_browser_proxy_url(proxy_url: str) -> tuple[bool, Optional[str]]:
    """Check whether a browser proxy URL can be parsed.

    An empty value counts as valid (no proxy configured).

    Args:
        proxy_url: Proxy URL as entered by the user; may be empty.

    Returns:
        ``(True, None)`` when the value is empty or parses successfully,
        ``(False, "代理格式错误")`` when the normalized URL cannot be parsed.
    """
    if not proxy_url:
        return True, None
    # Normalize exactly once; surrounding whitespace is dropped first so a
    # padded but otherwise valid URL is not rejected.
    normalized_proxy_url, _ = normalize_browser_proxy_url(proxy_url.strip())
    parsed = parse_proxy_url(normalized_proxy_url)
    if not parsed:
        return False, "代理格式错误"
    return True, None

View File

@@ -8,6 +8,10 @@ import inspect
import time
import os
import sys
import re
import json
import shutil
import tempfile
import subprocess
from typing import Optional, Dict, Any, Iterable
@@ -139,6 +143,74 @@ else:
print(f"[BrowserCaptcha] ❌ nodriver 导入失败: {e}")
def _parse_proxy_url(proxy_url: str):
    """Parse a proxy URL into (protocol, host, port, username, password).

    A URL without a recognised scheme is treated as ``http://``. The
    ``socks5h`` scheme is reported as ``socks5``.

    Args:
        proxy_url: Proxy URL such as ``host:port``, ``http://host:port`` or
            ``socks5://user:pass@host:port``. Surrounding whitespace is ignored.

    Returns:
        A 5-tuple ``(protocol, host, port, username, password)``. ``port`` is
        the digit string taken from the URL; ``username`` and ``password`` are
        ``None`` when the URL carries no credentials. All five items are
        ``None`` when the input is empty or does not match the expected shape.
    """
    unparsed = (None, None, None, None, None)
    if not proxy_url:
        return unparsed
    url = proxy_url.strip()
    if not re.match(r'^(http|https|socks5h?)://', url):
        url = f"http://{url}"
    # The password group is greedy up to the LAST '@', so a password that
    # itself contains '@' is kept whole instead of leaking into the host part.
    m = re.match(r'^(socks5h?|http|https)://(?:([^:]+):(.+)@)?([^:]+):(\d+)$', url)
    if not m:
        return unparsed
    protocol, username, password, host, port = m.groups()
    if protocol == "socks5h":
        protocol = "socks5"
    return protocol, host, port, username, password
def _create_proxy_auth_extension(protocol: str, host: str, port: str, username: str, password: str) -> str:
    """Create a temporary Chrome extension directory for proxy authentication.

    The extension consists of a ``manifest.json`` and a ``background.js`` that
    sets a fixed proxy server and answers ``onAuthRequired`` with the given
    credentials.

    Args:
        protocol: Proxy scheme; anything other than ``http``, ``https`` or
            ``socks5`` falls back to ``http``.
        host: Proxy host name or address.
        port: Proxy port as a decimal string.
        username: Proxy user name.
        password: Proxy password.

    Returns:
        The path to the extension directory. The caller is responsible for
        deleting it.

    Raises:
        ValueError: If ``port`` is not a decimal integer.
        OSError: If the directory or its files cannot be written; the partially
            created directory is removed before the error propagates.
    """
    scheme = protocol if protocol in ("http", "https", "socks5") else "http"
    # NOTE(review): this is a Manifest V2 extension; confirm the targeted
    # Chrome build still loads MV2 extensions.
    manifest = {
        "version": "1.0.0",
        "manifest_version": 2,
        "name": "Proxy Auth Helper",
        "permissions": [
            "proxy", "tabs", "unlimitedStorage", "storage",
            "<all_urls>", "webRequest", "webRequestBlocking"
        ],
        "background": {"scripts": ["background.js"]},
        "minimum_chrome_version": "76.0.0"
    }
    proxy_settings = {
        "mode": "fixed_servers",
        "rules": {
            # int() keeps a port such as "0777" from being read as an octal
            # literal by the JavaScript engine.
            "singleProxy": {"scheme": scheme, "host": host, "port": int(port)},
            "bypassList": ["localhost"],
        },
    }
    credentials = {"username": username, "password": password}
    # Values are embedded through json.dumps so quotes, backslashes and
    # non-ASCII characters in the credentials cannot break out of the
    # JavaScript string literals.
    background_js = (
        f"var config = {json.dumps(proxy_settings, indent=2)};\n"
        'chrome.proxy.settings.set({value: config, scope: "regular"}, function(){});\n'
        "chrome.webRequest.onAuthRequired.addListener(\n"
        "  function(details) {\n"
        f"    return {{authCredentials: {json.dumps(credentials)}}};\n"
        "  },\n"
        '  {urls: ["<all_urls>"]},\n'
        "  ['blocking']\n"
        ");\n"
    )

    ext_dir = tempfile.mkdtemp(prefix="nodriver_proxy_auth_")
    try:
        with open(os.path.join(ext_dir, "manifest.json"), "w", encoding="utf-8") as f:
            json.dump(manifest, f, indent=2)
        with open(os.path.join(ext_dir, "background.js"), "w", encoding="utf-8") as f:
            f.write(background_js)
    except Exception:
        # Do not leave a half-written directory (holding credentials) behind.
        shutil.rmtree(ext_dir, ignore_errors=True)
        raise
    return ext_dir
class ResidentTabInfo:
"""常驻标签页信息结构"""
def __init__(self, tab, slot_id: str, project_id: Optional[str] = None):
@@ -197,6 +269,8 @@ class BrowserCaptchaService:
self._recaptcha_ready = False # 向后兼容
self._last_fingerprint: Optional[Dict[str, Any]] = None
self._resident_error_streaks: dict[str, int] = {}
self._proxy_url: Optional[str] = None
self._proxy_ext_dir: Optional[str] = None
# 自定义站点打码常驻页(用于 score-test
self._custom_tabs: dict[str, Dict[str, Any]] = {}
self._custom_lock = asyncio.Lock()
@@ -615,6 +689,8 @@ class BrowserCaptchaService:
self.browser = None
self._initialized = False
self._last_fingerprint = None
self._cleanup_proxy_extension()
self._proxy_url = None
async with self._resident_lock:
resident_items = list(self._resident_tabs.values())
@@ -652,6 +728,40 @@ class BrowserCaptchaService:
f"[BrowserCaptcha] 停止浏览器实例失败 ({reason}): {e}"
)
async def _resolve_personal_proxy(self):
    """Read the proxy configuration for the Personal captcha browser.

    Priority: the captcha ``browser_proxy`` setting first, then the request
    proxy setting. A failure while reading either setting is logged as a
    warning and treated as "not configured".

    Returns:
        The tuple produced by ``_parse_proxy_url`` for the first enabled,
        non-blank proxy URL, i.e. ``(protocol, host, port, username,
        password)``. Five ``None`` values when there is no database handle or
        no usable proxy setting.
    """
    no_proxy = (None, None, None, None, None)
    if not self.db:
        return no_proxy

    def _redact(url: str) -> str:
        # Proxy URLs may embed "user:password@"; keep credentials out of logs.
        return re.sub(r'^((?:[A-Za-z][A-Za-z0-9+.-]*://)?).*@', r'\1***@', url)

    try:
        captcha_cfg = await self.db.get_captcha_config()
        # Guard against a missing config row, mirroring the request-proxy
        # branch below, so it is not reported as a read failure.
        if captcha_cfg and captcha_cfg.browser_proxy_enabled and captcha_cfg.browser_proxy_url:
            url = captcha_cfg.browser_proxy_url.strip()
            if url:
                debug_logger.log_info(f"[BrowserCaptcha] Personal 使用验证码代理: {_redact(url)}")
                return _parse_proxy_url(url)
    except Exception as e:
        debug_logger.log_warning(f"[BrowserCaptcha] 读取验证码代理配置失败: {e}")

    try:
        proxy_cfg = await self.db.get_proxy_config()
        if proxy_cfg and proxy_cfg.enabled and proxy_cfg.proxy_url:
            url = proxy_cfg.proxy_url.strip()
            if url:
                debug_logger.log_info(f"[BrowserCaptcha] Personal 回退使用请求代理: {_redact(url)}")
                return _parse_proxy_url(url)
    except Exception as e:
        debug_logger.log_warning(f"[BrowserCaptcha] 读取请求代理配置失败: {e}")

    return no_proxy
def _cleanup_proxy_extension(self):
    """Delete the temporary proxy-auth extension directory and forget its path.

    Removal is best-effort: nothing happens when no directory is recorded or
    it no longer exists, and removal errors are ignored. The stored path is
    reset to ``None`` in every case.
    """
    stale_dir = self._proxy_ext_dir
    should_remove = bool(stale_dir) and os.path.isdir(stale_dir)
    if should_remove:
        try:
            shutil.rmtree(stale_dir, ignore_errors=True)
        except Exception:
            # Cleanup must never interrupt browser start-up or shutdown.
            pass
    self._proxy_ext_dir = None
async def initialize(self):
"""初始化 nodriver 浏览器"""
self._check_available()
@@ -690,27 +800,49 @@ class BrowserCaptchaService:
f"[BrowserCaptcha] 使用指定浏览器可执行文件: {browser_executable_path}"
)
# 解析代理配置
self._cleanup_proxy_extension()
self._proxy_url = None
protocol, host, port, username, password = await self._resolve_personal_proxy()
proxy_server_arg = None
if protocol and host and port:
if username and password:
self._proxy_ext_dir = _create_proxy_auth_extension(protocol, host, port, username, password)
debug_logger.log_info(
f"[BrowserCaptcha] Personal 代理需要认证,已创建扩展: {self._proxy_ext_dir}"
)
proxy_server_arg = f"--proxy-server={protocol}://{host}:{port}"
self._proxy_url = f"{protocol}://{host}:{port}"
debug_logger.log_info(f"[BrowserCaptcha] Personal 浏览器代理: {self._proxy_url}")
browser_args = [
'--disable-dev-shm-usage',
'--disable-setuid-sandbox',
'--disable-gpu',
'--window-size=1280,720',
'--window-position=3000,3000',
'--profile-directory=Default',
'--disable-background-networking',
'--disable-sync',
'--disable-translate',
'--disable-default-apps',
'--no-first-run',
'--no-default-browser-check',
]
if proxy_server_arg:
browser_args.append(proxy_server_arg)
if self._proxy_ext_dir:
browser_args.append(f'--load-extension={self._proxy_ext_dir}')
else:
browser_args.append('--disable-extensions')
# 启动 nodriver 浏览器(后台启动,不占用前台)
config = uc.Config(
headless=self.headless,
user_data_dir=self.user_data_dir,
browser_executable_path=browser_executable_path,
sandbox=False,
browser_args=[
'--disable-dev-shm-usage',
'--disable-setuid-sandbox',
'--disable-gpu',
'--window-size=1280,720',
'--window-position=3000,3000', # 窗口位置移到屏幕外
'--profile-directory=Default',
'--disable-extensions',
'--disable-background-networking',
'--disable-sync',
'--disable-translate',
'--disable-default-apps',
'--no-first-run',
'--no-default-browser-check',
]
browser_args=browser_args,
)
self.browser = await self._run_with_timeout(
uc.start(config),
@@ -1491,8 +1623,7 @@ class BrowserCaptchaService:
if not isinstance(fingerprint, dict):
return None
# personal 模式当前未单独配置浏览器代理,显式使用直连,避免与全局代理混淆。
result: Dict[str, Any] = {"proxy_url": None}
result: Dict[str, Any] = {"proxy_url": self._proxy_url}
for key in ("user_agent", "accept_language", "sec_ch_ua", "sec_ch_ua_mobile", "sec_ch_ua_platform"):
value = fingerprint.get(key)
if isinstance(value, str) and value:
@@ -2212,7 +2343,7 @@ class BrowserCaptchaService:
extracted_fingerprint = {
"user_agent": fallback_ua or "",
"accept_language": fallback_lang or "",
"proxy_url": None,
"proxy_url": self._proxy_url,
}
except Exception:
extracted_fingerprint = None

View File

@@ -47,10 +47,6 @@ class ProxyManager:
# 协议前缀格式
if line.startswith(("http://", "https://", "socks5://", "socks5h://")):
# socks5h 统一转 socks5便于后续处理
if line.startswith("socks5h://"):
line = "socks5://" + line[len("socks5h://"):]
# 已是标准 user:pass@host:port或 host:port
if "@" in line:
return line