From dba4be99864b875d2e0aedc02b259bcf286eaac9 Mon Sep 17 00:00:00 2001 From: SmileQWQ Date: Thu, 23 Apr 2026 21:33:34 +0800 Subject: [PATCH] refactor: remove worker KV state and D1 transaction-heavy paths Move worker state from KV to D1 or memory. Clear affected page queries after Wallos import and avoid D1 transaction-heavy Prisma paths. --- DEPLOYMENT.md | 54 ++--- README.md | 6 +- apps/api/prisma/schema.prisma | 21 ++ .../services/channel-notification.service.ts | 166 +++++++------- .../api/src/services/exchange-rate.service.ts | 27 ++- apps/api/src/services/logo.service.ts | 20 +- apps/api/src/services/settings.service.ts | 2 + apps/api/src/services/subscription.service.ts | 60 ++++-- apps/api/src/services/tag.service.ts | 13 ++ .../api/src/services/wallos-import.service.ts | 109 +++++----- apps/api/src/services/webhook.service.ts | 65 +++--- .../src/services/worker-lite-cache.service.ts | 66 +----- .../worker-lite-repository.service.ts | 6 +- .../src/services/worker-lite-state.service.ts | 202 ++++++++++++++++++ apps/api/src/worker/database-init.ts | 27 +++ .../api/tests/integration/tags-routes.test.ts | 14 +- .../unit/channel-notification.service.test.ts | 18 +- .../tests/unit/subscription.service.test.ts | 38 +++- .../tests/unit/wallos-import-commit.test.ts | 161 +++++++------- apps/api/tests/unit/worker-lite-cache.test.ts | 40 ++++ .../unit/worker-lite-state.service.test.ts | 66 ++++++ apps/web/src/pages/SettingsPage.vue | 5 + scripts/bootstrap-cloudflare.mjs | 18 +- scripts/bootstrap-cloudflare.test.mjs | 16 +- wrangler.jsonc | 5 - 25 files changed, 772 insertions(+), 453 deletions(-) create mode 100644 apps/api/src/services/worker-lite-state.service.ts create mode 100644 apps/api/tests/unit/worker-lite-cache.test.ts create mode 100644 apps/api/tests/unit/worker-lite-state.service.test.ts diff --git a/DEPLOYMENT.md b/DEPLOYMENT.md index 7c392c5..e73413e 100644 --- a/DEPLOYMENT.md +++ b/DEPLOYMENT.md @@ -84,11 +84,7 @@ - `WORKER_NAME_PREFIX` - 默认值是:`subtracker` - - 用来决定 Worker 名称、KV / D1 / R2 资源名前缀 -- `ENABLE_KV` - - 默认值是:`true` - - 不填时默认启用 KV - - 如果明确填 `false`,部署时将不绑定 KV + - 用来决定 Worker 名称、D1 / R2 资源名前缀 - `ENABLE_R2` - 填 `true` 时启用 R2 - 不填或填其他值时,默认不开启 @@ -141,7 +137,6 @@ 就会继续复用原来的: - D1 -- KV - R2(如果启用了) 不会因为同步代码就把数据重建掉。 @@ -154,25 +149,6 @@ - **D1** -### 推荐 - -- **KV** - -用途: - -- Logo 搜索缓存 -- 通知去重 -- 导入预览缓存 - -默认会启用 KV。 -如果你在仓库 Variables 里把 `ENABLE_KV` 明确设为 `false`,系统仍然可以运行,但会有功能退化。 - -KV 关闭后的主要退化包括: - -- 读取类接口不再享受 Worker Lite 的短 TTL 缓存 -- Logo 搜索缓存失效,重复搜索会更慢 -- 通知去重与导入预览状态会退回到更重的数据库路径 - ### 可选 - **R2** @@ -214,7 +190,7 @@ https://api.resend.com/emails 这个分支面向 **Cloudflare Worker Free**,因此实现上做了明确的 Lite 化裁剪,而不是完全复刻 Docker 版的运行模型。 -### 1. 30 秒短 TTL 缓存 +### 1. isolate 内存短 TTL 缓存 以下读取类接口默认会缓存 **30 秒**: @@ -231,12 +207,13 @@ https://api.resend.com/emails - Worker Free 每次 HTTP 请求只有 **10ms CPU** - 同一页面会并发读取多份数据 - 如果每次都实时重算,很容易撞上 `Worker exceeded CPU time limit` +- 同时避免继续消耗 KV Free 的每日写入额度 这 30 秒缓存的副作用是: - 刚修改完数据后,其他页面在极短时间内可能看到旧数据 - 统计页、日历页、标签页在 30 秒窗口内可能不是绝对实时 -- 如果关闭 KV,只能退回到 isolate 内存缓存,跨实例缓存一致性会更弱 +- 这是 isolate 级缓存,不保证跨实例强一致 当前实现已经对主要写操作做了主动失效,所以正常情况下: @@ -249,7 +226,17 @@ https://api.resend.com/emails > **30 秒缓存带来的不是“数据一定延迟 30 秒”,而是“最多可能有短时间旧数据”。** -### 2. Logo 搜索是 Lite 版 +### 2. 不使用 KV + +当前分支不绑定 KV,原因是 Cloudflare Workers Free 的 KV 写入额度只有 **1000/day**,对于本项目的缓存失效、通知去重和导入预览场景来说过于紧张。 + +现在这些能力的处理方式是: + +- 读取类接口:使用 isolate 内存短缓存 +- 通知去重:回到 D1 +- 导入预览 token:回到 D1 +- Logo 搜索缓存:仅保留进程内短缓存 +### 3. Logo 搜索是 Lite 版 Worker Lite 的 Logo 搜索只保留: @@ -269,7 +256,7 @@ Worker Lite 的 Logo 搜索只保留: - 搜索结果质量不如 Docker / main 分支 - 偶尔会混入不够理想的候选图 -### 3. Cron 已拆成 Lite 版职责 +### 4. Cron 已拆成 Lite 版职责 当前默认触发器是: @@ -286,7 +273,7 @@ Worker Lite 的 Logo 搜索只保留: - 而是采用 **5 分钟窗口命中** - 以换取更稳定的执行 -### 4. 错误提示会明确说明 Worker 限制 +### 5. 错误提示会明确说明 Worker 限制 前端遇到: @@ -319,13 +306,6 @@ npm run deploy:worker:r2 - 优先读取环境变量里的 Cloudflare 凭据 - 如果没有配置 `CLOUDFLARE_API_TOKEN`,会尝试走 `wrangler login` 浏览器授权 -如果你本地想关闭 KV,也可以临时设置: - -```powershell -$env:ENABLE_KV='false' -npm run deploy:worker -``` - --- ## 十、本地开发(开发者可选) diff --git a/README.md b/README.md index 04340a7..59d476c 100644 --- a/README.md +++ b/README.md @@ -44,7 +44,8 @@ 这个分支面向 **Cloudflare Worker Free**,因此会主动做一些性能取舍来换稳定性: -- 读取类接口默认使用 **30 秒短 TTL 缓存** +- 读取类接口使用 **isolate 内存短 TTL 缓存** +- **不使用 KV** - Logo 搜索是 Lite 版,只保留网站候选 + DuckDuckGo - Cron 已拆成更轻的 Worker 版职责 - 遇到 `503` / CPU 超限时,前端会明确提示可能受 Worker 免费版限制影响 @@ -62,7 +63,7 @@ ## 技术栈 - **前端**:Vue 3、Vite、TypeScript、Naive UI、Pinia、TanStack Query、ECharts -- **后端**:Cloudflare Worker、Hono、Prisma D1 Adapter、D1、KV、可选 R2 +- **后端**:Cloudflare Worker、Hono、Prisma D1 Adapter、D1、可选 R2 ## 本地开发 @@ -114,7 +115,6 @@ npm test 常用仓库 Variables: - `WORKER_NAME_PREFIX` -- `ENABLE_KV`(默认开启) - `ENABLE_R2`(默认关闭) ## 工作流 diff --git a/apps/api/prisma/schema.prisma b/apps/api/prisma/schema.prisma index 8de2c52..2d1d85b 100644 --- a/apps/api/prisma/schema.prisma +++ b/apps/api/prisma/schema.prisma @@ -126,6 +126,27 @@ model WebhookDelivery { @@index([status, createdAt]) } +model NotificationDelivery { + id String @id @default(cuid()) + channel String + eventType String + resourceKey String + periodKey String + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + @@unique([channel, eventType, resourceKey, periodKey]) +} + +model ImportPreview { + token String @id + previewJson Json + expiresAt DateTime + createdAt DateTime @default(now()) + + @@index([expiresAt]) +} + model Setting { key String @id valueJson Json diff --git a/apps/api/src/services/channel-notification.service.ts b/apps/api/src/services/channel-notification.service.ts index 885e4b9..241d9f6 100644 --- a/apps/api/src/services/channel-notification.service.ts +++ b/apps/api/src/services/channel-notification.service.ts @@ -1,9 +1,9 @@ import dayjs from 'dayjs' import type { EmailConfigInput, PushPlusConfigInput, TelegramConfigInput, WebhookEventType } from '@subtracker/shared' import { config } from '../config' -import { getWorkerCache } from '../runtime' import { dispatchWebhookEvent } from './webhook.service' -import { getNotificationChannelSettings, getSetting, setSetting } from './settings.service' +import { getNotificationChannelSettings } from './settings.service' +import { claimNotificationDelivery, releaseNotificationDelivery } from './worker-lite-state.service' type NotificationDispatchParams = { eventType: WebhookEventType @@ -57,42 +57,6 @@ export type NotificationChannelResult = { message?: string } -function buildNotificationKey( - channel: 'email' | 'pushplus' | 'telegram', - params: NotificationDispatchParams -) { - return `notification:${channel}:${params.eventType}:${params.resourceKey}:${params.periodKey}` -} - -async function hasNotificationBeenSent( - channel: 'email' | 'pushplus' | 'telegram', - params: NotificationDispatchParams -) { - const key = buildNotificationKey(channel, params) - const cache = getWorkerCache() - if (cache) { - return Boolean(await cache.get(key)) - } - - return getSetting(key, false) -} - -async function markNotificationSent( - channel: 'email' | 'pushplus' | 'telegram', - params: NotificationDispatchParams -) { - const key = buildNotificationKey(channel, params) - const cache = getWorkerCache() - if (cache) { - await cache.put(key, '1', { - expirationTtl: 60 * 60 * 24 * 14 - }) - return - } - - await setSetting(key, true) -} - function getMergedSubscriptions(params: NotificationDispatchParams) { const subscriptions = params.payload.subscriptions return Array.isArray(subscriptions) ? (subscriptions as NotificationSubscriptionItem[]) : [] @@ -284,8 +248,13 @@ async function sendEmailNotification(params: NotificationDispatchParams): Promis } } - const alreadySent = await hasNotificationBeenSent('email', params) - if (alreadySent) { + const claimed = await claimNotificationDelivery({ + channel: 'email', + eventType: params.eventType, + resourceKey: params.resourceKey, + periodKey: params.periodKey + }) + if (!claimed) { return { channel: 'email', status: 'skipped', @@ -293,8 +262,17 @@ async function sendEmailNotification(params: NotificationDispatchParams): Promis } } - await sendEmailWithConfig(params, settings.emailConfig) - await markNotificationSent('email', params) + try { + await sendEmailWithConfig(params, settings.emailConfig) + } catch (error) { + await releaseNotificationDelivery({ + channel: 'email', + eventType: params.eventType, + resourceKey: params.resourceKey, + periodKey: params.periodKey + }) + throw error + } return { channel: 'email', @@ -378,8 +356,13 @@ async function sendPushplusNotification(params: NotificationDispatchParams): Pro } } - const alreadySent = await hasNotificationBeenSent('pushplus', params) - if (alreadySent) { + const claimed = await claimNotificationDelivery({ + channel: 'pushplus', + eventType: params.eventType, + resourceKey: params.resourceKey, + periodKey: params.periodKey + }) + if (!claimed) { return { channel: 'pushplus', status: 'skipped', @@ -387,8 +370,17 @@ async function sendPushplusNotification(params: NotificationDispatchParams): Pro } } - await sendPushplusWithConfig(params, settings.pushplusConfig) - await markNotificationSent('pushplus', params) + try { + await sendPushplusWithConfig(params, settings.pushplusConfig) + } catch (error) { + await releaseNotificationDelivery({ + channel: 'pushplus', + eventType: params.eventType, + resourceKey: params.resourceKey, + periodKey: params.periodKey + }) + throw error + } return { channel: 'pushplus', @@ -440,8 +432,13 @@ async function sendTelegramNotification(params: NotificationDispatchParams): Pro } } - const alreadySent = await hasNotificationBeenSent('telegram', params) - if (alreadySent) { + const claimed = await claimNotificationDelivery({ + channel: 'telegram', + eventType: params.eventType, + resourceKey: params.resourceKey, + periodKey: params.periodKey + }) + if (!claimed) { return { channel: 'telegram', status: 'skipped', @@ -449,8 +446,17 @@ async function sendTelegramNotification(params: NotificationDispatchParams): Pro } } - await sendTelegramWithConfig(params, settings.telegramConfig) - await markNotificationSent('telegram', params) + try { + await sendTelegramWithConfig(params, settings.telegramConfig) + } catch (error) { + await releaseNotificationDelivery({ + channel: 'telegram', + eventType: params.eventType, + resourceKey: params.resourceKey, + periodKey: params.periodKey + }) + throw error + } return { channel: 'telegram', @@ -512,16 +518,20 @@ function buildTestReminderPayload() { } export async function sendTestEmailNotification() { - const result = await sendEmailNotification({ - eventType: 'subscription.reminder_due', - resourceKey: 'test:email', - periodKey: `${new Date().toISOString().slice(0, 10)}:upcoming`, - payload: buildTestReminderPayload() - }) - - if (result.status !== 'success') { + const settings = await getNotificationChannelSettings() + if (!settings.emailNotificationsEnabled) { throw new Error('邮箱通知未启用或配置不完整') } + + await sendEmailWithConfig( + { + eventType: 'subscription.reminder_due', + resourceKey: 'test:email', + periodKey: `${new Date().toISOString().slice(0, 10)}:upcoming`, + payload: buildTestReminderPayload() + }, + settings.emailConfig + ) } export async function sendTestEmailNotificationWithConfig(config: EmailConfigInput) { @@ -537,17 +547,21 @@ export async function sendTestEmailNotificationWithConfig(config: EmailConfigInp } export async function sendTestPushplusNotification() { - const result = await sendPushplusNotification({ - eventType: 'subscription.reminder_due', - resourceKey: 'test:pushplus', - periodKey: `${new Date().toISOString().slice(0, 10)}:upcoming`, - payload: buildTestReminderPayload() - }) - - if (result.status !== 'success') { + const settings = await getNotificationChannelSettings() + if (!settings.pushplusNotificationsEnabled) { throw new Error('PushPlus 通知未启用或配置不完整') } + await sendPushplusWithConfig( + { + eventType: 'subscription.reminder_due', + resourceKey: 'test:pushplus', + periodKey: `${new Date().toISOString().slice(0, 10)}:upcoming`, + payload: buildTestReminderPayload() + }, + settings.pushplusConfig + ) + return { accepted: true, message: 'PushPlus 已使用保存的配置发送测试请求' @@ -567,17 +581,21 @@ export async function sendTestPushplusNotificationWithConfig(config: PushPlusCon } export async function sendTestTelegramNotification() { - const result = await sendTelegramNotification({ - eventType: 'subscription.reminder_due', - resourceKey: 'test:telegram', - periodKey: `${new Date().toISOString().slice(0, 10)}:upcoming`, - payload: buildTestReminderPayload() - }) - - if (result.status !== 'success') { + const settings = await getNotificationChannelSettings() + if (!settings.telegramNotificationsEnabled) { throw new Error('Telegram 通知未启用或配置不完整') } + await sendTelegramWithConfig( + { + eventType: 'subscription.reminder_due', + resourceKey: 'test:telegram', + periodKey: `${new Date().toISOString().slice(0, 10)}:upcoming`, + payload: buildTestReminderPayload() + }, + settings.telegramConfig + ) + return { success: true } } diff --git a/apps/api/src/services/exchange-rate.service.ts b/apps/api/src/services/exchange-rate.service.ts index 62ad642..c4bda3a 100644 --- a/apps/api/src/services/exchange-rate.service.ts +++ b/apps/api/src/services/exchange-rate.service.ts @@ -52,15 +52,24 @@ export async function refreshExchangeRates(baseCurrency?: string) { throw new Error('Rate payload is empty') } - return await prisma.exchangeRateSnapshot.upsert({ - where: { baseCurrency: base }, - update: { - ratesJson: rates, - provider: config.exchangeRateProvider, - fetchedAt: new Date(), - isStale: false - }, - create: { + const existing = await prisma.exchangeRateSnapshot.findUnique({ + where: { baseCurrency: base } + }) + + if (existing) { + return await prisma.exchangeRateSnapshot.update({ + where: { baseCurrency: base }, + data: { + ratesJson: rates, + provider: config.exchangeRateProvider, + fetchedAt: new Date(), + isStale: false + } + }) + } + + return await prisma.exchangeRateSnapshot.create({ + data: { baseCurrency: base, ratesJson: rates, provider: config.exchangeRateProvider, diff --git a/apps/api/src/services/logo.service.ts b/apps/api/src/services/logo.service.ts index 61ae35a..914b916 100644 --- a/apps/api/src/services/logo.service.ts +++ b/apps/api/src/services/logo.service.ts @@ -1,12 +1,11 @@ import crypto from 'node:crypto' import { prisma } from '../db' import type { LogoSearchInput, LogoSearchResultDto, LogoUploadInput } from '@subtracker/shared' -import { getWorkerCache, getWorkerLogoBucket } from '../runtime' +import { getWorkerLogoBucket } from '../runtime' const SEARCH_USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36' const LOGO_REQUEST_TIMEOUT_MS = 20000 -const SEARCH_CACHE_TTL_SECONDS = 5 * 60 const SEARCH_CACHE_TTL_MS = 5 * 60 * 1000 const SEARCH_CACHE_PREFIX = 'logo-search:worker-parity:' const MAX_DUCKDUCKGO_CANDIDATES = 12 @@ -135,15 +134,6 @@ function makeCandidate( } async function cacheGet(key: string) { - const kv = getWorkerCache() - if (kv) { - const raw = await kv.get(key, 'json') - if (!raw) return null - const entry = raw as SearchCacheEntry - if (entry.expiresAt <= Date.now()) return null - return entry - } - const cached = searchCache.get(key) if (!cached || cached.expiresAt <= Date.now()) { searchCache.delete(key) @@ -158,14 +148,6 @@ async function cacheSet(key: string, items: LogoSearchResultDto[]) { items } - const kv = getWorkerCache() - if (kv) { - await kv.put(key, JSON.stringify(entry), { - expirationTtl: SEARCH_CACHE_TTL_SECONDS - }) - return - } - searchCache.set(key, entry) } diff --git a/apps/api/src/services/settings.service.ts b/apps/api/src/services/settings.service.ts index 392d6c8..b7b55d5 100644 --- a/apps/api/src/services/settings.service.ts +++ b/apps/api/src/services/settings.service.ts @@ -16,6 +16,7 @@ import { resolveDefaultAdvanceReminderRules, resolveDefaultOverdueReminderRules } from './reminder-rules.service' +import { invalidateWorkerLiteCache } from './worker-lite-cache.service' import { getSettingLite, listSettingsLite, setSettingLite } from './worker-lite-repository.service' export async function getSetting(key: string, fallback: T): Promise { @@ -24,6 +25,7 @@ export async function getSetting(key: string, fallback: T): Promise { export async function setSetting(key: string, value: T): Promise { await setSettingLite(key, value) + await invalidateWorkerLiteCache(['settings']) } function readSettingsValue(settingsMap: Map, key: string, fallback: T): T { diff --git a/apps/api/src/services/subscription.service.ts b/apps/api/src/services/subscription.service.ts index 4d20cab..64b5508 100644 --- a/apps/api/src/services/subscription.service.ts +++ b/apps/api/src/services/subscription.service.ts @@ -24,28 +24,39 @@ export async function renewSubscription(subscriptionId: string, paidAt?: Date, p subscription.billingIntervalUnit ) - const [payment, updated] = await prisma.$transaction([ - prisma.paymentRecord.create({ - data: { - subscriptionId: subscription.id, - amount, - currency, - baseCurrency, - convertedAmount, - exchangeRate, - paidAt: paidAt ?? new Date(), - periodStart, - periodEnd - } - }), - prisma.subscription.update({ + const payment = await prisma.paymentRecord.create({ + data: { + subscriptionId: subscription.id, + amount, + currency, + baseCurrency, + convertedAmount, + exchangeRate, + paidAt: paidAt ?? new Date(), + periodStart, + periodEnd + } + }) + + let updated + try { + updated = await prisma.subscription.update({ where: { id: subscription.id }, data: { nextRenewalDate: periodEnd, status: 'active' } }) - ]) + } catch (error) { + try { + await prisma.paymentRecord.delete({ + where: { id: payment.id } + }) + } catch { + // ignore compensation failures and surface the original update error + } + throw error + } return { payment, @@ -87,17 +98,26 @@ export async function autoRenewDueSubscriptions(today = new Date()) { export async function reconcileExpiredSubscriptions(today = new Date()) { const cutoff = dayjs(today).startOf('day').toDate() - const result = await prisma.subscription.updateMany({ + const rows = await prisma.subscription.findMany({ where: { status: 'active', nextRenewalDate: { lt: cutoff } }, - data: { - status: 'expired' + select: { + id: true } }) - return result.count + for (const row of rows) { + await prisma.subscription.update({ + where: { id: row.id }, + data: { + status: 'expired' + } + }) + } + + return rows.length } diff --git a/apps/api/src/services/tag.service.ts b/apps/api/src/services/tag.service.ts index 8286215..b8ea825 100644 --- a/apps/api/src/services/tag.service.ts +++ b/apps/api/src/services/tag.service.ts @@ -1,4 +1,5 @@ import type { Prisma, PrismaClient } from '@prisma/client' +import { isWorkerRuntime } from '../runtime' type DbClient = Prisma.TransactionClient | PrismaClient @@ -13,6 +14,18 @@ export async function replaceSubscriptionTags(db: DbClient, subscriptionId: stri if (tagIds.length === 0) return + if (isWorkerRuntime()) { + for (const tagId of tagIds) { + await db.subscriptionTag.create({ + data: { + subscriptionId, + tagId + } + }) + } + return + } + await db.subscriptionTag.createMany({ data: tagIds.map((tagId) => ({ subscriptionId, diff --git a/apps/api/src/services/wallos-import.service.ts b/apps/api/src/services/wallos-import.service.ts index 5db58be..8fef0bf 100644 --- a/apps/api/src/services/wallos-import.service.ts +++ b/apps/api/src/services/wallos-import.service.ts @@ -9,13 +9,13 @@ import type { WallosImportTagDto } from '@subtracker/shared' import { prisma } from '../db' -import { getWorkerCache } from '../runtime' -import { getAppSettings, getSetting, setSetting } from './settings.service' +import { isWorkerRuntime } from '../runtime' +import { getAppSettings } from './settings.service' import { appendSubscriptionOrders } from './subscription-order.service' +import { deleteImportPreview, getImportPreview, storeImportPreview } from './worker-lite-state.service' const IMPORT_TOKEN_TTL_SECONDS = 15 * 60 const IMPORT_TOKEN_TTL_MS = IMPORT_TOKEN_TTL_SECONDS * 1000 -const IMPORT_PREVIEW_PREFIX = 'wallosImportPreview:' const IMPORT_TAG_COLORS = [ '#3b82f6', '#8b5cf6', @@ -30,10 +30,6 @@ const IMPORT_TAG_COLORS = [ ] type WallosJsonRow = Record -type StoredPreviewEntry = { - expiresAt: number - preview: WallosImportInspectResultDto -} function decodeBase64(value: string) { const binary = atob(value.trim()) @@ -58,37 +54,6 @@ function createImportId() { return `c${timestamp}${random}`.slice(0, 25) } -async function getStoredPreview(token: string) { - const kv = getWorkerCache() - if (kv) { - return (await kv.get(`${IMPORT_PREVIEW_PREFIX}${token}`, 'json')) as StoredPreviewEntry | null - } - - return getSetting(`${IMPORT_PREVIEW_PREFIX}${token}`, null) -} - -async function setStoredPreview(token: string, entry: StoredPreviewEntry) { - const kv = getWorkerCache() - if (kv) { - await kv.put(`${IMPORT_PREVIEW_PREFIX}${token}`, JSON.stringify(entry), { - expirationTtl: IMPORT_TOKEN_TTL_SECONDS - }) - return - } - - await setSetting(`${IMPORT_PREVIEW_PREFIX}${token}`, entry) -} - -async function deleteStoredPreview(token: string) { - const kv = getWorkerCache() - if (kv) { - await kv.delete(`${IMPORT_PREVIEW_PREFIX}${token}`) - return - } - - await setSetting(`${IMPORT_PREVIEW_PREFIX}${token}`, null) -} - function normalizeWallosTagName(name: string | null | undefined) { const value = String(name ?? '').trim() if (!value) return null @@ -416,23 +381,19 @@ export async function inspectWallosImportFile(input: WallosImportInspectInput): importToken: token } - await setStoredPreview(token, { - expiresAt: Date.now() + IMPORT_TOKEN_TTL_MS, - preview - }) + await storeImportPreview(token, preview, IMPORT_TOKEN_TTL_MS) return preview } export async function commitWallosImport(input: WallosImportCommitInput): Promise { - const entry = await getStoredPreview(input.importToken) - if (!entry || entry.expiresAt <= Date.now()) { - await deleteStoredPreview(input.importToken) + const preview = await getImportPreview(input.importToken) + if (!preview) { + await deleteImportPreview(input.importToken) throw new Error('导入令牌不存在或已失效,请重新生成预览') } - const preview = entry.preview - await deleteStoredPreview(input.importToken) + await deleteImportPreview(input.importToken) const existingTags = await prisma.tag.findMany({ where: { @@ -448,13 +409,25 @@ export async function commitWallosImport(input: WallosImportCommitInput): Promis const missingTags = preview.usedTags.filter((tag) => !tagIdByName.has(tag.name)) if (missingTags.length > 0) { - await prisma.tag.createMany({ - data: missingTags.map((tag) => ({ - name: tag.name, - color: getImportedTagColor(tag.name), - sortOrder: tag.sortOrder - })) - }) + if (isWorkerRuntime()) { + for (const tag of missingTags) { + await prisma.tag.create({ + data: { + name: tag.name, + color: getImportedTagColor(tag.name), + sortOrder: tag.sortOrder + } + }) + } + } else { + await prisma.tag.createMany({ + data: missingTags.map((tag) => ({ + name: tag.name, + color: getImportedTagColor(tag.name), + sortOrder: tag.sortOrder + })) + }) + } importedTags = missingTags.length const refreshedTags = await prisma.tag.findMany({ @@ -524,15 +497,31 @@ export async function commitWallosImport(input: WallosImportCommitInput): Promis } if (subscriptionRows.length > 0) { - await prisma.subscription.createMany({ - data: subscriptionRows - }) + if (isWorkerRuntime()) { + for (const row of subscriptionRows) { + await prisma.subscription.create({ + data: row + }) + } + } else { + await prisma.subscription.createMany({ + data: subscriptionRows + }) + } } if (subscriptionTagRows.length > 0) { - await prisma.subscriptionTag.createMany({ - data: subscriptionTagRows - }) + if (isWorkerRuntime()) { + for (const row of subscriptionTagRows) { + await prisma.subscriptionTag.create({ + data: row + }) + } + } else { + await prisma.subscriptionTag.createMany({ + data: subscriptionTagRows + }) + } } await appendSubscriptionOrders(createdSubscriptionIds) diff --git a/apps/api/src/services/webhook.service.ts b/apps/api/src/services/webhook.service.ts index 235732c..6fe88ed 100644 --- a/apps/api/src/services/webhook.service.ts +++ b/apps/api/src/services/webhook.service.ts @@ -220,7 +220,7 @@ export async function dispatchWebhookEvent(params: { payload: params.payload }) - await prisma.webhookDelivery.upsert({ + const existing = await prisma.webhookDelivery.findUnique({ where: { eventType_resourceKey_periodKey: { eventType: params.eventType, @@ -228,35 +228,46 @@ export async function dispatchWebhookEvent(params: { periodKey: params.periodKey } }, - update: { - subscriptionId: params.subscriptionId ?? null, - targetUrl: endpoint.url, - requestMethod: endpoint.requestMethod, - payloadJson: params.payload as Prisma.InputJsonValue, - status: result.statusCode >= 400 ? 'failed' : 'success', - responseCode: result.statusCode, - responseBody: result.responseBody, - attemptCount: { - increment: 1 - }, - lastAttemptAt: new Date() - }, - create: { - subscriptionId: params.subscriptionId ?? null, - eventType: params.eventType, - resourceKey: params.resourceKey, - periodKey: params.periodKey, - targetUrl: endpoint.url, - requestMethod: endpoint.requestMethod, - payloadJson: params.payload as Prisma.InputJsonValue, - status: result.statusCode >= 400 ? 'failed' : 'success', - responseCode: result.statusCode, - responseBody: result.responseBody, - attemptCount: 1, - lastAttemptAt: new Date() + select: { + id: true, + attemptCount: true } }) + if (existing) { + await prisma.webhookDelivery.update({ + where: { id: existing.id }, + data: { + subscriptionId: params.subscriptionId ?? null, + targetUrl: endpoint.url, + requestMethod: endpoint.requestMethod, + payloadJson: params.payload as Prisma.InputJsonValue, + status: result.statusCode >= 400 ? 'failed' : 'success', + responseCode: result.statusCode, + responseBody: result.responseBody, + attemptCount: existing.attemptCount + 1, + lastAttemptAt: new Date() + } + }) + } else { + await prisma.webhookDelivery.create({ + data: { + subscriptionId: params.subscriptionId ?? null, + eventType: params.eventType, + resourceKey: params.resourceKey, + periodKey: params.periodKey, + targetUrl: endpoint.url, + requestMethod: endpoint.requestMethod, + payloadJson: params.payload as Prisma.InputJsonValue, + status: result.statusCode >= 400 ? 'failed' : 'success', + responseCode: result.statusCode, + responseBody: result.responseBody, + attemptCount: 1, + lastAttemptAt: new Date() + } + }) + } + if (result.statusCode >= 400) { throw new Error(`Webhook dispatch failed: HTTP ${result.statusCode}`) } diff --git a/apps/api/src/services/worker-lite-cache.service.ts b/apps/api/src/services/worker-lite-cache.service.ts index bd9d2c9..2392307 100644 --- a/apps/api/src/services/worker-lite-cache.service.ts +++ b/apps/api/src/services/worker-lite-cache.service.ts @@ -1,9 +1,5 @@ -import { getWorkerCache } from '../runtime' - const DEFAULT_CACHE_TTL_SECONDS = 30 -const KV_MIN_EXPIRATION_TTL_SECONDS = 60 -const CACHE_VALUE_PREFIX = 'liteCache:' -const CACHE_VERSION_PREFIX = 'liteCacheVersion:' +const CACHE_KEY_PREFIX = 'liteMemoryCache:' type CacheEntry = { expiresAt: number @@ -11,21 +7,12 @@ type CacheEntry = { } const memoryCache = new Map>() -const memoryVersions = new Map() const inflightLoads = new Map>() function now() { return Date.now() } -function getMemoryVersion(namespace: string) { - return memoryVersions.get(namespace) ?? 0 -} - -function setMemoryVersion(namespace: string, version: number) { - memoryVersions.set(namespace, version) -} - function getMemoryCache(key: string) { const cached = memoryCache.get(key) as CacheEntry | undefined if (!cached) return null @@ -43,50 +30,11 @@ function setMemoryCache(key: string, value: T, ttlSeconds: number) { }) } -async function getNamespaceVersion(namespace: string) { - const cache = getWorkerCache() - if (!cache) { - return getMemoryVersion(namespace) - } - - const raw = await cache.get(`${CACHE_VERSION_PREFIX}${namespace}`, 'text') - return raw ? Number(raw) || 0 : 0 -} - -async function bumpNamespaceVersion(namespace: string) { - const nextVersion = (await getNamespaceVersion(namespace)) + 1 - const cache = getWorkerCache() - if (cache) { - await cache.put(`${CACHE_VERSION_PREFIX}${namespace}`, String(nextVersion)) - } - setMemoryVersion(namespace, nextVersion) -} - async function getCacheValue(key: string) { - const cache = getWorkerCache() - if (cache) { - const cached = (await cache.get(key, 'json')) as CacheEntry | null - if (cached && cached.expiresAt > now()) { - return cached.value - } - } - return getMemoryCache(key) } async function setCacheValue(key: string, value: T, ttlSeconds: number) { - const entry: CacheEntry = { - value, - expiresAt: now() + ttlSeconds * 1000 - } - - const cache = getWorkerCache() - if (cache) { - await cache.put(key, JSON.stringify(entry), { - expirationTtl: Math.max(ttlSeconds, KV_MIN_EXPIRATION_TTL_SECONDS) - }) - } - setMemoryCache(key, value, ttlSeconds) } @@ -96,8 +44,7 @@ export async function withWorkerLiteCache( loader: () => Promise, ttlSeconds = DEFAULT_CACHE_TTL_SECONDS ) { - const version = await getNamespaceVersion(namespace) - const resolvedKey = `${CACHE_VALUE_PREFIX}${namespace}:v${version}:${cacheKey}` + const resolvedKey = `${CACHE_KEY_PREFIX}${namespace}:${cacheKey}` const cached = await getCacheValue(resolvedKey) if (cached !== null) { return cached @@ -121,5 +68,12 @@ export async function withWorkerLiteCache( } export async function invalidateWorkerLiteCache(namespaces: string[]) { - await Promise.all(Array.from(new Set(namespaces)).map((namespace) => bumpNamespaceVersion(namespace))) + for (const namespace of new Set(namespaces)) { + const prefix = `${CACHE_KEY_PREFIX}${namespace}:` + for (const key of Array.from(memoryCache.keys())) { + if (key.startsWith(prefix)) { + memoryCache.delete(key) + } + } + } } diff --git a/apps/api/src/services/worker-lite-repository.service.ts b/apps/api/src/services/worker-lite-repository.service.ts index 8054a30..b5f13dd 100644 --- a/apps/api/src/services/worker-lite-repository.service.ts +++ b/apps/api/src/services/worker-lite-repository.service.ts @@ -303,10 +303,8 @@ export async function updateTagLite( export async function deleteTagLite(id: string) { if (!getD1()) { - await prisma.$transaction([ - prisma.subscriptionTag.deleteMany({ where: { tagId: id } }), - prisma.tag.delete({ where: { id } }) - ]) + await prisma.subscriptionTag.deleteMany({ where: { tagId: id } }) + await prisma.tag.delete({ where: { id } }) return } diff --git a/apps/api/src/services/worker-lite-state.service.ts b/apps/api/src/services/worker-lite-state.service.ts new file mode 100644 index 0000000..bdca810 --- /dev/null +++ b/apps/api/src/services/worker-lite-state.service.ts @@ -0,0 +1,202 @@ +import { Prisma } from '@prisma/client' +import type { WallosImportInspectResultDto, WebhookEventType } from '@subtracker/shared' +import { prisma } from '../db' +import { getRuntimeD1Database, isWorkerRuntime } from '../runtime' + +type NotificationChannel = 'email' | 'pushplus' | 'telegram' + +type ImportPreviewRow = { + token: string + previewJson: unknown + expiresAt: string +} + +function getD1() { + if (!isWorkerRuntime()) return null + return getRuntimeD1Database() +} + +async function d1First(sql: string, params: unknown[] = []) { + const db = getD1() + if (!db) { + throw new Error('D1 unavailable') + } + const statement = db.prepare(sql) + const executed = params.length ? statement.bind(...params) : statement + return (await executed.first()) as T | null +} + +async function d1Run(sql: string, params: unknown[] = []) { + const db = getD1() + if (!db) { + throw new Error('D1 unavailable') + } + const statement = db.prepare(sql) + const executed = params.length ? statement.bind(...params) : statement + return executed.run() +} + +function createCuidLike() { + const random = crypto.randomUUID().replaceAll('-', '').slice(0, 16) + const timestamp = Date.now().toString(36).slice(-8) + return `c${timestamp}${random}`.slice(0, 25) +} + +function parseJsonValue(value: unknown) { + if (value === null || value === undefined) { + throw new Error('State JSON value is empty') + } + + if (typeof value === 'object') { + if (value instanceof Uint8Array) { + return JSON.parse(new TextDecoder().decode(value)) as T + } + if (value instanceof ArrayBuffer) { + return JSON.parse(new TextDecoder().decode(new Uint8Array(value))) as T + } + if (typeof Buffer !== 'undefined' && Buffer.isBuffer(value)) { + return JSON.parse(value.toString('utf8')) as T + } + return value as T + } + + return JSON.parse(String(value)) as T +} + +function isUniqueConstraintError(error: unknown) { + if (error instanceof Prisma.PrismaClientKnownRequestError) { + return error.code === 'P2002' + } + + const message = error instanceof Error ? error.message : String(error) + return /unique/i.test(message) +} + +export async function claimNotificationDelivery(params: { + channel: NotificationChannel + eventType: WebhookEventType + resourceKey: string + periodKey: string +}) { + if (!getD1()) { + try { + await prisma.notificationDelivery.create({ + data: { + channel: params.channel, + eventType: params.eventType, + resourceKey: params.resourceKey, + periodKey: params.periodKey + } + }) + return true + } catch (error) { + if (isUniqueConstraintError(error)) { + return false + } + throw error + } + } + + try { + await d1Run( + `INSERT INTO NotificationDelivery (id, channel, eventType, resourceKey, periodKey, createdAt, updatedAt) + VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)`, + [createCuidLike(), params.channel, params.eventType, params.resourceKey, params.periodKey] + ) + return true + } catch (error) { + if (isUniqueConstraintError(error)) { + return false + } + throw error + } +} + +export async function releaseNotificationDelivery(params: { + channel: NotificationChannel + eventType: WebhookEventType + resourceKey: string + periodKey: string +}) { + if (!getD1()) { + await prisma.notificationDelivery.deleteMany({ + where: { + channel: params.channel, + eventType: params.eventType, + resourceKey: params.resourceKey, + periodKey: params.periodKey + } + }) + return + } + + await d1Run( + `DELETE FROM NotificationDelivery + WHERE channel = ? AND eventType = ? AND resourceKey = ? AND periodKey = ?`, + [params.channel, params.eventType, params.resourceKey, params.periodKey] + ) +} + +export async function storeImportPreview(token: string, preview: WallosImportInspectResultDto, ttlMs: number) { + const expiresAt = new Date(Date.now() + ttlMs) + + if (!getD1()) { + await prisma.importPreview.upsert({ + where: { token }, + update: { + previewJson: preview as unknown as Prisma.InputJsonValue, + expiresAt + }, + create: { + token, + previewJson: preview as unknown as Prisma.InputJsonValue, + expiresAt + } + }) + return + } + + await d1Run( + `INSERT INTO ImportPreview (token, previewJson, expiresAt, createdAt) + VALUES (?, ?, ?, CURRENT_TIMESTAMP) + ON CONFLICT(token) DO UPDATE SET previewJson = excluded.previewJson, expiresAt = excluded.expiresAt`, + [token, JSON.stringify(preview), expiresAt.toISOString()] + ) +} + +export async function getImportPreview(token: string) { + if (!getD1()) { + const row = await prisma.importPreview.findUnique({ where: { token } }) + if (!row) return null + if (row.expiresAt.getTime() <= Date.now()) { + await prisma.importPreview.deleteMany({ where: { token } }) + return null + } + return row.previewJson as unknown as WallosImportInspectResultDto + } + + const row = await d1First( + `SELECT token, previewJson, expiresAt + FROM ImportPreview + WHERE token = ? + LIMIT 1`, + [token] + ) + if (!row) return null + + if (new Date(row.expiresAt).getTime() <= Date.now()) { + await deleteImportPreview(token) + return null + } + + return parseJsonValue(row.previewJson) +} + +export async function deleteImportPreview(token: string) { + if (!getD1()) { + await prisma.importPreview.deleteMany({ where: { token } }) + return + } + + await d1Run('DELETE FROM ImportPreview WHERE token = ?', [token]) +} diff --git a/apps/api/src/worker/database-init.ts b/apps/api/src/worker/database-init.ts index 868617d..c27f844 100644 --- a/apps/api/src/worker/database-init.ts +++ b/apps/api/src/worker/database-init.ts @@ -122,6 +122,33 @@ const schemaStatements = [ ON "WebhookDelivery"("status", "createdAt") `, ` + CREATE TABLE IF NOT EXISTS "NotificationDelivery" ( + "id" TEXT NOT NULL PRIMARY KEY, + "channel" TEXT NOT NULL, + "eventType" TEXT NOT NULL, + "resourceKey" TEXT NOT NULL, + "periodKey" TEXT NOT NULL, + "createdAt" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP + ) + `, + ` + CREATE UNIQUE INDEX IF NOT EXISTS "NotificationDelivery_channel_eventType_resourceKey_periodKey_key" + ON "NotificationDelivery"("channel", "eventType", "resourceKey", "periodKey") + `, + ` + CREATE TABLE IF NOT EXISTS "ImportPreview" ( + "token" TEXT NOT NULL PRIMARY KEY, + "previewJson" TEXT NOT NULL, + "expiresAt" DATETIME NOT NULL, + "createdAt" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP + ) + `, + ` + CREATE INDEX IF NOT EXISTS "ImportPreview_expiresAt_idx" + ON "ImportPreview"("expiresAt") + `, + ` CREATE TABLE IF NOT EXISTS "Setting" ( "key" TEXT NOT NULL PRIMARY KEY, "valueJson" TEXT NOT NULL, diff --git a/apps/api/tests/integration/tags-routes.test.ts b/apps/api/tests/integration/tags-routes.test.ts index eb33e7f..23f3195 100644 --- a/apps/api/tests/integration/tags-routes.test.ts +++ b/apps/api/tests/integration/tags-routes.test.ts @@ -3,12 +3,6 @@ import { beforeEach, describe, expect, it, vi } from 'vitest' const tagRouteMocks = vi.hoisted(() => ({ prisma: { - $transaction: vi.fn(async (operations: unknown) => { - if (typeof operations === 'function') { - throw new Error('interactive transaction should not be used for tag deletion') - } - return operations - }), subscriptionTag: { deleteMany: vi.fn(async () => ({ count: 1 })) }, @@ -42,8 +36,12 @@ describe('tag routes D1 compatibility', () => { }) expect(response.statusCode).toBe(200) - expect(tagRouteMocks.prisma.$transaction).toHaveBeenCalledTimes(1) - expect(typeof tagRouteMocks.prisma.$transaction.mock.calls[0]?.[0]).not.toBe('function') + expect(tagRouteMocks.prisma.subscriptionTag.deleteMany).toHaveBeenCalledWith({ + where: { tagId: 'cksubtracker0000000000000tag1' } + }) + expect(tagRouteMocks.prisma.tag.delete).toHaveBeenCalledWith({ + where: { id: 'cksubtracker0000000000000tag1' } + }) await app.close() }) diff --git a/apps/api/tests/unit/channel-notification.service.test.ts b/apps/api/tests/unit/channel-notification.service.test.ts index 29018d1..cbcb3c8 100644 --- a/apps/api/tests/unit/channel-notification.service.test.ts +++ b/apps/api/tests/unit/channel-notification.service.test.ts @@ -1,9 +1,5 @@ import { beforeEach, describe, expect, it, vi } from 'vitest' -vi.mock('../../src/runtime', () => ({ - getWorkerCache: vi.fn(() => undefined) -})) - vi.mock('../../src/config', () => ({ config: { resendApiUrl: 'https://api.resend.com/emails' @@ -11,7 +7,7 @@ vi.mock('../../src/config', () => ({ })) vi.mock('../../src/services/settings.service', () => ({ - getAppSettings: vi.fn(async () => ({ + getNotificationChannelSettings: vi.fn(async () => ({ emailNotificationsEnabled: false, pushplusNotificationsEnabled: false, telegramNotificationsEnabled: false, @@ -30,9 +26,7 @@ vi.mock('../../src/services/settings.service', () => ({ botToken: '', chatId: '' } - })), - getSetting: vi.fn(async () => false), - setSetting: vi.fn(async () => undefined) + })) })) vi.mock('../../src/services/webhook.service', () => ({ @@ -43,6 +37,14 @@ vi.mock('../../src/services/webhook.service', () => ({ })) })) +vi.mock('../../src/services/worker-lite-state.service', () => ({ + claimNotificationDelivery: vi.fn(async () => true), + releaseNotificationDelivery: vi.fn(async () => undefined), + storeImportPreview: vi.fn(), + getImportPreview: vi.fn(), + deleteImportPreview: vi.fn() +})) + import { sendTestEmailNotificationWithConfig } from '../../src/services/channel-notification.service' function jsonResponse(payload: unknown, status = 200) { diff --git a/apps/api/tests/unit/subscription.service.test.ts b/apps/api/tests/unit/subscription.service.test.ts index 0ba36aa..3db5b26 100644 --- a/apps/api/tests/unit/subscription.service.test.ts +++ b/apps/api/tests/unit/subscription.service.test.ts @@ -2,18 +2,13 @@ import { beforeEach, describe, expect, it, vi } from 'vitest' const serviceMocks = vi.hoisted(() => ({ prisma: { - $transaction: vi.fn(async (operations: unknown) => { - if (typeof operations === 'function') { - throw new Error('interactive transaction should not be used in renewSubscription') - } - return Promise.all(operations as Promise[]) - }), subscription: { findUnique: vi.fn(), update: vi.fn() }, paymentRecord: { - create: vi.fn() + create: vi.fn(), + delete: vi.fn() } }, getBaseCurrency: vi.fn(async () => 'CNY'), @@ -40,7 +35,7 @@ describe('subscription service D1 compatibility', () => { vi.clearAllMocks() }) - it('renews subscriptions with batch-style transaction operations instead of interactive transactions', async () => { + it('renews subscriptions without relying on Prisma transactions', async () => { const nextRenewalDate = new Date('2026-04-22T00:00:00.000Z') const periodEnd = new Date('2026-05-22T00:00:00.000Z') @@ -67,7 +62,30 @@ describe('subscription service D1 compatibility', () => { id: 'sub_1', status: 'active' }) - expect(serviceMocks.prisma.$transaction).toHaveBeenCalledTimes(1) - expect(typeof serviceMocks.prisma.$transaction.mock.calls[0]?.[0]).not.toBe('function') + expect(serviceMocks.prisma.paymentRecord.create).toHaveBeenCalledTimes(1) + expect(serviceMocks.prisma.subscription.update).toHaveBeenCalledTimes(1) + }) + + it('rolls back the payment record when subscription update fails', async () => { + const nextRenewalDate = new Date('2026-04-22T00:00:00.000Z') + + serviceMocks.prisma.subscription.findUnique.mockResolvedValue({ + id: 'sub_1', + amount: 25, + currency: 'CNY', + billingIntervalCount: 1, + billingIntervalUnit: 'month', + nextRenewalDate + }) + serviceMocks.prisma.paymentRecord.create.mockResolvedValue({ id: 'payment_1' }) + serviceMocks.prisma.subscription.update.mockRejectedValue(new Error('update failed')) + serviceMocks.prisma.paymentRecord.delete.mockResolvedValue({ id: 'payment_1' }) + + const { renewSubscription } = await import('../../src/services/subscription.service') + + await expect(renewSubscription('sub_1')).rejects.toThrow('update failed') + expect(serviceMocks.prisma.paymentRecord.delete).toHaveBeenCalledWith({ + where: { id: 'payment_1' } + }) }) }) diff --git a/apps/api/tests/unit/wallos-import-commit.test.ts b/apps/api/tests/unit/wallos-import-commit.test.ts index dfe1e8f..f6c35d7 100644 --- a/apps/api/tests/unit/wallos-import-commit.test.ts +++ b/apps/api/tests/unit/wallos-import-commit.test.ts @@ -1,7 +1,5 @@ import { beforeEach, describe, expect, it, vi } from 'vitest' -const settingsStore = new Map() - const { prismaMock, appendSubscriptionOrders } = vi.hoisted(() => ({ prismaMock: { tag: { @@ -22,18 +20,8 @@ vi.mock('../../src/db', () => ({ prisma: prismaMock })) -vi.mock('../../src/runtime', () => ({ - getWorkerCache: vi.fn(() => undefined) -})) - vi.mock('../../src/services/settings.service', () => ({ - getAppSettings: vi.fn(), - getSetting: vi.fn(async (key: string, fallbackValue: T) => - (settingsStore.has(key) ? (settingsStore.get(key) as T) : fallbackValue) - ), - setSetting: vi.fn(async (key: string, value: unknown) => { - settingsStore.set(key, value) - }) + getAppSettings: vi.fn() })) vi.mock('../../src/services/subscription-order.service', () => ({ @@ -41,88 +29,93 @@ vi.mock('../../src/services/subscription-order.service', () => ({ appendSubscriptionOrders })) +const previewState = vi.hoisted(() => ({ + getImportPreview: vi.fn(), + storeImportPreview: vi.fn(), + deleteImportPreview: vi.fn() +})) + +vi.mock('../../src/services/worker-lite-state.service', () => previewState) + import { commitWallosImport } from '../../src/services/wallos-import.service' describe('commitWallosImport', () => { beforeEach(() => { - settingsStore.clear() prismaMock.tag.findMany.mockReset() prismaMock.tag.createMany.mockReset() prismaMock.subscription.createMany.mockReset() prismaMock.subscriptionTag.createMany.mockReset() appendSubscriptionOrders.mockClear() + previewState.getImportPreview.mockReset() + previewState.deleteImportPreview.mockReset() + previewState.getImportPreview.mockResolvedValue({ + importToken: 'token-1', + isWallos: true, + summary: { + fileType: 'json', + subscriptionsTotal: 2, + tagsTotal: 2, + usedTagsTotal: 2, + supportedSubscriptions: 2, + skippedSubscriptions: 0, + globalNotifyDays: 3, + zipLogoMatched: 0, + zipLogoMissing: 0 + }, + usedTags: [ + { sourceId: 1, name: 'Video', sortOrder: 1 }, + { sourceId: 2, name: 'Music', sortOrder: 2 } + ], + tags: [], + subscriptionsPreview: [ + { + sourceId: 1, + name: 'Netflix', + amount: 10, + currency: 'USD', + status: 'active', + autoRenew: true, + billingIntervalCount: 1, + billingIntervalUnit: 'month', + startDate: '2026-04-01', + nextRenewalDate: '2026-05-01', + notifyDaysBefore: 3, + webhookEnabled: true, + notes: '', + description: '', + websiteUrl: 'https://netflix.com', + tagNames: ['Video'], + logoRef: null, + logoImportStatus: 'none', + warnings: [] + }, + { + sourceId: 2, + name: 'Spotify', + amount: 15, + currency: 'USD', + status: 'active', + autoRenew: true, + billingIntervalCount: 1, + billingIntervalUnit: 'month', + startDate: '2026-04-02', + nextRenewalDate: '2026-05-02', + notifyDaysBefore: 3, + webhookEnabled: true, + notes: '', + description: '', + websiteUrl: 'https://spotify.com', + tagNames: ['Music'], + logoRef: null, + logoImportStatus: 'none', + warnings: [] + } + ], + warnings: [] + }) }) it('batches imported tags, subscription tags and subscription order writes', async () => { - settingsStore.set('wallosImportPreview:token-1', { - expiresAt: Date.now() + 60_000, - preview: { - importToken: 'token-1', - isWallos: true, - summary: { - fileType: 'json', - subscriptionsTotal: 2, - tagsTotal: 2, - usedTagsTotal: 2, - supportedSubscriptions: 2, - skippedSubscriptions: 0, - globalNotifyDays: 3, - zipLogoMatched: 0, - zipLogoMissing: 0 - }, - usedTags: [ - { sourceId: 1, name: 'Video', sortOrder: 1 }, - { sourceId: 2, name: 'Music', sortOrder: 2 } - ], - tags: [], - subscriptionsPreview: [ - { - sourceId: 1, - name: 'Netflix', - amount: 10, - currency: 'USD', - status: 'active', - autoRenew: true, - billingIntervalCount: 1, - billingIntervalUnit: 'month', - startDate: '2026-04-01', - nextRenewalDate: '2026-05-01', - notifyDaysBefore: 3, - webhookEnabled: true, - notes: '', - description: '', - websiteUrl: 'https://netflix.com', - tagNames: ['Video'], - logoRef: null, - logoImportStatus: 'none', - warnings: [] - }, - { - sourceId: 2, - name: 'Spotify', - amount: 15, - currency: 'USD', - status: 'active', - autoRenew: true, - billingIntervalCount: 1, - billingIntervalUnit: 'month', - startDate: '2026-04-02', - nextRenewalDate: '2026-05-02', - notifyDaysBefore: 3, - webhookEnabled: true, - notes: '', - description: '', - websiteUrl: 'https://spotify.com', - tagNames: ['Music'], - logoRef: null, - logoImportStatus: 'none', - warnings: [] - } - ], - warnings: [] - } - }) - prismaMock.tag.findMany .mockResolvedValueOnce([]) .mockResolvedValueOnce([ @@ -158,6 +151,6 @@ describe('commitWallosImport', () => { importedSubscriptions: 2, skippedSubscriptions: 0 }) - expect(settingsStore.get('wallosImportPreview:token-1')).toBeNull() + expect(previewState.deleteImportPreview).toHaveBeenCalledWith('token-1') }) }) diff --git a/apps/api/tests/unit/worker-lite-cache.test.ts b/apps/api/tests/unit/worker-lite-cache.test.ts new file mode 100644 index 0000000..45feeac --- /dev/null +++ b/apps/api/tests/unit/worker-lite-cache.test.ts @@ -0,0 +1,40 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest' + +describe('worker-lite memory cache', () => { + beforeEach(() => { + vi.resetModules() + }) + + it('reuses in-memory values within ttl and inflight window', async () => { + const { withWorkerLiteCache } = await import('../../src/services/worker-lite-cache.service') + const loader = vi.fn(async () => 'cached-value') + + const [first, second] = await Promise.all([ + withWorkerLiteCache('settings', 'app-settings', loader, 30), + withWorkerLiteCache('settings', 'app-settings', loader, 30) + ]) + + expect(first).toBe('cached-value') + expect(second).toBe('cached-value') + expect(loader).toHaveBeenCalledTimes(1) + + const third = await withWorkerLiteCache('settings', 'app-settings', loader, 30) + expect(third).toBe('cached-value') + expect(loader).toHaveBeenCalledTimes(1) + }) + + it('clears current-isolate entries on namespace invalidation', async () => { + const { invalidateWorkerLiteCache, withWorkerLiteCache } = await import('../../src/services/worker-lite-cache.service') + let version = 0 + const loader = vi.fn(async () => ({ version: ++version })) + + const first = await withWorkerLiteCache('subscriptions', 'all', loader, 30) + expect(first).toEqual({ version: 1 }) + + await invalidateWorkerLiteCache(['subscriptions']) + + const second = await withWorkerLiteCache('subscriptions', 'all', loader, 30) + expect(second).toEqual({ version: 2 }) + expect(loader).toHaveBeenCalledTimes(2) + }) +}) diff --git a/apps/api/tests/unit/worker-lite-state.service.test.ts b/apps/api/tests/unit/worker-lite-state.service.test.ts new file mode 100644 index 0000000..8272a94 --- /dev/null +++ b/apps/api/tests/unit/worker-lite-state.service.test.ts @@ -0,0 +1,66 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest' + +const notificationDelivery = { + create: vi.fn(), + deleteMany: vi.fn() +} + +const importPreview = { + upsert: vi.fn(), + findUnique: vi.fn(), + deleteMany: vi.fn() +} + +vi.mock('../../src/db', () => ({ + prisma: { + notificationDelivery, + importPreview + } +})) + +vi.mock('../../src/runtime', () => ({ + getRuntimeD1Database: vi.fn(() => undefined), + isWorkerRuntime: vi.fn(() => false) +})) + +describe('worker-lite state service', () => { + beforeEach(() => { + vi.clearAllMocks() + }) + + it('treats unique notification-delivery conflicts as already claimed', async () => { + const { Prisma } = await import('@prisma/client') + const { claimNotificationDelivery } = await import('../../src/services/worker-lite-state.service') + + notificationDelivery.create.mockRejectedValueOnce( + new Prisma.PrismaClientKnownRequestError('duplicate', { + code: 'P2002', + clientVersion: 'test' + }) + ) + + await expect( + claimNotificationDelivery({ + channel: 'email', + eventType: 'subscription.reminder_due', + resourceKey: 'subscription:1', + periodKey: '2026-04-23:upcoming' + }) + ).resolves.toBe(false) + }) + + it('expires stored import previews on read', async () => { + const { getImportPreview } = await import('../../src/services/worker-lite-state.service') + + importPreview.findUnique.mockResolvedValueOnce({ + token: 'preview-token', + previewJson: { importToken: 'preview-token' }, + expiresAt: new Date(Date.now() - 1000) + }) + + await expect(getImportPreview('preview-token')).resolves.toBeNull() + expect(importPreview.deleteMany).toHaveBeenCalledWith({ + where: { token: 'preview-token' } + }) + }) +}) diff --git a/apps/web/src/pages/SettingsPage.vue b/apps/web/src/pages/SettingsPage.vue index 4f9e317..edf4f4d 100644 --- a/apps/web/src/pages/SettingsPage.vue +++ b/apps/web/src/pages/SettingsPage.vue @@ -968,6 +968,11 @@ async function exportSubscriptions(format: 'csv' | 'json') { } function handleWallosImported() { + queryClient.removeQueries({ queryKey: ['subscriptions'] }) + queryClient.removeQueries({ queryKey: ['tags'] }) + queryClient.removeQueries({ queryKey: ['statistics-overview'] }) + queryClient.removeQueries({ queryKey: ['statistics-budgets'] }) + queryClient.removeQueries({ queryKey: ['calendar-events'] }) showWallosImportModal.value = false message.success('Wallos 数据已导入') } diff --git a/scripts/bootstrap-cloudflare.mjs b/scripts/bootstrap-cloudflare.mjs index cbb1f3f..0e39d94 100644 --- a/scripts/bootstrap-cloudflare.mjs +++ b/scripts/bootstrap-cloudflare.mjs @@ -7,7 +7,6 @@ const cwd = process.cwd() const cliArgs = process.argv.slice(2) const args = new Set(cliArgs) const withR2 = args.has('--with-r2') -const withoutKv = args.has('--without-kv') const skipBuild = args.has('--skip-build') const configPath = path.resolve(cwd, 'wrangler.jsonc') const generatedConfigPath = path.resolve(cwd, '.wrangler.generated.jsonc') @@ -15,7 +14,6 @@ const generatedConfigPath = path.resolve(cwd, '.wrangler.generated.jsonc') function bindingResourceName(workerName, binding) { const suffixMap = { DB: 'db', - SUBTRACKER_CACHE: 'cache', SUBTRACKER_LOGOS: 'logos' } @@ -51,13 +49,9 @@ function resolveBooleanFlag({ return defaultValue } -function applyOptionalBindings(config, { enableKv, enableR2 }) { +function applyOptionalBindings(config, { enableR2 }) { const next = { ...config } - if (!enableKv) { - delete next.kv_namespaces - } - if (enableR2) { return withProvisionedR2Binding(next) } @@ -268,13 +262,6 @@ async function buildGeneratedConfig() { let config = JSON.parse(raw) const packageJson = JSON.parse(await readFile(path.resolve(cwd, 'package.json'), 'utf8')) const gitSha = (await run('git', ['rev-parse', '--short', 'HEAD'], { captureOutput: true })).stdout.trim() - const enableKv = withoutKv - ? false - : resolveBooleanFlag({ - envName: 'ENABLE_KV', - defaultValue: true - }) - config.name = resolveWorkerName({ defaultName: config.name || 'subtracker', cliArgs @@ -289,13 +276,12 @@ async function buildGeneratedConfig() { } config = applyOptionalBindings(config, { - enableKv, enableR2: withR2 }) config = syncCronTriggers(config) const inventory = await discoverExistingResources({ - includeKv: enableKv + includeKv: false }) attachExistingResources(config, inventory) diff --git a/scripts/bootstrap-cloudflare.test.mjs b/scripts/bootstrap-cloudflare.test.mjs index 452c778..fc8244c 100644 --- a/scripts/bootstrap-cloudflare.test.mjs +++ b/scripts/bootstrap-cloudflare.test.mjs @@ -14,7 +14,6 @@ import { } from './bootstrap-cloudflare.mjs' test('bindingResourceName uses worker-prefixed kebab-case names', () => { - assert.equal(bindingResourceName('subtracker', 'SUBTRACKER_CACHE'), 'subtracker-cache') assert.equal(bindingResourceName('subtracker', 'DB'), 'subtracker-db') assert.equal(bindingResourceName('subtracker', 'SUBTRACKER_LOGOS'), 'subtracker-logos') }) @@ -64,22 +63,19 @@ test('resolveAppVersion prefers explicit app version and falls back to package v ) }) -test('attachExistingResources reuses existing kv, d1 and r2 resources', () => { +test('attachExistingResources reuses existing d1 and r2 resources', () => { const config = { name: 'subtracker', - kv_namespaces: [{ binding: 'SUBTRACKER_CACHE' }], d1_databases: [{ binding: 'DB' }], r2_buckets: [{ binding: 'SUBTRACKER_LOGOS', bucket_name: 'subtracker-logos' }] } const next = attachExistingResources(config, { - kv: [{ title: 'subtracker-cache', id: 'kv-123' }], + kv: [], d1: [{ name: 'subtracker-db', uuid: 'd1-123' }], r2: [] }) - assert.equal(next.kv_namespaces[0].id, 'kv-123') - assert.equal(next.kv_namespaces[0].preview_id, 'kv-123') assert.equal(next.d1_databases[0].database_id, 'd1-123') assert.equal(next.d1_databases[0].database_name, 'subtracker-db') assert.equal(next.r2_buckets[0].bucket_name, 'subtracker-logos') @@ -88,7 +84,6 @@ test('attachExistingResources reuses existing kv, d1 and r2 resources', () => { test('attachExistingResources leaves missing resources unresolved for auto provisioning', () => { const config = { name: 'subtracker', - kv_namespaces: [{ binding: 'SUBTRACKER_CACHE' }], d1_databases: [{ binding: 'DB' }] } @@ -98,13 +93,11 @@ test('attachExistingResources leaves missing resources unresolved for auto provi r2: [] }) - assert.equal(next.kv_namespaces[0].id, undefined) assert.equal(next.d1_databases[0].database_id, undefined) }) test('getInventoryCommands matches wrangler 4 command capabilities', () => { const commands = getInventoryCommands() - assert.deepEqual(commands.kv, ['npx', ['wrangler', 'kv', 'namespace', 'list']]) assert.deepEqual(commands.d1, ['npx', ['wrangler', 'd1', 'list', '--json']]) }) @@ -144,19 +137,16 @@ test('withProvisionedR2Binding sets deterministic bucket name', () => { ]) }) -test('applyOptionalBindings can disable kv while preserving d1', () => { +test('applyOptionalBindings leaves d1 untouched when r2 is disabled', () => { const config = applyOptionalBindings( { - kv_namespaces: [{ binding: 'SUBTRACKER_CACHE' }], d1_databases: [{ binding: 'DB' }] }, { - enableKv: false, enableR2: false } ) - assert.equal(config.kv_namespaces, undefined) assert.deepEqual(config.d1_databases, [{ binding: 'DB' }]) }) diff --git a/wrangler.jsonc b/wrangler.jsonc index 5c752e9..78f0d49 100644 --- a/wrangler.jsonc +++ b/wrangler.jsonc @@ -42,10 +42,5 @@ { "binding": "DB" } - ], - "kv_namespaces": [ - { - "binding": "SUBTRACKER_CACHE" - } ] }