mirror of
https://github.com/nearai/ironclaw.git
synced 2026-05-18 22:27:53 +08:00
canary(workflow): land workflow-canary lane with periodic-reminder scenario
Phase 1A of the workflow-canary system from issue #1044. Adds a new canary lane that exercises the routine engine + cron-fire path, the foundation that the remaining four scripts (Telegram → Sheets, Calendar prep, HN monitor, CRM tracker) will layer on. Components: - scripts/workflow_canary/routines.py — direct libSQL helpers for inserting a lightweight cron routine with a backdated next_fire_at and polling routine_runs for terminal status (ok / attention / failed). Backdating beats wall-clock cron in tests by 30+ s per probe and is the same shape auth-live-seeded uses for expire_secret_in_db. - scripts/workflow_canary/run_workflow_canary.py — entrypoint that starts the Telegram mock, calls common.start_gateway_stack with workflow-tuned env (ROUTINES_ENABLED=true, ROUTINES_CRON_INTERVAL=2, IRONCLAW_TEST_HTTP_REMAP=api.telegram.org=<mock>), and runs scenario modules. CLI mirrors run_live_canary.py. - scripts/workflow_canary/scenarios/periodic_reminder.py — Script 4 Phase 1A: insert lightweight routine → wait for engine to fire → assert run row reaches a terminal status. Verified locally: 1 probe, 1 fire, status=attention. Plumbing: - .github/workflows/live-canary.yml — new workflow-canary job + lane added to the workflow_dispatch choice list and the canary-report aggregator's needs:. - scripts/live-canary/run.sh — workflow-canary case dispatches to run_workflow_canary.py. Phase 1B follow-ups in subsequent commits: - Telegram channel install + bot-token seeding (needs admin auth or direct encrypted-secrets DB write) - Verify Telegram sendMessage was emitted to the mock during the routine fire (covered by mock telegram's /__mock/sent_messages) - Scripts 1, 3, 5 (Sheets / HN / Gmail-CRM) - Script 2 (Calendar prep with web search) Local verification: $ tests/e2e/.venv/bin/python scripts/workflow_canary/run_workflow_canary.py \ --skip-build --skip-python-bootstrap [workflow-canary] mock telegram listening at http://127.0.0.1:51139 [periodic_reminder] inserted routine ..., next_fire_at backdated 60s [periodic_reminder] routine fired: status=attention [workflow-canary] all 1 probe(s) passed. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
34
.github/workflows/live-canary.yml
vendored
34
.github/workflows/live-canary.yml
vendored
@@ -31,6 +31,7 @@ on:
|
||||
- auth-channels
|
||||
- auth-live-seeded
|
||||
- auth-browser-consent
|
||||
- workflow-canary
|
||||
scenario:
|
||||
description: "Optional scenario/test filter. Use auto for rotating persona."
|
||||
type: string
|
||||
@@ -359,6 +360,38 @@ jobs:
|
||||
path: artifacts/live-canary/
|
||||
if-no-files-found: ignore
|
||||
|
||||
workflow-canary:
|
||||
name: Workflow Canary
|
||||
if: >
|
||||
(github.event_name == 'schedule' && github.event.schedule == '0 2 * * *') ||
|
||||
(github.event_name == 'workflow_dispatch' && (inputs.lane == 'all' || inputs.lane == 'workflow-canary'))
|
||||
runs-on: ubuntu-latest
|
||||
timeout-minutes: 30
|
||||
env:
|
||||
LANE: workflow-canary
|
||||
PROVIDER: mock
|
||||
PLAYWRIGHT_INSTALL: skip
|
||||
steps:
|
||||
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6
|
||||
with:
|
||||
persist-credentials: false
|
||||
- uses: dtolnay/rust-toolchain@29eef336d9b2848a0b548edc03f92a220660cdb8 # stable
|
||||
- uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5
|
||||
with:
|
||||
python-version: "3.12"
|
||||
- name: Run workflow canary lane
|
||||
run: scripts/live-canary/run.sh
|
||||
- name: Scrub artifacts
|
||||
if: always()
|
||||
run: scripts/live-canary/scrub-artifacts.sh artifacts/live-canary
|
||||
- name: Upload artifacts
|
||||
if: always()
|
||||
uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4
|
||||
with:
|
||||
name: live-canary-workflow-canary
|
||||
path: artifacts/live-canary/
|
||||
if-no-files-found: ignore
|
||||
|
||||
deterministic-replay:
|
||||
name: Deterministic Replay
|
||||
if: >
|
||||
@@ -750,6 +783,7 @@ jobs:
|
||||
- auth-channels
|
||||
- auth-live-seeded
|
||||
- auth-browser-consent
|
||||
- workflow-canary
|
||||
- deterministic-replay
|
||||
- public-smoke
|
||||
- persona-rotating
|
||||
|
||||
@@ -269,9 +269,12 @@ main() {
|
||||
auth-browser-consent)
|
||||
run_python_lane scripts/auth_live_canary/run_live_canary.py --mode browser --playwright-install "${PLAYWRIGHT_INSTALL}"
|
||||
;;
|
||||
workflow-canary)
|
||||
run_python_lane scripts/workflow_canary/run_workflow_canary.py --playwright-install "${PLAYWRIGHT_INSTALL}"
|
||||
;;
|
||||
*)
|
||||
echo "Unknown live canary lane: ${LANE}" >&2
|
||||
echo "Known lanes: deterministic-replay, public-smoke, persona-rotating, private-oauth, provider-matrix, release-public-full, upgrade-canary, auth-smoke, auth-full, auth-channels, auth-live-seeded, auth-browser-consent" >&2
|
||||
echo "Known lanes: deterministic-replay, public-smoke, persona-rotating, private-oauth, provider-matrix, release-public-full, upgrade-canary, auth-smoke, auth-full, auth-channels, auth-live-seeded, auth-browser-consent, workflow-canary" >&2
|
||||
return 2
|
||||
;;
|
||||
esac
|
||||
|
||||
201
scripts/workflow_canary/routines.py
Normal file
201
scripts/workflow_canary/routines.py
Normal file
@@ -0,0 +1,201 @@
|
||||
"""libSQL helpers for the workflow-canary lane.
|
||||
|
||||
The routine engine fires on its own internal tick — by default routines
|
||||
are picked up when `next_fire_at <= now()`. Rather than wait for
|
||||
real wall-clock cron schedules to elapse (would push lane wall-time
|
||||
to many minutes per scenario), the canary backdates `next_fire_at`
|
||||
directly in the libSQL DB so the engine fires on its very next tick
|
||||
(usually within seconds).
|
||||
|
||||
This is the same pattern auth-live-seeded uses for `expire_secret_in_db`
|
||||
in `scripts/auth_live_canary/run_live_canary.py`. Direct DB writes
|
||||
are appropriate here because:
|
||||
|
||||
- The routine engine, telegram tool, and mock telegram are the system
|
||||
under test; the agent's NL → routine_create flow is exercised by
|
||||
separate tests at the conversation tier.
|
||||
- Determinism: cron timing tests are otherwise inherently flaky on
|
||||
CI under runner load.
|
||||
- Speed: backdating + a 5 s poll loop replaces a 60 s cron interval.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import sqlite3
|
||||
import time
|
||||
import uuid
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
|
||||
def _now_iso() -> str:
|
||||
"""ISO-8601 with millisecond precision, the format libSQL uses."""
|
||||
# Match `fmt_ts(dt)` in src/db/libsql/mod.rs (RFC 3339, ms precision).
|
||||
millis = int(time.time() * 1000)
|
||||
secs, ms = divmod(millis, 1000)
|
||||
return time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(secs)) + f".{ms:03d}Z"
|
||||
|
||||
|
||||
def _past_iso(seconds_ago: int = 60) -> str:
|
||||
millis = int(time.time() * 1000) - seconds_ago * 1000
|
||||
secs, ms = divmod(millis, 1000)
|
||||
return time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(secs)) + f".{ms:03d}Z"
|
||||
|
||||
|
||||
def insert_lightweight_cron_routine(
|
||||
db_path: str | Path,
|
||||
*,
|
||||
user_id: str,
|
||||
name: str,
|
||||
prompt: str,
|
||||
schedule: str = "*/1 * * * *",
|
||||
description: str = "",
|
||||
fire_immediately: bool = True,
|
||||
) -> str:
|
||||
"""INSERT a new lightweight-cron routine. Returns the routine id.
|
||||
|
||||
The action runs the given prompt through the LLM each fire — for the
|
||||
canary's mock LLM, that prompt should match a canned tool-call
|
||||
response so the engine deterministically issues the tool call we
|
||||
want to verify (e.g. a telegram sendMessage).
|
||||
|
||||
If ``fire_immediately`` is True (the default), ``next_fire_at`` is
|
||||
backdated 60 s into the past so the engine picks it up on its very
|
||||
next tick instead of waiting for the cron schedule.
|
||||
"""
|
||||
routine_id = str(uuid.uuid4())
|
||||
trigger_config = json.dumps({"schedule": schedule, "timezone": "UTC"})
|
||||
# action_config is a flat object — `RoutineAction::from_db` reads
|
||||
# `prompt`, `context_paths`, `max_tokens`, `use_tools`,
|
||||
# `max_tool_rounds` directly from the top-level JSON. The
|
||||
# `action_type` column carries the variant tag ("lightweight").
|
||||
action_config = json.dumps(
|
||||
{
|
||||
"prompt": prompt,
|
||||
"context_paths": [],
|
||||
"max_tokens": 1024,
|
||||
"use_tools": True,
|
||||
"max_tool_rounds": 3,
|
||||
}
|
||||
)
|
||||
now = _now_iso()
|
||||
next_fire_at = _past_iso(60) if fire_immediately else None
|
||||
|
||||
with sqlite3.connect(str(db_path)) as conn:
|
||||
conn.execute(
|
||||
"""
|
||||
INSERT INTO routines (
|
||||
id, name, description, user_id, enabled,
|
||||
trigger_type, trigger_config,
|
||||
action_type, action_config,
|
||||
cooldown_secs, max_concurrent,
|
||||
state, next_fire_at, run_count,
|
||||
consecutive_failures, created_at, updated_at
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""",
|
||||
(
|
||||
routine_id,
|
||||
name,
|
||||
description,
|
||||
user_id,
|
||||
1,
|
||||
"cron",
|
||||
trigger_config,
|
||||
"lightweight",
|
||||
action_config,
|
||||
0, # cooldown_secs — 0 so back-to-back fires aren't suppressed
|
||||
1,
|
||||
"{}",
|
||||
next_fire_at,
|
||||
0,
|
||||
0,
|
||||
now,
|
||||
now,
|
||||
),
|
||||
)
|
||||
conn.commit()
|
||||
return routine_id
|
||||
|
||||
|
||||
def backdate_routine(db_path: str | Path, routine_id: str, seconds_ago: int = 60) -> None:
|
||||
"""Force a routine to fire on the next engine tick by backdating
|
||||
`next_fire_at`. Useful between successive probes in one scenario."""
|
||||
with sqlite3.connect(str(db_path)) as conn:
|
||||
conn.execute(
|
||||
"UPDATE routines SET next_fire_at = ?, updated_at = ? WHERE id = ?",
|
||||
(_past_iso(seconds_ago), _now_iso(), routine_id),
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
|
||||
def get_routine_state(db_path: str | Path, routine_id: str) -> dict[str, Any] | None:
|
||||
with sqlite3.connect(str(db_path)) as conn:
|
||||
conn.row_factory = sqlite3.Row
|
||||
row = conn.execute(
|
||||
"SELECT id, name, enabled, run_count, consecutive_failures, "
|
||||
"last_run_at, next_fire_at FROM routines WHERE id = ?",
|
||||
(routine_id,),
|
||||
).fetchone()
|
||||
return dict(row) if row else None
|
||||
|
||||
|
||||
def list_routine_runs(db_path: str | Path, routine_id: str) -> list[dict[str, Any]]:
|
||||
with sqlite3.connect(str(db_path)) as conn:
|
||||
conn.row_factory = sqlite3.Row
|
||||
rows = conn.execute(
|
||||
"SELECT id, status, started_at, completed_at, "
|
||||
"result_summary, tokens_used FROM routine_runs WHERE routine_id = ? "
|
||||
"ORDER BY started_at DESC",
|
||||
(routine_id,),
|
||||
).fetchall()
|
||||
return [dict(r) for r in rows]
|
||||
|
||||
|
||||
# RunStatus variants from src/agent/routine.rs:520-540 — running is the
|
||||
# only non-terminal one. "ok" / "attention" both mean the run completed
|
||||
# end-to-end (attention = produced output worth surfacing to the user).
|
||||
TERMINAL_RUN_STATUSES = {"ok", "attention", "failed"}
|
||||
SUCCESS_RUN_STATUSES = {"ok", "attention"}
|
||||
|
||||
|
||||
async def wait_for_run(
|
||||
db_path: str | Path,
|
||||
routine_id: str,
|
||||
*,
|
||||
min_runs: int = 1,
|
||||
timeout_secs: float = 30.0,
|
||||
poll_interval: float = 0.5,
|
||||
require_terminal: bool = True,
|
||||
) -> list[dict[str, Any]]:
|
||||
"""Poll routine_runs until at least `min_runs` rows exist with a
|
||||
terminal status (or any status if require_terminal=False).
|
||||
|
||||
Raises ``TimeoutError`` if the deadline elapses with fewer matching
|
||||
rows. Returns all observed runs (may be more than `min_runs` if
|
||||
multiple fired during the wait). The terminal-status check matters
|
||||
because the engine inserts the row with status=running before
|
||||
actually executing the action — checking `len(runs) >= 1` alone
|
||||
races with the executor.
|
||||
"""
|
||||
import asyncio
|
||||
|
||||
deadline = time.monotonic() + timeout_secs
|
||||
while time.monotonic() < deadline:
|
||||
runs = list_routine_runs(db_path, routine_id)
|
||||
if require_terminal:
|
||||
terminal_runs = [
|
||||
r for r in runs if r.get("status") in TERMINAL_RUN_STATUSES
|
||||
]
|
||||
if len(terminal_runs) >= min_runs:
|
||||
return runs
|
||||
elif len(runs) >= min_runs:
|
||||
return runs
|
||||
await asyncio.sleep(poll_interval)
|
||||
final = list_routine_runs(db_path, routine_id)
|
||||
statuses = [r.get("status") for r in final]
|
||||
raise TimeoutError(
|
||||
f"Timed out waiting for routine {routine_id} to fire "
|
||||
f"(observed {len(final)} run(s) with statuses {statuses}, "
|
||||
f"expected >= {min_runs} terminal)"
|
||||
)
|
||||
267
scripts/workflow_canary/run_workflow_canary.py
Normal file
267
scripts/workflow_canary/run_workflow_canary.py
Normal file
@@ -0,0 +1,267 @@
|
||||
"""Workflow-canary runner: end-to-end coverage for issue #1044 scenarios.
|
||||
|
||||
Where ``auth-live-canary`` covers credential/auth flows, this lane covers
|
||||
the broader user-facing workflows: chat-driven extension setup, routines
|
||||
firing on cron schedules, multi-tool pipelines (Telegram → Sheets,
|
||||
Calendar prep → Telegram, etc.).
|
||||
|
||||
Architecture mirrors auth-live-canary's runner:
|
||||
|
||||
- Reuses ``scripts.live_canary.common.start_gateway_stack`` for the
|
||||
bulk of the work (mock LLM + ironclaw subprocess + drainer threads
|
||||
+ LLM settings pin via API).
|
||||
- Adds a Telegram Bot API mock subprocess (``telegram_mock.py``) and
|
||||
routes IronClaw's outbound calls to it via
|
||||
``IRONCLAW_TEST_HTTP_REMAP=api.telegram.org=<mock_url>`` so each
|
||||
scenario can verify Telegram side-effects without a real bot token.
|
||||
|
||||
CLI shape matches ``run_live_canary.py`` so the same lane wrapper
|
||||
script (``scripts/live-canary/run.sh``) can drive both.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
from dataclasses import asdict
|
||||
from pathlib import Path
|
||||
|
||||
ROOT = Path(__file__).resolve().parents[2]
|
||||
if str(ROOT) not in sys.path:
|
||||
sys.path.insert(0, str(ROOT))
|
||||
|
||||
from scripts.live_canary.common import ( # noqa: E402
|
||||
DEFAULT_VENV,
|
||||
E2E_DIR,
|
||||
ProbeResult,
|
||||
bootstrap_python,
|
||||
cargo_build,
|
||||
install_playwright,
|
||||
start_gateway_stack,
|
||||
stop_gateway_stack,
|
||||
stop_process,
|
||||
venv_python,
|
||||
wait_for_port_line,
|
||||
)
|
||||
|
||||
DEFAULT_OUTPUT_DIR = ROOT / "artifacts" / "workflow-canary"
|
||||
|
||||
# Ordered list of scenario keys → (module, function, display name).
|
||||
# Each scenario function takes (stack, mock_telegram_url, output_dir,
|
||||
# log_dir) and returns a list[ProbeResult].
|
||||
SCENARIOS: dict[str, tuple[str, str, str]] = {
|
||||
"periodic_reminder": (
|
||||
"scripts.workflow_canary.scenarios.periodic_reminder",
|
||||
"run",
|
||||
"Script 4 — Periodic Reminder via Telegram",
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
def parse_args() -> argparse.Namespace:
|
||||
parser = argparse.ArgumentParser(
|
||||
description=(
|
||||
"Workflow-canary runner. Exercises end-to-end multi-tool "
|
||||
"user workflows from issue #1044."
|
||||
)
|
||||
)
|
||||
parser.add_argument(
|
||||
"--scenario",
|
||||
action="append",
|
||||
choices=sorted(SCENARIOS),
|
||||
default=[],
|
||||
help=(
|
||||
"Limit the run to the listed scenarios. May be repeated. "
|
||||
"Default runs all scenarios."
|
||||
),
|
||||
)
|
||||
parser.add_argument(
|
||||
"--venv",
|
||||
type=Path,
|
||||
default=DEFAULT_VENV,
|
||||
help=f"Virtualenv path (default: {DEFAULT_VENV})",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--output-dir",
|
||||
type=Path,
|
||||
default=DEFAULT_OUTPUT_DIR,
|
||||
help=f"Artifacts directory (default: {DEFAULT_OUTPUT_DIR})",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--playwright-install",
|
||||
choices=("auto", "with-deps", "plain", "skip"),
|
||||
default="skip",
|
||||
help=(
|
||||
"How to install Playwright browsers. Default 'skip' since "
|
||||
"the workflow-canary scenarios don't drive a browser; the "
|
||||
"auth-live-canary lanes own that."
|
||||
),
|
||||
)
|
||||
parser.add_argument("--skip-build", action="store_true")
|
||||
parser.add_argument("--skip-python-bootstrap", action="store_true")
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
def _spawn_mock_telegram(
|
||||
python: Path, log_dir: Path
|
||||
) -> tuple[subprocess.Popen[str], str]:
|
||||
"""Start the mock Telegram Bot API server and return (process, url)."""
|
||||
proc = subprocess.Popen(
|
||||
[
|
||||
str(python),
|
||||
str(Path(__file__).parent / "telegram_mock.py"),
|
||||
"--port",
|
||||
"0",
|
||||
],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT,
|
||||
text=True,
|
||||
bufsize=1,
|
||||
)
|
||||
match = wait_for_port_line(
|
||||
proc, re.compile(r"MOCK_TELEGRAM_PORT=(\d+)"), timeout=15.0
|
||||
)
|
||||
url = f"http://127.0.0.1:{match.group(1)}"
|
||||
|
||||
# Drain remaining stdout to a log file so the pipe doesn't fill —
|
||||
# same lesson as scripts/live_canary/common.py f59981d3.
|
||||
log_dir.mkdir(parents=True, exist_ok=True)
|
||||
log_path = log_dir / "telegram_mock.log"
|
||||
import threading
|
||||
|
||||
def _drain() -> None:
|
||||
try:
|
||||
with log_path.open("a", encoding="utf-8", errors="replace") as fh:
|
||||
if proc.stdout is None:
|
||||
return
|
||||
for line in proc.stdout:
|
||||
fh.write(line)
|
||||
fh.flush()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
threading.Thread(target=_drain, daemon=True).start()
|
||||
return proc, url
|
||||
|
||||
|
||||
async def _run_scenarios(
|
||||
args: argparse.Namespace, log_dir: Path, results: list[ProbeResult]
|
||||
) -> None:
|
||||
selected = args.scenario or list(SCENARIOS)
|
||||
|
||||
python = venv_python(args.venv)
|
||||
mock_telegram_proc, mock_telegram_url = _spawn_mock_telegram(python, log_dir)
|
||||
print(
|
||||
f"[workflow-canary] mock telegram listening at {mock_telegram_url}",
|
||||
flush=True,
|
||||
)
|
||||
|
||||
try:
|
||||
stack = await start_gateway_stack(
|
||||
venv_dir=args.venv,
|
||||
owner_user_id="workflow-canary-owner",
|
||||
temp_prefix="ironclaw-workflow-canary",
|
||||
gateway_token_prefix="workflow-canary",
|
||||
extra_gateway_env={
|
||||
# Route the WASM telegram tool's outbound calls to our
|
||||
# mock. The remap is honored by IronClaw's WASM HTTP
|
||||
# client when the binary is built with debug_assertions.
|
||||
"IRONCLAW_TEST_HTTP_REMAP": (
|
||||
f"api.telegram.org={mock_telegram_url}"
|
||||
),
|
||||
# The auth-live-canary stack disables routines by default
|
||||
# (ROUTINES_ENABLED=false in build_gateway_env). The
|
||||
# workflow-canary scenarios fire routines as their core
|
||||
# under-test surface, so re-enable + tighten the cron
|
||||
# tick interval so backdated routines fire within ~2 s
|
||||
# instead of the default 15 s.
|
||||
"ROUTINES_ENABLED": "true",
|
||||
"ROUTINES_CRON_INTERVAL": "2",
|
||||
"ROUTINES_DEFAULT_COOLDOWN": "0",
|
||||
},
|
||||
log_dir=log_dir,
|
||||
)
|
||||
except Exception:
|
||||
stop_process(mock_telegram_proc)
|
||||
raise
|
||||
|
||||
try:
|
||||
for key in selected:
|
||||
module_name, fn_name, display = SCENARIOS[key]
|
||||
print(f"\n[workflow-canary] === {display} ===", flush=True)
|
||||
module = __import__(module_name, fromlist=[fn_name])
|
||||
scenario_fn = getattr(module, fn_name)
|
||||
scenario_results = await scenario_fn(
|
||||
stack=stack,
|
||||
mock_telegram_url=mock_telegram_url,
|
||||
output_dir=args.output_dir,
|
||||
log_dir=log_dir,
|
||||
)
|
||||
results.extend(scenario_results)
|
||||
finally:
|
||||
stop_gateway_stack(stack)
|
||||
stop_process(mock_telegram_proc)
|
||||
|
||||
|
||||
def _write_results(results: list[ProbeResult], output_dir: Path) -> Path:
|
||||
output_dir.mkdir(parents=True, exist_ok=True)
|
||||
payload = {
|
||||
"generated_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
|
||||
"results": [asdict(r) for r in results],
|
||||
}
|
||||
path = output_dir / "results.json"
|
||||
path.write_text(json.dumps(payload, indent=2, default=str))
|
||||
return path
|
||||
|
||||
|
||||
def main() -> int:
|
||||
args = parse_args()
|
||||
|
||||
if not args.skip_python_bootstrap:
|
||||
bootstrap_python(args.venv)
|
||||
if args.playwright_install != "skip":
|
||||
install_playwright(venv_python(args.venv), args.playwright_install)
|
||||
if not args.skip_build:
|
||||
cargo_build()
|
||||
|
||||
log_dir = args.output_dir
|
||||
log_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
results: list[ProbeResult] = []
|
||||
try:
|
||||
asyncio.run(_run_scenarios(args, log_dir, results))
|
||||
except Exception as exc:
|
||||
print(f"[workflow-canary] error: {exc}", file=sys.stderr, flush=True)
|
||||
path = _write_results(results, args.output_dir)
|
||||
print(f"[workflow-canary] results: {path}", flush=True)
|
||||
return 1
|
||||
|
||||
path = _write_results(results, args.output_dir)
|
||||
failures = [r for r in results if not r.success]
|
||||
if failures:
|
||||
print(
|
||||
f"\n[workflow-canary] {len(failures)} probe(s) failed. "
|
||||
f"Results: {path}",
|
||||
flush=True,
|
||||
)
|
||||
for r in failures:
|
||||
print(
|
||||
f" ✗ {r.provider} / {r.mode}: "
|
||||
f"{r.details.get('error', '<no error>')}"
|
||||
)
|
||||
return 1
|
||||
print(
|
||||
f"\n[workflow-canary] all {len(results)} probe(s) passed. "
|
||||
f"Results: {path}"
|
||||
)
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
160
scripts/workflow_canary/scenarios/periodic_reminder.py
Normal file
160
scripts/workflow_canary/scenarios/periodic_reminder.py
Normal file
@@ -0,0 +1,160 @@
|
||||
"""Script 4 — Periodic Reminder via Telegram (issue #1044), Phase 1A.
|
||||
|
||||
Verifies the routine engine picks up a backdated cron routine, fires
|
||||
its Lightweight action against the (mock) LLM, and records a
|
||||
``routine_runs`` row with terminal status.
|
||||
|
||||
Coverage gained vs. existing canaries:
|
||||
|
||||
- Routine engine cron-tick path (`spawn_cron_ticker` → `check_cron_triggers`)
|
||||
- Lightweight routine action execution (RoutineAction::Lightweight)
|
||||
- DB-backed routine state machine (`routines.next_fire_at` →
|
||||
`routine_runs.status`)
|
||||
|
||||
Coverage deliberately *not* included in this phase:
|
||||
|
||||
- Telegram channel install + bot-token setup (requires admin secrets
|
||||
API; deferred to Phase 1B once we either (a) wire admin auth into
|
||||
the canary stack or (b) write directly to the encrypted secrets
|
||||
table). The mock Telegram server is started and the
|
||||
`IRONCLAW_TEST_HTTP_REMAP=api.telegram.org=...` is wired so the
|
||||
next iteration only has to add the channel install + assertion.
|
||||
- Verifying the routine's prompt actually causes a Telegram
|
||||
sendMessage. Requires the channel install above.
|
||||
|
||||
Phase 1A intentionally proves the engine + mock LLM scaffolding work
|
||||
end-to-end. Phase 1B layers on the Telegram side-effect verification.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
|
||||
from scripts.live_canary.common import ProbeResult
|
||||
from scripts.workflow_canary.routines import (
|
||||
SUCCESS_RUN_STATUSES,
|
||||
insert_lightweight_cron_routine,
|
||||
list_routine_runs,
|
||||
wait_for_run,
|
||||
)
|
||||
|
||||
# The mock LLM matches `\bhello\b|\bhi\b|\bhey\b` and returns
|
||||
# "Hello! How can I help you today?" — see tests/e2e/mock_llm.py
|
||||
# CANNED_RESPONSES line 19. Keep the prompt simple so the engine
|
||||
# completes one round-trip quickly without needing tool calls.
|
||||
REMINDER_PROMPT = "hi"
|
||||
ROUTINE_NAME = "canary-periodic-reminder"
|
||||
|
||||
|
||||
async def _drain_mock_telegram(mock_telegram_url: str) -> list[dict[str, Any]]:
|
||||
async with httpx.AsyncClient(timeout=5.0) as client:
|
||||
response = await client.get(f"{mock_telegram_url}/__mock/sent_messages")
|
||||
response.raise_for_status()
|
||||
return response.json().get("messages", [])
|
||||
|
||||
|
||||
async def _reset_mock_telegram(mock_telegram_url: str) -> None:
|
||||
async with httpx.AsyncClient(timeout=5.0) as client:
|
||||
response = await client.post(f"{mock_telegram_url}/__mock/reset")
|
||||
response.raise_for_status()
|
||||
|
||||
|
||||
async def run(
|
||||
*,
|
||||
stack: Any,
|
||||
mock_telegram_url: str,
|
||||
output_dir: Path,
|
||||
log_dir: Path,
|
||||
) -> list[ProbeResult]:
|
||||
db_path = stack.db_path
|
||||
owner_user_id = "workflow-canary-owner"
|
||||
|
||||
results: list[ProbeResult] = []
|
||||
started = time.perf_counter()
|
||||
|
||||
try:
|
||||
await _reset_mock_telegram(mock_telegram_url)
|
||||
|
||||
routine_id = insert_lightweight_cron_routine(
|
||||
db_path,
|
||||
user_id=owner_user_id,
|
||||
name=ROUTINE_NAME,
|
||||
prompt=REMINDER_PROMPT,
|
||||
schedule="*/1 * * * *",
|
||||
description="canary: dog walk reminder",
|
||||
fire_immediately=True,
|
||||
)
|
||||
print(
|
||||
f"[periodic_reminder] inserted routine {routine_id}, "
|
||||
f"next_fire_at backdated 60 s",
|
||||
flush=True,
|
||||
)
|
||||
|
||||
# Cron tick is configured to 2 s in run_workflow_canary.py, plus
|
||||
# the routine's lightweight action takes a few seconds end-to-end.
|
||||
# 30 s is comfortably above the 75th percentile observed in local
|
||||
# iteration; 60 s gives headroom for a slow CI runner.
|
||||
runs = await wait_for_run(
|
||||
db_path, routine_id, min_runs=1, timeout_secs=60.0
|
||||
)
|
||||
last_run = runs[0]
|
||||
latency_ms = int((time.perf_counter() - started) * 1000)
|
||||
print(
|
||||
f"[periodic_reminder] routine fired: status={last_run['status']}, "
|
||||
f"completed_at={last_run['completed_at']}",
|
||||
flush=True,
|
||||
)
|
||||
|
||||
success = last_run["status"] in SUCCESS_RUN_STATUSES
|
||||
|
||||
results.append(
|
||||
ProbeResult(
|
||||
provider="routines",
|
||||
mode="cron_lightweight_routine",
|
||||
success=success,
|
||||
latency_ms=latency_ms,
|
||||
details={
|
||||
"routine_id": routine_id,
|
||||
"run_status": last_run["status"],
|
||||
"run_count": len(runs),
|
||||
"result_summary": last_run.get("result_summary"),
|
||||
},
|
||||
)
|
||||
)
|
||||
except TimeoutError as exc:
|
||||
latency_ms = int((time.perf_counter() - started) * 1000)
|
||||
observed = (
|
||||
list_routine_runs(db_path, locals().get("routine_id", ""))
|
||||
if "routine_id" in locals()
|
||||
else []
|
||||
)
|
||||
results.append(
|
||||
ProbeResult(
|
||||
provider="routines",
|
||||
mode="cron_lightweight_routine",
|
||||
success=False,
|
||||
latency_ms=latency_ms,
|
||||
details={
|
||||
"error": f"timeout: {exc}",
|
||||
"observed_runs": len(observed),
|
||||
"observed_statuses": [r["status"] for r in observed],
|
||||
},
|
||||
)
|
||||
)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
latency_ms = int((time.perf_counter() - started) * 1000)
|
||||
results.append(
|
||||
ProbeResult(
|
||||
provider="routines",
|
||||
mode="cron_lightweight_routine",
|
||||
success=False,
|
||||
latency_ms=latency_ms,
|
||||
details={"error": f"{type(exc).__name__}: {exc}"},
|
||||
)
|
||||
)
|
||||
|
||||
return results
|
||||
Reference in New Issue
Block a user