fix(captcha): 修复内置打码 fresh 重启竞态

This commit is contained in:
genz27
2026-05-05 20:47:55 +08:00
parent cc92e1c07a
commit e223236c9b
2 changed files with 448 additions and 25 deletions

View File

@@ -546,11 +546,11 @@ def _build_personal_browser_args(
'--safebrowsing-disable-auto-update',
'--no-first-run',
'--no-default-browser-check',
'--no-startup-window',
'--no-zygote',
]
if headless:
browser_args.append('--no-startup-window')
browser_args.append('--window-position=3000,3000')
else:
browser_args.append('--window-position=80,80')
@@ -857,6 +857,14 @@ def _patch_nodriver_browser_instance(browser_instance):
raise
except Exception as e:
if _is_runtime_disconnect_error(e):
try:
setattr(self, "_flow2api_runtime_disconnected", True)
except Exception:
pass
try:
self.targets = []
except Exception:
pass
log_message = (
f"[BrowserCaptcha] nodriver.update_targets 在浏览器断连后退出: "
f"{type(e).__name__}: {e}"
@@ -871,6 +879,10 @@ def _patch_nodriver_browser_instance(browser_instance):
_patch_nodriver_connection_instance(getattr(self, "connection", None))
for target in list(getattr(self, "targets", []) or []):
_patch_nodriver_connection_instance(target)
try:
setattr(self, "_flow2api_runtime_disconnected", False)
except Exception:
pass
return result
browser_instance.update_targets = types.MethodType(patched_update_targets, browser_instance)
@@ -1423,6 +1435,7 @@ class BrowserCaptchaService:
self._successful_solves_since_browser_start = 0
self._fresh_profile_restart_pending = False
self._fresh_profile_restart_task: Optional[asyncio.Task] = None
self._fresh_profile_restart_force_pending = False
self._browser_launch_failure_streak = 0
self._browser_launch_cooldown_until = 0.0
self._browser_launch_last_error = ""
@@ -1926,6 +1939,7 @@ class BrowserCaptchaService:
def _reset_browser_rotation_budget(self) -> None:
self._successful_solves_since_browser_start = 0
self._fresh_profile_restart_pending = False
self._fresh_profile_restart_force_pending = False
self._fresh_profile_restart_pending_reason = ""
def _mark_runtime_active(self) -> None:
@@ -1952,11 +1966,24 @@ class BrowserCaptchaService:
)
debug_logger.log_warning(
"[BrowserCaptcha] 浏览器成功打码次数达到 fresh profile 轮换阈值,"
f"后续请求会等待并发清空后执行全新无状态浏览器重启 "
f"后续新取码会先等待当前并发清空并完成全新无状态浏览器重启 "
f"(count={current_count}, threshold={threshold}, reason={self._fresh_profile_restart_pending_reason})"
)
return current_count
def _mark_fresh_profile_restart_pending(self, *, reason: str, force: bool = False) -> None:
normalized_reason = str(reason or "manual").strip() or "manual"
already_pending = bool(self._fresh_profile_restart_pending)
self._fresh_profile_restart_pending = True
if force:
self._fresh_profile_restart_force_pending = True
self._fresh_profile_restart_pending_reason = normalized_reason
if not already_pending:
debug_logger.log_warning(
"[BrowserCaptcha] 已请求 fresh profile 轮换,"
f"后续新取码会等待当前并发清空并完成重启 (force={force}, reason={normalized_reason})"
)
async def _has_active_browser_work(self) -> bool:
if (
self._legacy_lock.locked()
@@ -1993,8 +2020,10 @@ class BrowserCaptchaService:
return False
threshold = max(0, int(self._fresh_profile_restart_every_n_solves or 0))
if threshold <= 0:
force_restart = bool(getattr(self, "_fresh_profile_restart_force_pending", False))
if threshold <= 0 and not force_restart:
self._fresh_profile_restart_pending = False
self._fresh_profile_restart_force_pending = False
self._fresh_profile_restart_pending_reason = ""
return False
@@ -2041,10 +2070,86 @@ class BrowserCaptchaService:
self._fresh_profile_restart_task = asyncio.create_task(_runner())
debug_logger.log_info(
f"[BrowserCaptcha] fresh profile 轮换已转入后台等待空闲执行 (project_id={project_id}, source={source})"
f"[BrowserCaptcha] fresh profile 轮换已计划执行 (project_id={project_id}, source={source})"
)
return False
async def _wait_for_pending_fresh_profile_restart_before_solve(
self,
project_id: str,
token_id: Optional[int] = None,
*,
source: str,
) -> bool:
"""达到 fresh 轮换阈值后,阻止新取码继续复用旧 resident tab。"""
waited = False
current_task = asyncio.current_task()
while True:
existing_task = getattr(self, "_fresh_profile_restart_task", None)
if existing_task is not None and not existing_task.done() and existing_task is not current_task:
if not waited:
waited = True
debug_logger.log_warning(
"[BrowserCaptcha] fresh profile 轮换正在执行/等待,"
f"当前取码先等待重启完成再分配标签页 (project_id={project_id}, source={source})"
)
try:
await asyncio.shield(existing_task)
except asyncio.CancelledError:
raise
except Exception as e:
debug_logger.log_warning(
f"[BrowserCaptcha] 等待 fresh profile 轮换任务异常 (project_id={project_id}, source={source}): {e}"
)
if not self._fresh_profile_restart_pending:
return True
await asyncio.sleep(0)
continue
if not self._fresh_profile_restart_pending:
return waited
threshold = max(0, int(self._fresh_profile_restart_every_n_solves or 0))
force_restart = bool(getattr(self, "_fresh_profile_restart_force_pending", False))
if threshold <= 0 and not force_restart:
self._fresh_profile_restart_pending = False
self._fresh_profile_restart_force_pending = False
self._fresh_profile_restart_pending_reason = ""
return waited
if not waited:
waited = True
debug_logger.log_warning(
"[BrowserCaptcha] fresh profile 轮换已到阈值,"
f"当前取码先触发并等待重启完成 (project_id={project_id}, source={source}, "
f"reason={self._fresh_profile_restart_pending_reason})"
)
await self._maybe_execute_pending_fresh_profile_restart(
project_id,
token_id=token_id,
source=source,
)
scheduled_task = getattr(self, "_fresh_profile_restart_task", None)
if scheduled_task is None or scheduled_task.done() or scheduled_task is current_task:
await asyncio.sleep(0.05)
continue
try:
await asyncio.shield(scheduled_task)
except asyncio.CancelledError:
raise
except Exception as e:
debug_logger.log_warning(
f"[BrowserCaptcha] fresh profile 轮换任务执行异常 (project_id={project_id}, source={source}): {e}"
)
if not self._fresh_profile_restart_pending:
return True
await asyncio.sleep(0)
def _requires_virtual_display(self) -> bool:
"""仅在显式有头模式下要求 Docker/Linux 提供 DISPLAY/虚拟显示。"""
return bool(IS_DOCKER and os.name == "posix" and not self.headless)
@@ -2103,7 +2208,7 @@ class BrowserCaptchaService:
if not (self._initialized and self.browser and self._last_health_probe_ok):
return False
try:
if self.browser.stopped:
if self.browser.stopped or getattr(self.browser, "_flow2api_runtime_disconnected", False):
return False
except Exception:
return False
@@ -2349,7 +2454,7 @@ class BrowserCaptchaService:
def _is_browser_runtime_error(self, error: Any) -> bool:
"""识别浏览器运行态已损坏/已关闭的典型异常。"""
return _is_runtime_disconnect_error(error)
return _is_runtime_disconnect_error(error) or self._is_no_browser_window_error(error)
@staticmethod
def _is_no_browser_window_error(error: Any) -> bool:
@@ -2419,6 +2524,9 @@ class BrowserCaptchaService:
if not self.browser:
self._invalidate_browser_health()
return False
if getattr(self.browser, "_flow2api_runtime_disconnected", False):
self._invalidate_browser_health()
return False
if self._is_browser_health_fresh():
return True
@@ -4180,6 +4288,93 @@ class BrowserCaptchaService:
await self._apply_tab_startup_spoofs(tab, label=f"{label}:host_page")
return tab
async def _create_default_context_target_tab(
self,
url: str,
*,
label: str,
timeout_seconds: Optional[float] = None,
prefer_new_tab: bool = True,
):
browser = self.browser
if browser is None or getattr(browser, "stopped", False):
raise RuntimeError("browser runtime unavailable")
from nodriver import cdp
timeout = timeout_seconds or self._navigation_timeout_seconds
target_url = str(url or "").strip() or PERSONAL_COOKIE_PREBIND_URL
initial_url = (
PERSONAL_COOKIE_PREBIND_URL
if target_url.lower() != PERSONAL_COOKIE_PREBIND_URL
else target_url
)
attempts = [False, True] if prefer_new_tab else [True, False]
last_error = None
for new_window in attempts:
try:
target_id = await self._run_with_timeout(
browser.connection.send(
cdp.target.create_target(
initial_url,
new_window=new_window,
enable_begin_frame_control=True,
)
),
timeout_seconds=timeout,
label=f"{label}:create_target:{'window' if new_window else 'tab'}",
)
except Exception as create_error:
last_error = create_error
if self._is_no_browser_window_error(create_error) and not new_window:
debug_logger.log_warning(
f"[BrowserCaptcha] CDP 新标签创建提示无宿主窗口,改用新窗口重试 ({label}): {create_error}"
)
continue
raise
for _ in range(20):
try:
await browser.update_targets()
except Exception as update_error:
if self._is_browser_runtime_error(update_error):
raise
tab = next(
(
item
for item in getattr(browser, "targets", []) or []
if getattr(item, "type_", None) == "page"
and getattr(item, "target_id", None) == target_id
),
None,
)
if tab is not None:
try:
tab._browser = browser
except Exception:
pass
await self._apply_tab_startup_spoofs(
tab,
label=f"{label}:default_context_tab",
target_url=target_url,
)
if target_url.lower() != PERSONAL_COOKIE_PREBIND_URL:
await self._tab_get(
tab,
target_url,
label=f"{label}:navigate_target",
timeout_seconds=timeout,
)
return tab
await asyncio.sleep(0.15)
last_error = RuntimeError(f"target not found after create_target (target_id={target_id})")
raise last_error or RuntimeError("failed to create browser target")
async def _open_visible_browser_tab(
self,
url: str,
@@ -4221,12 +4416,11 @@ class BrowserCaptchaService:
debug_logger.log_warning(
f"[BrowserCaptcha] 创建有头宿主窗口失败,将直接尝试打开目标标签页 ({label}): {e}"
)
return await self._browser_get(
return await self._create_default_context_target_tab(
url,
label=label,
new_tab=has_page_targets,
new_window=not has_page_targets,
timeout_seconds=timeout_seconds,
prefer_new_tab=has_page_targets,
)
async def _cleanup_startup_browser_pages(self):
@@ -5414,6 +5608,12 @@ class BrowserCaptchaService:
timeout_seconds=self._navigation_timeout_seconds,
)
except Exception as e:
if self._is_browser_runtime_error(e):
self._mark_browser_health(False)
debug_logger.log_warning(
f"[BrowserCaptcha] 打开 labs 引导页时浏览器运行态断开 ({label}): {e}"
)
raise
debug_logger.log_warning(f"[BrowserCaptcha] 打开 labs 引导页失败 ({label}): {e}")
return await _confirm_labs_surface(str(e), stage="navigate_timeout")
@@ -6551,7 +6751,8 @@ class BrowserCaptchaService:
self._resident_rebuild_tasks.clear()
self._resident_recovery_tasks.clear()
self._resident_warmup_task = None
self._fresh_profile_restart_task = None
if fresh_restart_task is not current_task:
self._fresh_profile_restart_task = None
if not tasks_to_cancel:
return
@@ -6643,13 +6844,14 @@ class BrowserCaptchaService:
return None, None, None, None, None
try:
captcha_cfg = await self.db.get_captcha_config()
if captcha_cfg.browser_proxy_pool:
browser_proxy_pool = str(getattr(captcha_cfg, "browser_proxy_pool", "") or "").strip()
if browser_proxy_pool:
pooled_proxy = await self.db.pick_browser_proxy_from_pool()
if pooled_proxy:
debug_logger.log_info(f"[BrowserCaptcha] Personal 使用验证码代理池: {pooled_proxy}")
return _parse_proxy_url(pooled_proxy)
if captcha_cfg.browser_proxy_enabled and captcha_cfg.browser_proxy_url:
url = captcha_cfg.browser_proxy_url.strip()
if getattr(captcha_cfg, "browser_proxy_enabled", False) and getattr(captcha_cfg, "browser_proxy_url", None):
url = str(getattr(captcha_cfg, "browser_proxy_url", "") or "").strip()
if url:
debug_logger.log_info(f"[BrowserCaptcha] Personal 使用验证码代理: {url}")
return _parse_proxy_url(url)
@@ -7389,6 +7591,10 @@ class BrowserCaptchaService:
debug_logger.log_warning("[BrowserCaptcha] 浏览器已停止,准备重新初始化...")
self._mark_browser_health(False)
browser_needs_restart = True
elif getattr(self.browser, "_flow2api_runtime_disconnected", False):
debug_logger.log_warning("[BrowserCaptcha] 浏览器连接已标记断开,准备重新初始化...")
self._mark_browser_health(False)
browser_needs_restart = True
elif self._is_browser_health_fresh():
self._mark_runtime_active()
if self._idle_reaper_task is None or self._idle_reaper_task.done():
@@ -7718,8 +7924,10 @@ class BrowserCaptchaService:
)
if ready_state == "complete":
return True
except Exception:
pass
except Exception as e:
if self._is_browser_runtime_error(e):
self._mark_browser_health(False)
raise
await asyncio.sleep(interval_seconds)
return False
@@ -7983,6 +8191,20 @@ class BrowserCaptchaService:
fresh_profile: bool = False,
) -> bool:
async with self._runtime_recover_lock:
if fresh_profile and await self._has_active_browser_work():
self._mark_fresh_profile_restart_pending(
reason=f"fresh_restart_deferred:{project_id}",
force=True,
)
await self._maybe_execute_pending_fresh_profile_restart(
project_id,
token_id=token_id,
source="fresh_restart_deferred_active_work",
)
debug_logger.log_warning(
f"[BrowserCaptcha] project_id={project_id} fresh profile 重启已延后到当前并发 drain 后立即执行"
)
return True
if not fresh_profile and self._was_runtime_restarted_recently():
try:
if await self._probe_browser_runtime():
@@ -8024,7 +8246,7 @@ class BrowserCaptchaService:
restart_reason = f"restart_project:{project_id}"
if fresh_profile:
debug_logger.log_warning(
f"[BrowserCaptcha] project_id={project_id} 命中特定 Flow 风控错误,准备执行全新浏览器冷启动"
f"[BrowserCaptcha] project_id={project_id} 准备执行 fresh profile 浏览器冷启动"
)
restart_reason = f"fresh_restart_project:{project_id}"
else:
@@ -8115,12 +8337,22 @@ class BrowserCaptchaService:
if self._is_force_fresh_browser_restart_error(error_text):
debug_logger.log_warning(
f"[BrowserCaptcha] project_id={project_id}, slot={resolved_slot_id} "
"命中特定 Flow 风控错误,直接销毁浏览器运行态并删除 profile 后冷启动"
"命中特定 Flow 风控错误,已标记当前 slot 不再复用;浏览器 fresh profile 轮换会等待当前并发 drain 后立即执行"
)
await self._restart_browser_for_project(
if resident_info is not None:
await self._mark_resident_slot_unavailable(
resolved_slot_id,
resident_info,
reason=f"flow_force_fresh:{project_id}:{streak}",
)
self._mark_fresh_profile_restart_pending(
reason=f"flow_force_fresh:{project_id}:{resolved_slot_id}:streak={streak}",
force=True,
)
await self._maybe_execute_pending_fresh_profile_restart(
project_id,
token_id=token_id,
fresh_profile=True,
source="flow_force_fresh_error",
)
return
@@ -8300,6 +8532,12 @@ class BrowserCaptchaService:
await tab.sleep(poll_interval_seconds)
except Exception as e:
if self._is_browser_runtime_error(e):
self._mark_browser_health(False)
debug_logger.log_warning(
f"[BrowserCaptcha] 检查 reCAPTCHA 时浏览器运行态断开,停止等待并触发恢复: {e}"
)
raise
debug_logger.log_warning(f"[BrowserCaptcha] 检查 reCAPTCHA 时异常: {e}")
await tab.sleep(0.5)
@@ -8847,9 +9085,21 @@ class BrowserCaptchaService:
)
self._mark_runtime_active()
await self._wait_for_pending_fresh_profile_restart_before_solve(
project_id,
token_id=token_id,
source="get_token_pre_initialize",
)
# 确保浏览器已初始化
await self.initialize()
await self._wait_for_pending_fresh_profile_restart_before_solve(
project_id,
token_id=token_id,
source="get_token_pre_resident_pick",
)
reserved_slot_id: Optional[str] = None
async def release_reserved_slot():
@@ -8863,12 +9113,37 @@ class BrowserCaptchaService:
f"[BrowserCaptcha] 开始从共享打码池获取标签页 (project: {project_id}, token_id={token_id}, 当前: {len(self._resident_tabs)}/{self._max_resident_tabs})"
)
resident_pick_started_at = time.monotonic()
slot_id, resident_info = await self._ensure_resident_tab(
project_id,
token_id=token_id,
reserve_for_solve=True,
return_slot_key=True,
)
try:
slot_id, resident_info = await self._ensure_resident_tab(
project_id,
token_id=token_id,
reserve_for_solve=True,
return_slot_key=True,
)
except Exception as e:
if not self._is_browser_runtime_error(e):
raise
self._mark_browser_health(False)
debug_logger.log_warning(
f"[BrowserCaptcha] 共享标签页分配时浏览器运行态断开,立即重启恢复 (project: {project_id}, token_id={token_id}): {e}"
)
slot_id, resident_info = None, None
if await self._recover_browser_runtime(project_id, reason="ensure_resident_tab_runtime_error"):
try:
slot_id, resident_info = await self._ensure_resident_tab(
project_id,
token_id=token_id,
reserve_for_solve=True,
return_slot_key=True,
)
except Exception as retry_error:
if not self._is_browser_runtime_error(retry_error):
raise
self._mark_browser_health(False)
debug_logger.log_warning(
f"[BrowserCaptcha] 浏览器恢复后分配共享标签页仍断开 (project: {project_id}, token_id={token_id}): {retry_error}"
)
slot_id, resident_info = None, None
reserved_slot_id = slot_id or None
if resident_info is None or not slot_id:
if await self._wait_for_active_resident_rebuild(timeout_seconds=min(20.0, self._solve_timeout_seconds)):
@@ -9257,6 +9532,12 @@ class BrowserCaptchaService:
debug_logger.log_info(f"[BrowserCaptcha] 页面已加载")
break
except Exception as e:
if self._is_browser_runtime_error(e):
self._mark_browser_health(False)
debug_logger.log_warning(
f"[BrowserCaptcha] 等待页面时浏览器运行态断开 (slot={slot_id}, project={project_id}, token_id={token_id}): {e}"
)
raise
debug_logger.log_warning(f"[BrowserCaptcha] 等待页面异常: {e},重试 {retry + 1}/10...")
await asyncio.sleep(0.3) # 减少重试间隔
@@ -9320,6 +9601,12 @@ class BrowserCaptchaService:
if tab is not None:
await self._dispose_browser_context_quietly(browser_context_id)
await self._close_tab_quietly(tab)
if self._is_browser_runtime_error(e):
self._mark_browser_health(False)
debug_logger.log_warning(
f"[BrowserCaptcha] 创建共享常驻标签页时浏览器运行态断开 (slot={slot_id}, project={project_id}, token_id={token_id}): {e}"
)
raise
debug_logger.log_error(
f"[BrowserCaptcha] 创建共享常驻标签页异常 (slot={slot_id}, project={project_id}, token_id={token_id}): {e}"
)
@@ -10462,6 +10749,7 @@ class _PersonalBrowserPoolService:
getattr(worker, "_initialized", False)
and browser_instance
and not getattr(browser_instance, "stopped", False)
and not getattr(browser_instance, "_flow2api_runtime_disconnected", False)
)
@staticmethod

View File

@@ -20,6 +20,9 @@ class _ClosableFakeTab:
async def close(self):
self.closed = True
async def sleep(self, _seconds):
return None
class BrowserCaptchaPersonalTests(unittest.IsolatedAsyncioTestCase):
def setUp(self):
@@ -107,6 +110,138 @@ class BrowserCaptchaPersonalTests(unittest.IsolatedAsyncioTestCase):
self.service._restart_browser_for_project_unlocked.assert_not_awaited()
self.service._ensure_resident_tab.assert_awaited_once()
async def test_wait_for_recaptcha_raises_on_runtime_disconnect(self):
tab = _ClosableFakeTab()
runtime_error = ConnectionRefusedError(1225, "远程计算机拒绝网络连接。")
self.service._inject_recaptcha_bootstrap_script = AsyncMock(return_value="remote")
self.service._tab_evaluate = AsyncMock(side_effect=runtime_error)
with self.assertRaises(ConnectionRefusedError):
await self.service._wait_for_recaptcha(tab)
self.assertFalse(self.service._last_health_probe_ok)
self.assertEqual(self.service._tab_evaluate.await_count, 1)
async def test_force_fresh_flow_error_defers_sync_browser_restart_until_drain(self):
tab = _ClosableFakeTab()
resident_info = ResidentTabInfo(
tab=tab,
slot_id="slot-1",
project_id="project-1",
token_id=1,
)
resident_info.recaptcha_ready = True
self.service.browser = types.SimpleNamespace(stopped=False)
self.service._initialized = True
self.service._resident_tabs["slot-1"] = resident_info
self.service._project_resident_affinity["project-1"] = "slot-1"
self.service._token_resident_affinity["1"] = "slot-1"
self.service._maybe_execute_pending_fresh_profile_restart = AsyncMock(return_value=False)
self.service._restart_browser_for_project = AsyncMock(return_value=True)
await self.service.report_flow_error(
"project-1",
"reCAPTCHA 验证失败",
error_message="Flow API request failed: PUBLIC_ERROR_UNUSUAL_ACTIVITY: reCAPTCHA evaluation failed",
token_id=1,
slot_id="slot-1",
)
self.assertIn("slot-1", self.service._resident_unavailable_slots)
self.assertTrue(self.service._fresh_profile_restart_pending)
self.assertTrue(self.service._fresh_profile_restart_force_pending)
self.service._restart_browser_for_project.assert_not_awaited()
self.service._maybe_execute_pending_fresh_profile_restart.assert_awaited_once()
async def test_pending_fresh_restart_task_is_preserved_during_runtime_shutdown(self):
async def runner():
self.service._fresh_profile_restart_task = asyncio.current_task()
await self.service._cancel_background_runtime_tasks(reason="unit_test")
self.assertIs(self.service._fresh_profile_restart_task, asyncio.current_task())
import asyncio
task = asyncio.create_task(runner())
await task
async def test_get_token_waits_for_pending_fresh_restart_before_resident_pick(self):
import asyncio
events = []
tab = _ClosableFakeTab()
resident_info = ResidentTabInfo(
tab=tab,
slot_id="slot-1",
project_id="project-1",
token_id=1,
)
resident_info.recaptcha_ready = True
self.service._fresh_profile_restart_every_n_solves = 5
self.service._fresh_profile_restart_pending = True
self.service._fresh_profile_restart_pending_reason = "unit:5/5"
self.service._has_active_browser_work = AsyncMock(return_value=False)
async def restart_unlocked(project_id, token_id=None, *, fresh_profile=False):
events.append("fresh_restart")
self.assertEqual(project_id, "project-1")
self.assertTrue(fresh_profile)
self.service._reset_browser_rotation_budget()
return True
async def initialize():
events.append("initialize")
async def ensure_resident(*args, **kwargs):
events.append("ensure_resident")
return "slot-1", resident_info
async def solve_resident(*args, **kwargs):
events.append("solve_resident")
return "token-1"
self.service._restart_browser_for_project_unlocked = AsyncMock(side_effect=restart_unlocked)
self.service.initialize = AsyncMock(side_effect=initialize)
self.service._ensure_resident_tab = AsyncMock(side_effect=ensure_resident)
self.service._ensure_resident_token_binding = AsyncMock(return_value=True)
self.service._solve_with_resident_tab = AsyncMock(side_effect=solve_resident)
token, slot_id = await self.service._get_token_direct(
"project-1",
token_id=1,
return_slot_id=True,
)
self.assertEqual((token, slot_id), ("token-1", "slot-1"))
self.assertEqual(events, ["fresh_restart", "initialize", "ensure_resident", "solve_resident"])
self.assertFalse(self.service._fresh_profile_restart_pending)
self.assertIsNone(self.service._fresh_profile_restart_task)
async def test_wait_for_pending_fresh_restart_awaits_existing_task(self):
import asyncio
events = []
self.service._fresh_profile_restart_pending = True
async def restart_task():
events.append("restart_start")
await asyncio.sleep(0.01)
self.service._fresh_profile_restart_pending = False
events.append("restart_done")
return True
task = asyncio.create_task(restart_task())
self.service._fresh_profile_restart_task = task
result = await self.service._wait_for_pending_fresh_profile_restart_before_solve(
"project-1",
token_id=1,
source="unit_test",
)
self.assertTrue(result)
self.assertEqual(events, ["restart_start", "restart_done"])
self.assertTrue(task.done())
if __name__ == "__main__":
unittest.main()