refactor: overhaul scheduler for AI-paced operation

Config changes:
- Remove dead auditCron (audit runs inside write pipeline)
- writeCron: daily → every 15 min
- radarCron: daily → every 6 hours
- Add chaptersPerCycle, retryDelayMs, cooldownAfterChapterMs,
  maxChaptersPerDay

Scheduler changes:
- Parallel book processing (Promise.all)
- Multi-chapter cycles per book
- Immediate retry with 30s delay (was 24h wait)
- Daily safety cap (default 50 chapters)
- Better cronToMs parsing for minute-level crons
majx_mac
2026-03-14 16:09:38 +08:00
parent fb8cabe640
commit 3a56969159
6 changed files with 190 additions and 82 deletions
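
Pacing note (illustrative arithmetic, not part of the diff): writeCron "*/15 * * * *" gives 96 write cycles per day, so with the default maxConcurrentBooks = 3 and chaptersPerCycle = 1 the scheduler could attempt up to 288 chapters per day; the maxChaptersPerDay cap (50) is the effective limit.

// Back-of-the-envelope sketch of the above; constant names are hypothetical.
const cyclesPerDay = (24 * 60) / 15;                  // "*/15 * * * *" → 96
const attemptsPerDay = cyclesPerDay * 3 * 1;          // × maxConcurrentBooks × chaptersPerCycle → 288
const effectivePerDay = Math.min(attemptsPerDay, 50); // maxChaptersPerDay wins → 50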

View File

@@ -41,8 +41,11 @@ export const upCommand = new Command("up")
modelOverrides: config.modelOverrides,
radarCron: config.daemon.schedule.radarCron,
writeCron: config.daemon.schedule.writeCron,
- auditCron: config.daemon.schedule.auditCron,
maxConcurrentBooks: config.daemon.maxConcurrentBooks,
+ chaptersPerCycle: config.daemon.chaptersPerCycle,
+ retryDelayMs: config.daemon.retryDelayMs,
+ cooldownAfterChapterMs: config.daemon.cooldownAfterChapterMs,
+ maxChaptersPerDay: config.daemon.maxChaptersPerDay,
onChapterComplete: (bookId, chapter, status) => {
const icon = status === "approved" ? "+" : "!";
log(` [${icon}] ${bookId} Ch.${chapter} ${status}`);

View File

@@ -35,9 +35,8 @@ export const initCommand = new Command("init")
notify: [],
daemon: {
schedule: {
radarCron: "0 9 * * *",
writeCron: "0 14 * * *",
auditCron: "0 17 * * *",
radarCron: "0 */6 * * *",
writeCron: "*/15 * * * *",
},
maxConcurrentBooks: 3,
},

View File

@@ -292,9 +292,10 @@ describe("ProjectConfigSchema", () => {
it("applies default daemon config", () => {
const result = ProjectConfigSchema.parse(validProject);
expect(result.daemon.maxConcurrentBooks).toBe(3);
expect(result.daemon.schedule.radarCron).toBe("0 9 * * *");
expect(result.daemon.schedule.writeCron).toBe("0 14 * * *");
expect(result.daemon.schedule.auditCron).toBe("0 17 * * *");
expect(result.daemon.schedule.radarCron).toBe("0 */6 * * *");
expect(result.daemon.schedule.writeCron).toBe("*/15 * * * *");
expect(result.daemon.chaptersPerCycle).toBe(1);
expect(result.daemon.maxChaptersPerDay).toBe(50);
});
it("applies default empty notify array", () => {

View File

@@ -66,11 +66,14 @@ export const ProjectConfigSchema = z.object({
modelOverrides: z.record(z.string(), z.string()).optional(),
daemon: z.object({
schedule: z.object({
radarCron: z.string().default("0 9 * * *"),
writeCron: z.string().default("0 14 * * *"),
auditCron: z.string().default("0 17 * * *"),
radarCron: z.string().default("0 */6 * * *"),
writeCron: z.string().default("*/15 * * * *"),
}),
maxConcurrentBooks: z.number().int().min(1).default(3),
+ chaptersPerCycle: z.number().int().min(1).max(20).default(1),
+ retryDelayMs: z.number().int().min(0).default(30_000),
+ cooldownAfterChapterMs: z.number().int().min(0).default(10_000),
+ maxChaptersPerDay: z.number().int().min(1).default(50),
qualityGates: QualityGatesSchema.default({
maxAuditRetries: 2,
pauseAfterConsecutiveFailures: 3,
@@ -78,11 +81,14 @@ export const ProjectConfigSchema = z.object({
}),
}).default({
schedule: {
radarCron: "0 9 * * *",
writeCron: "0 14 * * *",
auditCron: "0 17 * * *",
radarCron: "0 */6 * * *",
writeCron: "*/15 * * * *",
},
maxConcurrentBooks: 3,
+ chaptersPerCycle: 1,
+ retryDelayMs: 30_000,
+ cooldownAfterChapterMs: 10_000,
+ maxChaptersPerDay: 50,
qualityGates: {
maxAuditRetries: 2,
pauseAfterConsecutiveFailures: 3,

View File

@@ -9,8 +9,11 @@ import { detectChapter, detectAndRewrite } from "./detection-runner.js";
export interface SchedulerConfig extends PipelineConfig {
readonly radarCron: string;
readonly writeCron: string;
- readonly auditCron: string;
readonly maxConcurrentBooks: number;
+ readonly chaptersPerCycle: number;
+ readonly retryDelayMs: number;
+ readonly cooldownAfterChapterMs: number;
+ readonly maxChaptersPerDay: number;
readonly qualityGates?: QualityGates;
readonly detection?: DetectionConfig;
readonly onChapterComplete?: (bookId: string, chapter: number, status: string) => void;
@@ -36,6 +39,8 @@ export class Scheduler {
private pausedBooks = new Set<string>();
// Failure clustering: bookId → (dimension → count)
private failureDimensions = new Map<string, Map<string, number>>();
+ // Daily chapter counter: "YYYY-MM-DD" → count
+ private dailyChapterCount = new Map<string, number>();
constructor(config: SchedulerConfig) {
this.config = config;
@@ -50,7 +55,7 @@ export class Scheduler {
// Run write cycle immediately on start, then schedule
await this.runWriteCycle();
- // Schedule recurring write cycle (default: every 2 hours)
+ // Schedule recurring write cycle
const writeCycleMs = this.cronToMs(this.config.writeCron);
const writeTask: ScheduledTask = {
name: "write-cycle",
@@ -63,7 +68,7 @@ export class Scheduler {
}, writeCycleMs);
this.tasks.push(writeTask);
- // Schedule radar scan (default: daily)
+ // Schedule radar scan
const radarMs = this.cronToMs(this.config.radarCron);
const radarTask: ScheduledTask = {
name: "radar-scan",
@@ -109,12 +114,38 @@ export class Scheduler {
};
}
+ /** Check if daily cap is reached across all books. */
+ private isDailyCapReached(): boolean {
+ const today = new Date().toISOString().slice(0, 10);
+ const count = this.dailyChapterCount.get(today) ?? 0;
+ return count >= this.config.maxChaptersPerDay;
+ }
+ /** Increment daily chapter counter. */
+ private recordChapterWritten(): void {
+ const today = new Date().toISOString().slice(0, 10);
+ const count = this.dailyChapterCount.get(today) ?? 0;
+ this.dailyChapterCount.set(today, count + 1);
+ // Clean up old dates (keep only today)
+ for (const key of this.dailyChapterCount.keys()) {
+ if (key !== today) this.dailyChapterCount.delete(key);
+ }
+ }
private async runWriteCycle(): Promise<void> {
+ if (this.isDailyCapReached()) {
+ process.stderr.write(
+ `[scheduler] Daily cap reached (${this.config.maxChaptersPerDay}), skipping cycle\n`,
+ );
+ return;
+ }
const bookIds = await this.state.listBooks();
- const activeBooks: Array<{ id: string; config: BookConfig }> = [];
+ const activeBooks: Array<{ readonly id: string; readonly config: BookConfig }> = [];
for (const id of bookIds) {
- if (this.pausedBooks.has(id)) continue; // Skip paused books
+ if (this.pausedBooks.has(id)) continue;
const config = await this.state.loadBookConfig(id);
if (config.status === "active" || config.status === "outlining") {
activeBooks.push({ id, config });
@@ -123,59 +154,104 @@ export class Scheduler {
const booksToWrite = activeBooks.slice(0, this.config.maxConcurrentBooks);
- for (const book of booksToWrite) {
- try {
- // Compute temperature override: base 0.7 + failures * step
- const failures = this.consecutiveFailures.get(book.id) ?? 0;
- const tempOverride = failures > 0
- ? Math.min(1.2, 0.7 + failures * this.gates.retryTemperatureStep)
- : undefined;
+ // Parallel book processing
+ await Promise.all(
+ booksToWrite.map((book) => this.processBook(book.id, book.config)),
+ );
+ }
- const result = await this.pipeline.writeNextChapter(book.id, undefined, tempOverride);
+ /** Process a single book: write chaptersPerCycle chapters with retry + cooldown. */
+ private async processBook(bookId: string, bookConfig: BookConfig): Promise<void> {
+ for (let i = 0; i < this.config.chaptersPerCycle; i++) {
+ if (!this.running) return;
+ if (this.isDailyCapReached()) return;
+ if (this.pausedBooks.has(bookId)) return;
if (result.status === "approved") {
// Reset failure counter on success
this.consecutiveFailures.delete(book.id);
// Cooldown between chapters (skip for the first one)
if (i > 0 && this.config.cooldownAfterChapterMs > 0) {
await this.sleep(this.config.cooldownAfterChapterMs);
}
- // Feature #6: Auto-detection loop after successful audit
- if (this.config.detection?.enabled) {
- try {
- const bookDir = this.state.bookDir(book.id);
- const chapterContent = await this.readChapterContent(bookDir, result.chapterNumber);
- const detResult = await detectChapter(
- this.config.detection,
- chapterContent,
- result.chapterNumber,
- );
- if (!detResult.passed && this.config.detection.autoRewrite) {
- await detectAndRewrite(
- this.config.detection,
- { client: this.config.client, model: this.config.model, projectRoot: this.config.projectRoot },
- bookDir,
- chapterContent,
- result.chapterNumber,
- book.config.genre,
- );
- }
- } catch (e) {
- this.config.onError?.(book.id, e as Error);
- }
- }
+ const success = await this.writeOneChapter(bookId, bookConfig);
+ if (!success) {
+ // Immediate retry with delay (if within retry limit)
+ const failures = this.consecutiveFailures.get(bookId) ?? 0;
+ if (failures <= this.gates.maxAuditRetries && this.config.retryDelayMs > 0) {
+ process.stderr.write(
+ `[scheduler] ${bookId} retrying in ${this.config.retryDelayMs}ms\n`,
+ );
+ await this.sleep(this.config.retryDelayMs);
+ const retrySuccess = await this.writeOneChapter(bookId, bookConfig);
+ if (!retrySuccess) break; // Stop this book's cycle on second failure
} else {
- // Audit failed — apply quality gates
- const issueCategories = result.auditResult.issues.map((i) => i.category);
- await this.handleAuditFailure(book.id, result.chapterNumber, issueCategories);
+ break; // Stop this book's cycle
}
+ }
+ }
+ }
+ /** Write one chapter for a book. Returns true if approved. */
+ private async writeOneChapter(bookId: string, bookConfig: BookConfig): Promise<boolean> {
+ try {
+ // Compute temperature override: base 0.7 + failures * step
+ const failures = this.consecutiveFailures.get(bookId) ?? 0;
+ const tempOverride = failures > 0
+ ? Math.min(1.2, 0.7 + failures * this.gates.retryTemperatureStep)
+ : undefined;
+ const result = await this.pipeline.writeNextChapter(bookId, undefined, tempOverride);
+ if (result.status === "approved") {
+ this.consecutiveFailures.delete(bookId);
+ this.recordChapterWritten();
+ // Auto-detection loop after successful audit
+ if (this.config.detection?.enabled) {
+ await this.runDetection(bookId, bookConfig, result.chapterNumber);
+ }
- this.config.onChapterComplete?.(
- book.id,
- result.chapterNumber,
- result.status,
- );
- } catch (e) {
- this.config.onError?.(book.id, e as Error);
- await this.handleAuditFailure(book.id, 0);
+ this.config.onChapterComplete?.(bookId, result.chapterNumber, result.status);
+ return true;
}
+ // Audit failed — apply quality gates
+ const issueCategories = result.auditResult.issues.map((i) => i.category);
+ await this.handleAuditFailure(bookId, result.chapterNumber, issueCategories);
+ this.config.onChapterComplete?.(bookId, result.chapterNumber, result.status);
+ return false;
+ } catch (e) {
+ this.config.onError?.(bookId, e as Error);
+ await this.handleAuditFailure(bookId, 0);
+ return false;
}
}
+ private async runDetection(
+ bookId: string,
+ bookConfig: BookConfig,
+ chapterNumber: number,
+ ): Promise<void> {
+ if (!this.config.detection) return;
+ try {
+ const bookDir = this.state.bookDir(bookId);
+ const chapterContent = await this.readChapterContent(bookDir, chapterNumber);
+ const detResult = await detectChapter(
+ this.config.detection,
+ chapterContent,
+ chapterNumber,
+ );
+ if (!detResult.passed && this.config.detection.autoRewrite) {
+ await detectAndRewrite(
+ this.config.detection,
+ { client: this.config.client, model: this.config.model, projectRoot: this.config.projectRoot },
+ bookDir,
+ chapterContent,
+ chapterNumber,
+ bookConfig.genre,
+ );
+ }
+ } catch (e) {
+ this.config.onError?.(bookId, e as Error);
+ }
+ }
@@ -189,7 +265,8 @@ export class Scheduler {
// Track failure dimensions for clustering
if (issueCategories.length > 0) {
- const dimMap = this.failureDimensions.get(bookId) ?? new Map<string, number>();
+ const existing = this.failureDimensions.get(bookId);
+ const dimMap = existing ? new Map(existing) : new Map<string, number>();
for (const cat of issueCategories) {
dimMap.set(cat, (dimMap.get(cat) ?? 0) + 1);
}
@@ -205,13 +282,10 @@ export class Scheduler {
const gates = this.gates;
// Check if we should retry with higher temperature
if (failures <= gates.maxAuditRetries) {
process.stderr.write(
- `[scheduler] ${bookId} audit failed (${failures}/${gates.maxAuditRetries}), retrying with higher temperature\n`,
+ `[scheduler] ${bookId} audit failed (${failures}/${gates.maxAuditRetries}), will retry\n`,
);
- // The retry will happen in the next write cycle with adjusted temperature
- // (We don't retry immediately to avoid tight loops)
return;
}
@@ -222,7 +296,6 @@ export class Scheduler {
process.stderr.write(`[scheduler] ${bookId} PAUSED: ${reason}\n`);
this.config.onPause?.(bookId, reason);
- // Emit webhook event
if (this.config.notifyChannels && this.config.notifyChannels.length > 0) {
await dispatchWebhookEvent(this.config.notifyChannels, {
event: "pipeline-error",
@@ -281,20 +354,29 @@ export class Scheduler {
}
private cronToMs(cron: string): number {
// Simple cron-to-interval mapping for common patterns
// "0 9 * * *" = daily = 24h
// "0 14 * * *" = daily = 24h
// "0 */2 * * *" = every 2h
const parts = cron.split(" ");
- if (parts.length >= 5) {
- const hour = parts[1]!;
- if (hour.startsWith("*/")) {
- const interval = parseInt(hour.slice(2), 10);
- return interval * 60 * 60 * 1000;
- }
- // Default: treat as daily
- return 24 * 60 * 60 * 1000;
+ if (parts.length < 5) return 24 * 60 * 60 * 1000;
+ const minute = parts[0]!;
+ const hour = parts[1]!;
+ // "*/N * * * *" → every N minutes
+ if (minute.startsWith("*/")) {
+ const interval = parseInt(minute.slice(2), 10);
+ return interval * 60 * 1000;
+ }
+ // "0 */N * * *" → every N hours
+ if (hour.startsWith("*/")) {
+ const interval = parseInt(hour.slice(2), 10);
+ return interval * 60 * 60 * 1000;
+ }
+ // Fixed time → treat as daily
+ return 24 * 60 * 60 * 1000;
}
+ private sleep(ms: number): Promise<void> {
+ return new Promise((resolve) => setTimeout(resolve, ms));
+ }
}
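
For reference, the reworked parsing above should map the defaults as follows (a sanity-check sketch, not code from this commit; cronToMs is private, so this assumes bracket access on a Scheduler instance in a test):

// Hypothetical spot-check; `scheduler` is an assumed Scheduler instance.
scheduler["cronToMs"]("*/15 * * * *"); // → 15 * 60 * 1000 = 900_000 ms
scheduler["cronToMs"]("0 */6 * * *");  // → 6 * 60 * 60 * 1000 = 21_600_000 ms
scheduler["cronToMs"]("0 9 * * *");    // → 24 h fallback (fixed times stay daily)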

test-project/inkos.json Normal file
View File

@@ -0,0 +1,17 @@
+ {
+ "name": "test-project",
+ "version": "0.1.0",
+ "llm": {
+ "provider": "openai",
+ "baseUrl": "https://api.openai.com/v1",
+ "model": "gpt-4o"
+ },
+ "notify": [],
+ "daemon": {
+ "schedule": {
+ "radarCron": "0 */6 * * *",
+ "writeCron": "*/15 * * * *"
+ },
+ "maxConcurrentBooks": 3
+ }
+ }