fork: 个人用途可以在内置浏览器上登录对应的谷歌账号,测试可行

This commit is contained in:
dantynoel
2025-12-18 16:43:50 +08:00
parent 89cede985f
commit 3cb9e0405c
6 changed files with 421 additions and 59 deletions

2
.gitignore vendored
View File

@@ -52,3 +52,5 @@ logs.txt
*.tmp
*.bak
*.cache
browser_data

View File

@@ -12,7 +12,7 @@ max_poll_attempts = 200
[server]
host = "0.0.0.0"
port = 8000
port = 8106
[debug]
enabled = false

150
request.py Normal file
View File

@@ -0,0 +1,150 @@
import os
import json
import re
import base64
import aiohttp # Async test. Need to install
import asyncio
# --- 配置区域 ---
BASE_URL = os.getenv('GEMINI_FLOW2API_URL', 'http://127.0.0.1:8106')
BACKEND_URL = BASE_URL + "/v1/chat/completions"
API_KEY = os.getenv('GEMINI_FLOW2API_APIKEY', 'Bearer han1234')
if API_KEY is None:
raise ValueError('[gemini flow2api] api key not set')
MODEL_LANDSCAPE = "gemini-3.0-pro-image-landscape"
MODEL_PORTRAIT = "gemini-3.0-pro-image-portrait"
# 修改: 增加 model 参数,默认为 None
async def request_backend_generation(
prompt: str,
images: list[bytes] = None,
model: str = None) -> bytes | None:
"""
请求后端生成图片。
:param prompt: 提示词
:param images: 图片二进制列表
:param model: 指定模型名称 (可选)
:return: 成功返回图片bytes失败返回None
"""
# 更新token
images = images or []
# 逻辑: 如果未指定 model默认使用 Landscape
use_model = model if model else MODEL_LANDSCAPE
# 1. 构造 Payload
if images:
content_payload = [{"type": "text", "text": prompt}]
print(f"[Backend] 正在处理 {len(images)} 张图片输入...")
for img_bytes in images:
b64_str = base64.b64encode(img_bytes).decode('utf-8')
content_payload.append({
"type": "image_url",
"image_url": {"url": f"data:image/jpeg;base64,{b64_str}"}
})
else:
content_payload = prompt
payload = {
"model": use_model, # 使用选定的模型
"messages": [{"role": "user", "content": content_payload}],
"stream": True
}
headers = {
"Authorization": API_KEY,
"Content-Type": "application/json"
}
image_url = None
print(f"[Backend] Model: {use_model} | 发起请求: {prompt[:20]}...")
try:
async with aiohttp.ClientSession() as session:
async with session.post(BACKEND_URL, json=payload, headers=headers, timeout=120) as response:
if response.status != 200:
err_text = await response.text()
content = response.content
print(f"[Backend Error] Status {response.status}: {err_text} {content}")
raise Exception(f"API Error: {response.status}: {err_text}")
async for line in response.content:
line_str = line.decode('utf-8').strip()
if line_str.startswith('{"error'):
chunk = json.loads(data_str)
delta = chunk.get("choices", [{}])[0].get("delta", {})
msg = delta['reasoning_content']
if '401' in msg:
msg += '\nAccess Token 已失效,需重新配置。'
elif '400' in msg:
msg += '\n返回内容被拦截。'
raise Exception(msg)
if not line_str or not line_str.startswith('data: '):
continue
data_str = line_str[6:]
if data_str == '[DONE]':
break
try:
chunk = json.loads(data_str)
delta = chunk.get("choices", [{}])[0].get("delta", {})
# 打印思考过程
if "reasoning_content" in delta:
print(delta['reasoning_content'], end="", flush=True)
# 提取内容中的图片链接
if "content" in delta:
content_text = delta["content"]
img_match = re.search(r'!\[.*?\]\((.*?)\)', content_text)
if img_match:
image_url = img_match.group(1)
print(f"\n[Backend] 捕获图片链接: {image_url}")
except json.JSONDecodeError:
continue
# 3. 下载生成的图片
if image_url:
async with session.get(image_url) as img_resp:
if img_resp.status == 200:
image_bytes = await img_resp.read()
return image_bytes
else:
print(f"[Backend Error] 图片下载失败: {img_resp.status}")
except Exception as e:
print(f"[Backend Exception] {e}")
raise e
return None
if __name__ == '__main__':
async def main():
print("=== AI 绘图接口测试 ===")
user_prompt = input("请输入提示词 (例如 '一只猫'): ").strip()
if not user_prompt:
user_prompt = "A cute cat in the garden"
print(f"正在请求: {user_prompt}")
# 这里的 images 传空列表用于测试文生图
# 如果想测试图生图,你需要手动读取本地文件:
# with open("output_test.jpg", "rb") as f: img_data = f.read()
# result = await request_backend_generation(user_prompt, [img_data])
result = await request_backend_generation(user_prompt)
if result:
filename = "output_test.jpg"
with open(filename, "wb") as f:
f.write(result)
print(f"\n[Success] 图片已保存为 {filename},大小: {len(result)} bytes")
else:
print("\n[Failed] 生成失败")
# 运行测试
if os.name == 'nt': # Windows 兼容性
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
asyncio.run(main())

View File

@@ -74,7 +74,12 @@ async def lifespan(app: FastAPI):
# Initialize browser captcha service if needed
browser_service = None
if captcha_config.captcha_method == "browser":
if True:
from .services.browser_captcha_personal import BrowserCaptchaService
browser_service = await BrowserCaptchaService.get_instance(db)
await browser_service.open_login_window()
print("✓ Browser captcha service initialized (webui mode)")
elif captcha_config.captcha_method == "browser":
from .services.browser_captcha import BrowserCaptchaService
browser_service = await BrowserCaptchaService.get_instance(db)
print("✓ Browser captcha service initialized (headless mode)")

View File

@@ -0,0 +1,197 @@
import asyncio
import time
import re
import os
from typing import Optional, Dict
from playwright.async_api import async_playwright, BrowserContext, Page
from ..core.logger import debug_logger
# ... (保持原来的 parse_proxy_url 和 validate_browser_proxy_url 函数不变) ...
def parse_proxy_url(proxy_url: str) -> Optional[Dict[str, str]]:
"""解析代理URL分离协议、主机、端口、认证信息"""
proxy_pattern = r'^(socks5|http|https)://(?:([^:]+):([^@]+)@)?([^:]+):(\d+)$'
match = re.match(proxy_pattern, proxy_url)
if match:
protocol, username, password, host, port = match.groups()
proxy_config = {'server': f'{protocol}://{host}:{port}'}
if username and password:
proxy_config['username'] = username
proxy_config['password'] = password
return proxy_config
return None
class BrowserCaptchaService:
"""浏览器自动化获取 reCAPTCHA token持久化有头模式"""
_instance: Optional['BrowserCaptchaService'] = None
_lock = asyncio.Lock()
def __init__(self, db=None):
"""初始化服务"""
# === 修改点 1: 设置为有头模式 ===
self.headless = False
self.playwright = None
# 注意: 持久化模式下,我们操作的是 context 而不是 browser
self.context: Optional[BrowserContext] = None
self._initialized = False
self.website_key = "6LdsFiUsAAAAAIjVDZcuLhaHiDn5nnHVXVRQGeMV"
self.db = db
# === 修改点 2: 指定本地数据存储目录 ===
# 这会在脚本运行目录下生成 browser_data 文件夹,用于保存你的登录状态
self.user_data_dir = os.path.join(os.getcwd(), "browser_data")
@classmethod
async def get_instance(cls, db=None) -> 'BrowserCaptchaService':
if cls._instance is None:
async with cls._lock:
if cls._instance is None:
cls._instance = cls(db)
# 首次调用不强制初始化,等待 get_token 时懒加载或者可以在这里await
return cls._instance
async def initialize(self):
"""初始化持久化浏览器上下文"""
if self._initialized and self.context:
return
try:
proxy_url = None
if self.db:
captcha_config = await self.db.get_captcha_config()
if captcha_config.browser_proxy_enabled and captcha_config.browser_proxy_url:
proxy_url = captcha_config.browser_proxy_url
debug_logger.log_info(f"[BrowserCaptcha] 正在启动浏览器 (用户数据目录: {self.user_data_dir})...")
self.playwright = await async_playwright().start()
# 配置启动参数
launch_options = {
'headless': self.headless,
'user_data_dir': self.user_data_dir, # 指定数据目录
'viewport': {'width': 1280, 'height': 720}, # 设置默认窗口大小
'args': [
'--disable-blink-features=AutomationControlled',
'--disable-infobars',
'--no-sandbox',
'--disable-setuid-sandbox',
]
}
# 代理配置
if proxy_url:
proxy_config = parse_proxy_url(proxy_url)
if proxy_config:
launch_options['proxy'] = proxy_config
debug_logger.log_info(f"[BrowserCaptcha] 使用代理: {proxy_config['server']}")
# === 修改点 3: 使用 launch_persistent_context ===
# 这会启动一个带有状态的浏览器窗口
self.context = await self.playwright.chromium.launch_persistent_context(**launch_options)
# 设置默认超时
self.context.set_default_timeout(30000)
self._initialized = True
debug_logger.log_info(f"[BrowserCaptcha] ✅ 浏览器已启动 (Profile: {self.user_data_dir})")
except Exception as e:
debug_logger.log_error(f"[BrowserCaptcha] ❌ 浏览器启动失败: {str(e)}")
raise
async def get_token(self, project_id: str) -> Optional[str]:
"""获取 reCAPTCHA token"""
# 确保浏览器已启动
if not self._initialized or not self.context:
await self.initialize()
start_time = time.time()
page: Optional[Page] = None
try:
# === 修改点 4: 在现有上下文中新建标签页,而不是新建上下文 ===
# 这样可以复用该上下文中已保存的 Cookie (你的登录状态)
page = await self.context.new_page()
website_url = f"https://labs.google/fx/tools/flow/project/{project_id}"
debug_logger.log_info(f"[BrowserCaptcha] 访问页面: {website_url}")
# 访问页面
try:
await page.goto(website_url, wait_until="domcontentloaded")
except Exception as e:
debug_logger.log_warning(f"[BrowserCaptcha] 页面加载警告: {str(e)}")
# --- 关键点:如果需要人工介入 ---
# 你可以在这里加入一段逻辑,如果是第一次运行,或者检测到未登录,
# 可以暂停脚本,等你手动操作完再继续。
# 例如: await asyncio.sleep(30)
# ... (中间注入脚本和执行 reCAPTCHA 的代码逻辑与原版完全一致,此处省略以节省篇幅) ...
# ... 请将原代码中从 "检查并注入 reCAPTCHA v3 脚本" 到 token 获取部分的代码复制到这里 ...
# 这里为了演示,简写注入逻辑(请保留你原有的完整注入逻辑):
script_loaded = await page.evaluate("() => { return !!(window.grecaptcha && window.grecaptcha.execute); }")
if not script_loaded:
await page.evaluate(f"""
() => {{
const script = document.createElement('script');
script.src = 'https://www.google.com/recaptcha/api.js?render={self.website_key}';
script.async = true; script.defer = true;
document.head.appendChild(script);
}}
""")
# 等待加载... (保留你原有的等待循环)
await page.wait_for_timeout(2000)
# 执行获取 Token (保留你原有的 execute 逻辑)
token = await page.evaluate(f"""
async () => {{
try {{
return await window.grecaptcha.execute('{self.website_key}', {{ action: 'FLOW_GENERATION' }});
}} catch (e) {{ return null; }}
}}
""")
if token:
debug_logger.log_info(f"[BrowserCaptcha] ✅ Token获取成功")
return token
else:
debug_logger.log_error("[BrowserCaptcha] Token获取失败")
return None
except Exception as e:
debug_logger.log_error(f"[BrowserCaptcha] 异常: {str(e)}")
return None
finally:
# === 修改点 5: 只关闭 Page (标签页),不关闭 Context (浏览器窗口) ===
if page:
try:
await page.close()
except:
pass
async def close(self):
"""完全关闭浏览器(清理资源时调用)"""
try:
if self.context:
await self.context.close() # 这会关闭整个浏览器窗口
self.context = None
if self.playwright:
await self.playwright.stop()
self.playwright = None
self._initialized = False
debug_logger.log_info("[BrowserCaptcha] 浏览器服务已关闭")
except Exception as e:
debug_logger.log_error(f"[BrowserCaptcha] 关闭异常: {str(e)}")
# 增加一个辅助方法,用于手动登录
async def open_login_window(self):
"""调用此方法打开一个永久窗口供你登录Google"""
await self.initialize()
page = await self.context.new_page()
await page.goto("https://accounts.google.com/")
print("请在打开的浏览器中登录账号。登录完成后,无需关闭浏览器,脚本下次运行时会自动使用此状态。")

View File

@@ -687,8 +687,16 @@ class FlowClient:
"""获取reCAPTCHA token - 支持两种方式"""
captcha_method = config.captcha_method
if True:
try:
from .browser_captcha_personal import BrowserCaptchaService
service = await BrowserCaptchaService.get_instance(self.proxy_manager)
return await service.get_token(project_id)
except Exception as e:
debug_logger.log_error(f"[reCAPTCHA Browser] error: {str(e)}")
return None
# 浏览器打码
if captcha_method == "browser":
elif captcha_method == "browser":
try:
from .browser_captcha import BrowserCaptchaService
service = await BrowserCaptchaService.get_instance(self.proxy_manager)
@@ -696,61 +704,61 @@ class FlowClient:
except Exception as e:
debug_logger.log_error(f"[reCAPTCHA Browser] error: {str(e)}")
return None
# YesCaptcha打码
client_key = config.yescaptcha_api_key
if not client_key:
debug_logger.log_info("[reCAPTCHA] API key not configured, skipping")
return None
website_key = "6LdsFiUsAAAAAIjVDZcuLhaHiDn5nnHVXVRQGeMV"
website_url = f"https://labs.google/fx/tools/flow/project/{project_id}"
base_url = config.yescaptcha_base_url
page_action = "FLOW_GENERATION"
try:
async with AsyncSession() as session:
create_url = f"{base_url}/createTask"
create_data = {
"clientKey": client_key,
"task": {
"websiteURL": website_url,
"websiteKey": website_key,
"type": "RecaptchaV3TaskProxylessM1",
"pageAction": page_action
}
}
result = await session.post(create_url, json=create_data, impersonate="chrome110")
result_json = result.json()
task_id = result_json.get('taskId')
debug_logger.log_info(f"[reCAPTCHA] created task_id: {task_id}")
if not task_id:
return None
get_url = f"{base_url}/getTaskResult"
for i in range(40):
get_data = {
"clientKey": client_key,
"taskId": task_id
}
result = await session.post(get_url, json=get_data, impersonate="chrome110")
result_json = result.json()
debug_logger.log_info(f"[reCAPTCHA] polling #{i+1}: {result_json}")
solution = result_json.get('solution', {})
response = solution.get('gRecaptchaResponse')
if response:
return response
time.sleep(3)
else:
# YesCaptcha打码
client_key = config.yescaptcha_api_key
if not client_key:
debug_logger.log_info("[reCAPTCHA] API key not configured, skipping")
return None
except Exception as e:
debug_logger.log_error(f"[reCAPTCHA] error: {str(e)}")
return None
website_key = "6LdsFiUsAAAAAIjVDZcuLhaHiDn5nnHVXVRQGeMV"
website_url = f"https://labs.google/fx/tools/flow/project/{project_id}"
base_url = config.yescaptcha_base_url
page_action = "FLOW_GENERATION"
try:
async with AsyncSession() as session:
create_url = f"{base_url}/createTask"
create_data = {
"clientKey": client_key,
"task": {
"websiteURL": website_url,
"websiteKey": website_key,
"type": "RecaptchaV3TaskProxylessM1",
"pageAction": page_action
}
}
result = await session.post(create_url, json=create_data, impersonate="chrome110")
result_json = result.json()
task_id = result_json.get('taskId')
debug_logger.log_info(f"[reCAPTCHA] created task_id: {task_id}")
if not task_id:
return None
get_url = f"{base_url}/getTaskResult"
for i in range(40):
get_data = {
"clientKey": client_key,
"taskId": task_id
}
result = await session.post(get_url, json=get_data, impersonate="chrome110")
result_json = result.json()
debug_logger.log_info(f"[reCAPTCHA] polling #{i+1}: {result_json}")
solution = result_json.get('solution', {})
response = solution.get('gRecaptchaResponse')
if response:
return response
time.sleep(3)
return None
except Exception as e:
debug_logger.log_error(f"[reCAPTCHA] error: {str(e)}")
return None