refactor: remove worker KV state and D1 transaction-heavy paths

Move worker state from KV to D1 or memory.
Clear affected page queries after Wallos import and avoid D1 transaction-heavy Prisma paths.
This commit is contained in:
SmileQWQ
2026-04-23 21:33:34 +08:00
parent 79ff667ced
commit dba4be9986
25 changed files with 772 additions and 453 deletions

View File

@@ -84,11 +84,7 @@
- `WORKER_NAME_PREFIX`
- 默认值是:`subtracker`
- 用来决定 Worker 名称、KV / D1 / R2 资源名前缀
- `ENABLE_KV`
- 默认值是:`true`
- 不填时默认启用 KV
- 如果明确填 `false`,部署时将不绑定 KV
- 用来决定 Worker 名称、D1 / R2 资源名前缀
- `ENABLE_R2`
-`true` 时启用 R2
- 不填或填其他值时,默认不开启
@@ -141,7 +137,6 @@
就会继续复用原来的:
- D1
- KV
- R2如果启用了
不会因为同步代码就把数据重建掉。
@@ -154,25 +149,6 @@
- **D1**
### 推荐
- **KV**
用途:
- Logo 搜索缓存
- 通知去重
- 导入预览缓存
默认会启用 KV。
如果你在仓库 Variables 里把 `ENABLE_KV` 明确设为 `false`,系统仍然可以运行,但会有功能退化。
KV 关闭后的主要退化包括:
- 读取类接口不再享受 Worker Lite 的短 TTL 缓存
- Logo 搜索缓存失效,重复搜索会更慢
- 通知去重与导入预览状态会退回到更重的数据库路径
### 可选
- **R2**
@@ -214,7 +190,7 @@ https://api.resend.com/emails
这个分支面向 **Cloudflare Worker Free**,因此实现上做了明确的 Lite 化裁剪,而不是完全复刻 Docker 版的运行模型。
### 1. 30 秒短 TTL 缓存
### 1. isolate 内存短 TTL 缓存
以下读取类接口默认会缓存 **30 秒**
@@ -231,12 +207,13 @@ https://api.resend.com/emails
- Worker Free 每次 HTTP 请求只有 **10ms CPU**
- 同一页面会并发读取多份数据
- 如果每次都实时重算,很容易撞上 `Worker exceeded CPU time limit`
- 同时避免继续消耗 KV Free 的每日写入额度
这 30 秒缓存的副作用是:
- 刚修改完数据后,其他页面在极短时间内可能看到旧数据
- 统计页、日历页、标签页在 30 秒窗口内可能不是绝对实时
- 如果关闭 KV只能退回到 isolate 内存缓存,跨实例缓存一致性会更弱
- 这是 isolate 缓存,不保证跨实例强一致
当前实现已经对主要写操作做了主动失效,所以正常情况下:
@@ -249,7 +226,17 @@ https://api.resend.com/emails
> **30 秒缓存带来的不是“数据一定延迟 30 秒”,而是“最多可能有短时间旧数据”。**
### 2. Logo 搜索是 Lite 版
### 2. 不使用 KV
当前分支不绑定 KV原因是 Cloudflare Workers Free 的 KV 写入额度只有 **1000/day**,对于本项目的缓存失效、通知去重和导入预览场景来说过于紧张。
现在这些能力的处理方式是:
- 读取类接口:使用 isolate 内存短缓存
- 通知去重:回到 D1
- 导入预览 token回到 D1
- Logo 搜索缓存:仅保留进程内短缓存
### 3. Logo 搜索是 Lite 版
Worker Lite 的 Logo 搜索只保留:
@@ -269,7 +256,7 @@ Worker Lite 的 Logo 搜索只保留:
- 搜索结果质量不如 Docker / main 分支
- 偶尔会混入不够理想的候选图
### 3. Cron 已拆成 Lite 版职责
### 4. Cron 已拆成 Lite 版职责
当前默认触发器是:
@@ -286,7 +273,7 @@ Worker Lite 的 Logo 搜索只保留:
- 而是采用 **5 分钟窗口命中**
- 以换取更稳定的执行
### 4. 错误提示会明确说明 Worker 限制
### 5. 错误提示会明确说明 Worker 限制
前端遇到:
@@ -319,13 +306,6 @@ npm run deploy:worker:r2
- 优先读取环境变量里的 Cloudflare 凭据
- 如果没有配置 `CLOUDFLARE_API_TOKEN`,会尝试走 `wrangler login` 浏览器授权
如果你本地想关闭 KV也可以临时设置
```powershell
$env:ENABLE_KV='false'
npm run deploy:worker
```
---
## 十、本地开发(开发者可选)

View File

@@ -44,7 +44,8 @@
这个分支面向 **Cloudflare Worker Free**,因此会主动做一些性能取舍来换稳定性:
- 读取类接口默认使用 **30 秒短 TTL 缓存**
- 读取类接口使用 **isolate 内存短 TTL 缓存**
- **不使用 KV**
- Logo 搜索是 Lite 版,只保留网站候选 + DuckDuckGo
- Cron 已拆成更轻的 Worker 版职责
- 遇到 `503` / CPU 超限时,前端会明确提示可能受 Worker 免费版限制影响
@@ -62,7 +63,7 @@
## 技术栈
- **前端**Vue 3、Vite、TypeScript、Naive UI、Pinia、TanStack Query、ECharts
- **后端**Cloudflare Worker、Hono、Prisma D1 Adapter、D1、KV、可选 R2
- **后端**Cloudflare Worker、Hono、Prisma D1 Adapter、D1、可选 R2
## 本地开发
@@ -114,7 +115,6 @@ npm test
常用仓库 Variables
- `WORKER_NAME_PREFIX`
- `ENABLE_KV`(默认开启)
- `ENABLE_R2`(默认关闭)
## 工作流

View File

@@ -126,6 +126,27 @@ model WebhookDelivery {
@@index([status, createdAt])
}
model NotificationDelivery {
id String @id @default(cuid())
channel String
eventType String
resourceKey String
periodKey String
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
@@unique([channel, eventType, resourceKey, periodKey])
}
model ImportPreview {
token String @id
previewJson Json
expiresAt DateTime
createdAt DateTime @default(now())
@@index([expiresAt])
}
model Setting {
key String @id
valueJson Json

View File

@@ -1,9 +1,9 @@
import dayjs from 'dayjs'
import type { EmailConfigInput, PushPlusConfigInput, TelegramConfigInput, WebhookEventType } from '@subtracker/shared'
import { config } from '../config'
import { getWorkerCache } from '../runtime'
import { dispatchWebhookEvent } from './webhook.service'
import { getNotificationChannelSettings, getSetting, setSetting } from './settings.service'
import { getNotificationChannelSettings } from './settings.service'
import { claimNotificationDelivery, releaseNotificationDelivery } from './worker-lite-state.service'
type NotificationDispatchParams = {
eventType: WebhookEventType
@@ -57,42 +57,6 @@ export type NotificationChannelResult = {
message?: string
}
function buildNotificationKey(
channel: 'email' | 'pushplus' | 'telegram',
params: NotificationDispatchParams
) {
return `notification:${channel}:${params.eventType}:${params.resourceKey}:${params.periodKey}`
}
async function hasNotificationBeenSent(
channel: 'email' | 'pushplus' | 'telegram',
params: NotificationDispatchParams
) {
const key = buildNotificationKey(channel, params)
const cache = getWorkerCache()
if (cache) {
return Boolean(await cache.get(key))
}
return getSetting<boolean>(key, false)
}
async function markNotificationSent(
channel: 'email' | 'pushplus' | 'telegram',
params: NotificationDispatchParams
) {
const key = buildNotificationKey(channel, params)
const cache = getWorkerCache()
if (cache) {
await cache.put(key, '1', {
expirationTtl: 60 * 60 * 24 * 14
})
return
}
await setSetting(key, true)
}
function getMergedSubscriptions(params: NotificationDispatchParams) {
const subscriptions = params.payload.subscriptions
return Array.isArray(subscriptions) ? (subscriptions as NotificationSubscriptionItem[]) : []
@@ -284,8 +248,13 @@ async function sendEmailNotification(params: NotificationDispatchParams): Promis
}
}
const alreadySent = await hasNotificationBeenSent('email', params)
if (alreadySent) {
const claimed = await claimNotificationDelivery({
channel: 'email',
eventType: params.eventType,
resourceKey: params.resourceKey,
periodKey: params.periodKey
})
if (!claimed) {
return {
channel: 'email',
status: 'skipped',
@@ -293,8 +262,17 @@ async function sendEmailNotification(params: NotificationDispatchParams): Promis
}
}
await sendEmailWithConfig(params, settings.emailConfig)
await markNotificationSent('email', params)
try {
await sendEmailWithConfig(params, settings.emailConfig)
} catch (error) {
await releaseNotificationDelivery({
channel: 'email',
eventType: params.eventType,
resourceKey: params.resourceKey,
periodKey: params.periodKey
})
throw error
}
return {
channel: 'email',
@@ -378,8 +356,13 @@ async function sendPushplusNotification(params: NotificationDispatchParams): Pro
}
}
const alreadySent = await hasNotificationBeenSent('pushplus', params)
if (alreadySent) {
const claimed = await claimNotificationDelivery({
channel: 'pushplus',
eventType: params.eventType,
resourceKey: params.resourceKey,
periodKey: params.periodKey
})
if (!claimed) {
return {
channel: 'pushplus',
status: 'skipped',
@@ -387,8 +370,17 @@ async function sendPushplusNotification(params: NotificationDispatchParams): Pro
}
}
await sendPushplusWithConfig(params, settings.pushplusConfig)
await markNotificationSent('pushplus', params)
try {
await sendPushplusWithConfig(params, settings.pushplusConfig)
} catch (error) {
await releaseNotificationDelivery({
channel: 'pushplus',
eventType: params.eventType,
resourceKey: params.resourceKey,
periodKey: params.periodKey
})
throw error
}
return {
channel: 'pushplus',
@@ -440,8 +432,13 @@ async function sendTelegramNotification(params: NotificationDispatchParams): Pro
}
}
const alreadySent = await hasNotificationBeenSent('telegram', params)
if (alreadySent) {
const claimed = await claimNotificationDelivery({
channel: 'telegram',
eventType: params.eventType,
resourceKey: params.resourceKey,
periodKey: params.periodKey
})
if (!claimed) {
return {
channel: 'telegram',
status: 'skipped',
@@ -449,8 +446,17 @@ async function sendTelegramNotification(params: NotificationDispatchParams): Pro
}
}
await sendTelegramWithConfig(params, settings.telegramConfig)
await markNotificationSent('telegram', params)
try {
await sendTelegramWithConfig(params, settings.telegramConfig)
} catch (error) {
await releaseNotificationDelivery({
channel: 'telegram',
eventType: params.eventType,
resourceKey: params.resourceKey,
periodKey: params.periodKey
})
throw error
}
return {
channel: 'telegram',
@@ -512,16 +518,20 @@ function buildTestReminderPayload() {
}
export async function sendTestEmailNotification() {
const result = await sendEmailNotification({
eventType: 'subscription.reminder_due',
resourceKey: 'test:email',
periodKey: `${new Date().toISOString().slice(0, 10)}:upcoming`,
payload: buildTestReminderPayload()
})
if (result.status !== 'success') {
const settings = await getNotificationChannelSettings()
if (!settings.emailNotificationsEnabled) {
throw new Error('邮箱通知未启用或配置不完整')
}
await sendEmailWithConfig(
{
eventType: 'subscription.reminder_due',
resourceKey: 'test:email',
periodKey: `${new Date().toISOString().slice(0, 10)}:upcoming`,
payload: buildTestReminderPayload()
},
settings.emailConfig
)
}
export async function sendTestEmailNotificationWithConfig(config: EmailConfigInput) {
@@ -537,17 +547,21 @@ export async function sendTestEmailNotificationWithConfig(config: EmailConfigInp
}
export async function sendTestPushplusNotification() {
const result = await sendPushplusNotification({
eventType: 'subscription.reminder_due',
resourceKey: 'test:pushplus',
periodKey: `${new Date().toISOString().slice(0, 10)}:upcoming`,
payload: buildTestReminderPayload()
})
if (result.status !== 'success') {
const settings = await getNotificationChannelSettings()
if (!settings.pushplusNotificationsEnabled) {
throw new Error('PushPlus 通知未启用或配置不完整')
}
await sendPushplusWithConfig(
{
eventType: 'subscription.reminder_due',
resourceKey: 'test:pushplus',
periodKey: `${new Date().toISOString().slice(0, 10)}:upcoming`,
payload: buildTestReminderPayload()
},
settings.pushplusConfig
)
return {
accepted: true,
message: 'PushPlus 已使用保存的配置发送测试请求'
@@ -567,17 +581,21 @@ export async function sendTestPushplusNotificationWithConfig(config: PushPlusCon
}
export async function sendTestTelegramNotification() {
const result = await sendTelegramNotification({
eventType: 'subscription.reminder_due',
resourceKey: 'test:telegram',
periodKey: `${new Date().toISOString().slice(0, 10)}:upcoming`,
payload: buildTestReminderPayload()
})
if (result.status !== 'success') {
const settings = await getNotificationChannelSettings()
if (!settings.telegramNotificationsEnabled) {
throw new Error('Telegram 通知未启用或配置不完整')
}
await sendTelegramWithConfig(
{
eventType: 'subscription.reminder_due',
resourceKey: 'test:telegram',
periodKey: `${new Date().toISOString().slice(0, 10)}:upcoming`,
payload: buildTestReminderPayload()
},
settings.telegramConfig
)
return { success: true }
}

View File

@@ -52,15 +52,24 @@ export async function refreshExchangeRates(baseCurrency?: string) {
throw new Error('Rate payload is empty')
}
return await prisma.exchangeRateSnapshot.upsert({
where: { baseCurrency: base },
update: {
ratesJson: rates,
provider: config.exchangeRateProvider,
fetchedAt: new Date(),
isStale: false
},
create: {
const existing = await prisma.exchangeRateSnapshot.findUnique({
where: { baseCurrency: base }
})
if (existing) {
return await prisma.exchangeRateSnapshot.update({
where: { baseCurrency: base },
data: {
ratesJson: rates,
provider: config.exchangeRateProvider,
fetchedAt: new Date(),
isStale: false
}
})
}
return await prisma.exchangeRateSnapshot.create({
data: {
baseCurrency: base,
ratesJson: rates,
provider: config.exchangeRateProvider,

View File

@@ -1,12 +1,11 @@
import crypto from 'node:crypto'
import { prisma } from '../db'
import type { LogoSearchInput, LogoSearchResultDto, LogoUploadInput } from '@subtracker/shared'
import { getWorkerCache, getWorkerLogoBucket } from '../runtime'
import { getWorkerLogoBucket } from '../runtime'
const SEARCH_USER_AGENT =
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36'
const LOGO_REQUEST_TIMEOUT_MS = 20000
const SEARCH_CACHE_TTL_SECONDS = 5 * 60
const SEARCH_CACHE_TTL_MS = 5 * 60 * 1000
const SEARCH_CACHE_PREFIX = 'logo-search:worker-parity:'
const MAX_DUCKDUCKGO_CANDIDATES = 12
@@ -135,15 +134,6 @@ function makeCandidate(
}
async function cacheGet(key: string) {
const kv = getWorkerCache()
if (kv) {
const raw = await kv.get(key, 'json')
if (!raw) return null
const entry = raw as SearchCacheEntry
if (entry.expiresAt <= Date.now()) return null
return entry
}
const cached = searchCache.get(key)
if (!cached || cached.expiresAt <= Date.now()) {
searchCache.delete(key)
@@ -158,14 +148,6 @@ async function cacheSet(key: string, items: LogoSearchResultDto[]) {
items
}
const kv = getWorkerCache()
if (kv) {
await kv.put(key, JSON.stringify(entry), {
expirationTtl: SEARCH_CACHE_TTL_SECONDS
})
return
}
searchCache.set(key, entry)
}

View File

@@ -16,6 +16,7 @@ import {
resolveDefaultAdvanceReminderRules,
resolveDefaultOverdueReminderRules
} from './reminder-rules.service'
import { invalidateWorkerLiteCache } from './worker-lite-cache.service'
import { getSettingLite, listSettingsLite, setSettingLite } from './worker-lite-repository.service'
export async function getSetting<T>(key: string, fallback: T): Promise<T> {
@@ -24,6 +25,7 @@ export async function getSetting<T>(key: string, fallback: T): Promise<T> {
export async function setSetting<T>(key: string, value: T): Promise<void> {
await setSettingLite(key, value)
await invalidateWorkerLiteCache(['settings'])
}
function readSettingsValue<T>(settingsMap: Map<string, unknown>, key: string, fallback: T): T {

View File

@@ -24,28 +24,39 @@ export async function renewSubscription(subscriptionId: string, paidAt?: Date, p
subscription.billingIntervalUnit
)
const [payment, updated] = await prisma.$transaction([
prisma.paymentRecord.create({
data: {
subscriptionId: subscription.id,
amount,
currency,
baseCurrency,
convertedAmount,
exchangeRate,
paidAt: paidAt ?? new Date(),
periodStart,
periodEnd
}
}),
prisma.subscription.update({
const payment = await prisma.paymentRecord.create({
data: {
subscriptionId: subscription.id,
amount,
currency,
baseCurrency,
convertedAmount,
exchangeRate,
paidAt: paidAt ?? new Date(),
periodStart,
periodEnd
}
})
let updated
try {
updated = await prisma.subscription.update({
where: { id: subscription.id },
data: {
nextRenewalDate: periodEnd,
status: 'active'
}
})
])
} catch (error) {
try {
await prisma.paymentRecord.delete({
where: { id: payment.id }
})
} catch {
// ignore compensation failures and surface the original update error
}
throw error
}
return {
payment,
@@ -87,17 +98,26 @@ export async function autoRenewDueSubscriptions(today = new Date()) {
export async function reconcileExpiredSubscriptions(today = new Date()) {
const cutoff = dayjs(today).startOf('day').toDate()
const result = await prisma.subscription.updateMany({
const rows = await prisma.subscription.findMany({
where: {
status: 'active',
nextRenewalDate: {
lt: cutoff
}
},
data: {
status: 'expired'
select: {
id: true
}
})
return result.count
for (const row of rows) {
await prisma.subscription.update({
where: { id: row.id },
data: {
status: 'expired'
}
})
}
return rows.length
}

View File

@@ -1,4 +1,5 @@
import type { Prisma, PrismaClient } from '@prisma/client'
import { isWorkerRuntime } from '../runtime'
type DbClient = Prisma.TransactionClient | PrismaClient
@@ -13,6 +14,18 @@ export async function replaceSubscriptionTags(db: DbClient, subscriptionId: stri
if (tagIds.length === 0) return
if (isWorkerRuntime()) {
for (const tagId of tagIds) {
await db.subscriptionTag.create({
data: {
subscriptionId,
tagId
}
})
}
return
}
await db.subscriptionTag.createMany({
data: tagIds.map((tagId) => ({
subscriptionId,

View File

@@ -9,13 +9,13 @@ import type {
WallosImportTagDto
} from '@subtracker/shared'
import { prisma } from '../db'
import { getWorkerCache } from '../runtime'
import { getAppSettings, getSetting, setSetting } from './settings.service'
import { isWorkerRuntime } from '../runtime'
import { getAppSettings } from './settings.service'
import { appendSubscriptionOrders } from './subscription-order.service'
import { deleteImportPreview, getImportPreview, storeImportPreview } from './worker-lite-state.service'
const IMPORT_TOKEN_TTL_SECONDS = 15 * 60
const IMPORT_TOKEN_TTL_MS = IMPORT_TOKEN_TTL_SECONDS * 1000
const IMPORT_PREVIEW_PREFIX = 'wallosImportPreview:'
const IMPORT_TAG_COLORS = [
'#3b82f6',
'#8b5cf6',
@@ -30,10 +30,6 @@ const IMPORT_TAG_COLORS = [
]
type WallosJsonRow = Record<string, unknown>
type StoredPreviewEntry = {
expiresAt: number
preview: WallosImportInspectResultDto
}
function decodeBase64(value: string) {
const binary = atob(value.trim())
@@ -58,37 +54,6 @@ function createImportId() {
return `c${timestamp}${random}`.slice(0, 25)
}
async function getStoredPreview(token: string) {
const kv = getWorkerCache()
if (kv) {
return (await kv.get(`${IMPORT_PREVIEW_PREFIX}${token}`, 'json')) as StoredPreviewEntry | null
}
return getSetting<StoredPreviewEntry | null>(`${IMPORT_PREVIEW_PREFIX}${token}`, null)
}
async function setStoredPreview(token: string, entry: StoredPreviewEntry) {
const kv = getWorkerCache()
if (kv) {
await kv.put(`${IMPORT_PREVIEW_PREFIX}${token}`, JSON.stringify(entry), {
expirationTtl: IMPORT_TOKEN_TTL_SECONDS
})
return
}
await setSetting(`${IMPORT_PREVIEW_PREFIX}${token}`, entry)
}
async function deleteStoredPreview(token: string) {
const kv = getWorkerCache()
if (kv) {
await kv.delete(`${IMPORT_PREVIEW_PREFIX}${token}`)
return
}
await setSetting(`${IMPORT_PREVIEW_PREFIX}${token}`, null)
}
function normalizeWallosTagName(name: string | null | undefined) {
const value = String(name ?? '').trim()
if (!value) return null
@@ -416,23 +381,19 @@ export async function inspectWallosImportFile(input: WallosImportInspectInput):
importToken: token
}
await setStoredPreview(token, {
expiresAt: Date.now() + IMPORT_TOKEN_TTL_MS,
preview
})
await storeImportPreview(token, preview, IMPORT_TOKEN_TTL_MS)
return preview
}
export async function commitWallosImport(input: WallosImportCommitInput): Promise<WallosImportCommitResultDto> {
const entry = await getStoredPreview(input.importToken)
if (!entry || entry.expiresAt <= Date.now()) {
await deleteStoredPreview(input.importToken)
const preview = await getImportPreview(input.importToken)
if (!preview) {
await deleteImportPreview(input.importToken)
throw new Error('导入令牌不存在或已失效,请重新生成预览')
}
const preview = entry.preview
await deleteStoredPreview(input.importToken)
await deleteImportPreview(input.importToken)
const existingTags = await prisma.tag.findMany({
where: {
@@ -448,13 +409,25 @@ export async function commitWallosImport(input: WallosImportCommitInput): Promis
const missingTags = preview.usedTags.filter((tag) => !tagIdByName.has(tag.name))
if (missingTags.length > 0) {
await prisma.tag.createMany({
data: missingTags.map((tag) => ({
name: tag.name,
color: getImportedTagColor(tag.name),
sortOrder: tag.sortOrder
}))
})
if (isWorkerRuntime()) {
for (const tag of missingTags) {
await prisma.tag.create({
data: {
name: tag.name,
color: getImportedTagColor(tag.name),
sortOrder: tag.sortOrder
}
})
}
} else {
await prisma.tag.createMany({
data: missingTags.map((tag) => ({
name: tag.name,
color: getImportedTagColor(tag.name),
sortOrder: tag.sortOrder
}))
})
}
importedTags = missingTags.length
const refreshedTags = await prisma.tag.findMany({
@@ -524,15 +497,31 @@ export async function commitWallosImport(input: WallosImportCommitInput): Promis
}
if (subscriptionRows.length > 0) {
await prisma.subscription.createMany({
data: subscriptionRows
})
if (isWorkerRuntime()) {
for (const row of subscriptionRows) {
await prisma.subscription.create({
data: row
})
}
} else {
await prisma.subscription.createMany({
data: subscriptionRows
})
}
}
if (subscriptionTagRows.length > 0) {
await prisma.subscriptionTag.createMany({
data: subscriptionTagRows
})
if (isWorkerRuntime()) {
for (const row of subscriptionTagRows) {
await prisma.subscriptionTag.create({
data: row
})
}
} else {
await prisma.subscriptionTag.createMany({
data: subscriptionTagRows
})
}
}
await appendSubscriptionOrders(createdSubscriptionIds)

View File

@@ -220,7 +220,7 @@ export async function dispatchWebhookEvent(params: {
payload: params.payload
})
await prisma.webhookDelivery.upsert({
const existing = await prisma.webhookDelivery.findUnique({
where: {
eventType_resourceKey_periodKey: {
eventType: params.eventType,
@@ -228,35 +228,46 @@ export async function dispatchWebhookEvent(params: {
periodKey: params.periodKey
}
},
update: {
subscriptionId: params.subscriptionId ?? null,
targetUrl: endpoint.url,
requestMethod: endpoint.requestMethod,
payloadJson: params.payload as Prisma.InputJsonValue,
status: result.statusCode >= 400 ? 'failed' : 'success',
responseCode: result.statusCode,
responseBody: result.responseBody,
attemptCount: {
increment: 1
},
lastAttemptAt: new Date()
},
create: {
subscriptionId: params.subscriptionId ?? null,
eventType: params.eventType,
resourceKey: params.resourceKey,
periodKey: params.periodKey,
targetUrl: endpoint.url,
requestMethod: endpoint.requestMethod,
payloadJson: params.payload as Prisma.InputJsonValue,
status: result.statusCode >= 400 ? 'failed' : 'success',
responseCode: result.statusCode,
responseBody: result.responseBody,
attemptCount: 1,
lastAttemptAt: new Date()
select: {
id: true,
attemptCount: true
}
})
if (existing) {
await prisma.webhookDelivery.update({
where: { id: existing.id },
data: {
subscriptionId: params.subscriptionId ?? null,
targetUrl: endpoint.url,
requestMethod: endpoint.requestMethod,
payloadJson: params.payload as Prisma.InputJsonValue,
status: result.statusCode >= 400 ? 'failed' : 'success',
responseCode: result.statusCode,
responseBody: result.responseBody,
attemptCount: existing.attemptCount + 1,
lastAttemptAt: new Date()
}
})
} else {
await prisma.webhookDelivery.create({
data: {
subscriptionId: params.subscriptionId ?? null,
eventType: params.eventType,
resourceKey: params.resourceKey,
periodKey: params.periodKey,
targetUrl: endpoint.url,
requestMethod: endpoint.requestMethod,
payloadJson: params.payload as Prisma.InputJsonValue,
status: result.statusCode >= 400 ? 'failed' : 'success',
responseCode: result.statusCode,
responseBody: result.responseBody,
attemptCount: 1,
lastAttemptAt: new Date()
}
})
}
if (result.statusCode >= 400) {
throw new Error(`Webhook dispatch failed: HTTP ${result.statusCode}`)
}

View File

@@ -1,9 +1,5 @@
import { getWorkerCache } from '../runtime'
const DEFAULT_CACHE_TTL_SECONDS = 30
const KV_MIN_EXPIRATION_TTL_SECONDS = 60
const CACHE_VALUE_PREFIX = 'liteCache:'
const CACHE_VERSION_PREFIX = 'liteCacheVersion:'
const CACHE_KEY_PREFIX = 'liteMemoryCache:'
type CacheEntry<T> = {
expiresAt: number
@@ -11,21 +7,12 @@ type CacheEntry<T> = {
}
const memoryCache = new Map<string, CacheEntry<unknown>>()
const memoryVersions = new Map<string, number>()
const inflightLoads = new Map<string, Promise<unknown>>()
function now() {
return Date.now()
}
function getMemoryVersion(namespace: string) {
return memoryVersions.get(namespace) ?? 0
}
function setMemoryVersion(namespace: string, version: number) {
memoryVersions.set(namespace, version)
}
function getMemoryCache<T>(key: string) {
const cached = memoryCache.get(key) as CacheEntry<T> | undefined
if (!cached) return null
@@ -43,50 +30,11 @@ function setMemoryCache<T>(key: string, value: T, ttlSeconds: number) {
})
}
async function getNamespaceVersion(namespace: string) {
const cache = getWorkerCache()
if (!cache) {
return getMemoryVersion(namespace)
}
const raw = await cache.get(`${CACHE_VERSION_PREFIX}${namespace}`, 'text')
return raw ? Number(raw) || 0 : 0
}
async function bumpNamespaceVersion(namespace: string) {
const nextVersion = (await getNamespaceVersion(namespace)) + 1
const cache = getWorkerCache()
if (cache) {
await cache.put(`${CACHE_VERSION_PREFIX}${namespace}`, String(nextVersion))
}
setMemoryVersion(namespace, nextVersion)
}
async function getCacheValue<T>(key: string) {
const cache = getWorkerCache()
if (cache) {
const cached = (await cache.get(key, 'json')) as CacheEntry<T> | null
if (cached && cached.expiresAt > now()) {
return cached.value
}
}
return getMemoryCache<T>(key)
}
async function setCacheValue<T>(key: string, value: T, ttlSeconds: number) {
const entry: CacheEntry<T> = {
value,
expiresAt: now() + ttlSeconds * 1000
}
const cache = getWorkerCache()
if (cache) {
await cache.put(key, JSON.stringify(entry), {
expirationTtl: Math.max(ttlSeconds, KV_MIN_EXPIRATION_TTL_SECONDS)
})
}
setMemoryCache(key, value, ttlSeconds)
}
@@ -96,8 +44,7 @@ export async function withWorkerLiteCache<T>(
loader: () => Promise<T>,
ttlSeconds = DEFAULT_CACHE_TTL_SECONDS
) {
const version = await getNamespaceVersion(namespace)
const resolvedKey = `${CACHE_VALUE_PREFIX}${namespace}:v${version}:${cacheKey}`
const resolvedKey = `${CACHE_KEY_PREFIX}${namespace}:${cacheKey}`
const cached = await getCacheValue<T>(resolvedKey)
if (cached !== null) {
return cached
@@ -121,5 +68,12 @@ export async function withWorkerLiteCache<T>(
}
export async function invalidateWorkerLiteCache(namespaces: string[]) {
await Promise.all(Array.from(new Set(namespaces)).map((namespace) => bumpNamespaceVersion(namespace)))
for (const namespace of new Set(namespaces)) {
const prefix = `${CACHE_KEY_PREFIX}${namespace}:`
for (const key of Array.from(memoryCache.keys())) {
if (key.startsWith(prefix)) {
memoryCache.delete(key)
}
}
}
}

View File

@@ -303,10 +303,8 @@ export async function updateTagLite(
export async function deleteTagLite(id: string) {
if (!getD1()) {
await prisma.$transaction([
prisma.subscriptionTag.deleteMany({ where: { tagId: id } }),
prisma.tag.delete({ where: { id } })
])
await prisma.subscriptionTag.deleteMany({ where: { tagId: id } })
await prisma.tag.delete({ where: { id } })
return
}

View File

@@ -0,0 +1,202 @@
import { Prisma } from '@prisma/client'
import type { WallosImportInspectResultDto, WebhookEventType } from '@subtracker/shared'
import { prisma } from '../db'
import { getRuntimeD1Database, isWorkerRuntime } from '../runtime'
type NotificationChannel = 'email' | 'pushplus' | 'telegram'
type ImportPreviewRow = {
token: string
previewJson: unknown
expiresAt: string
}
function getD1() {
if (!isWorkerRuntime()) return null
return getRuntimeD1Database()
}
async function d1First<T>(sql: string, params: unknown[] = []) {
const db = getD1()
if (!db) {
throw new Error('D1 unavailable')
}
const statement = db.prepare(sql)
const executed = params.length ? statement.bind(...params) : statement
return (await executed.first<T>()) as T | null
}
async function d1Run(sql: string, params: unknown[] = []) {
const db = getD1()
if (!db) {
throw new Error('D1 unavailable')
}
const statement = db.prepare(sql)
const executed = params.length ? statement.bind(...params) : statement
return executed.run()
}
function createCuidLike() {
const random = crypto.randomUUID().replaceAll('-', '').slice(0, 16)
const timestamp = Date.now().toString(36).slice(-8)
return `c${timestamp}${random}`.slice(0, 25)
}
function parseJsonValue<T>(value: unknown) {
if (value === null || value === undefined) {
throw new Error('State JSON value is empty')
}
if (typeof value === 'object') {
if (value instanceof Uint8Array) {
return JSON.parse(new TextDecoder().decode(value)) as T
}
if (value instanceof ArrayBuffer) {
return JSON.parse(new TextDecoder().decode(new Uint8Array(value))) as T
}
if (typeof Buffer !== 'undefined' && Buffer.isBuffer(value)) {
return JSON.parse(value.toString('utf8')) as T
}
return value as T
}
return JSON.parse(String(value)) as T
}
function isUniqueConstraintError(error: unknown) {
if (error instanceof Prisma.PrismaClientKnownRequestError) {
return error.code === 'P2002'
}
const message = error instanceof Error ? error.message : String(error)
return /unique/i.test(message)
}
export async function claimNotificationDelivery(params: {
channel: NotificationChannel
eventType: WebhookEventType
resourceKey: string
periodKey: string
}) {
if (!getD1()) {
try {
await prisma.notificationDelivery.create({
data: {
channel: params.channel,
eventType: params.eventType,
resourceKey: params.resourceKey,
periodKey: params.periodKey
}
})
return true
} catch (error) {
if (isUniqueConstraintError(error)) {
return false
}
throw error
}
}
try {
await d1Run(
`INSERT INTO NotificationDelivery (id, channel, eventType, resourceKey, periodKey, createdAt, updatedAt)
VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)`,
[createCuidLike(), params.channel, params.eventType, params.resourceKey, params.periodKey]
)
return true
} catch (error) {
if (isUniqueConstraintError(error)) {
return false
}
throw error
}
}
export async function releaseNotificationDelivery(params: {
channel: NotificationChannel
eventType: WebhookEventType
resourceKey: string
periodKey: string
}) {
if (!getD1()) {
await prisma.notificationDelivery.deleteMany({
where: {
channel: params.channel,
eventType: params.eventType,
resourceKey: params.resourceKey,
periodKey: params.periodKey
}
})
return
}
await d1Run(
`DELETE FROM NotificationDelivery
WHERE channel = ? AND eventType = ? AND resourceKey = ? AND periodKey = ?`,
[params.channel, params.eventType, params.resourceKey, params.periodKey]
)
}
export async function storeImportPreview(token: string, preview: WallosImportInspectResultDto, ttlMs: number) {
const expiresAt = new Date(Date.now() + ttlMs)
if (!getD1()) {
await prisma.importPreview.upsert({
where: { token },
update: {
previewJson: preview as unknown as Prisma.InputJsonValue,
expiresAt
},
create: {
token,
previewJson: preview as unknown as Prisma.InputJsonValue,
expiresAt
}
})
return
}
await d1Run(
`INSERT INTO ImportPreview (token, previewJson, expiresAt, createdAt)
VALUES (?, ?, ?, CURRENT_TIMESTAMP)
ON CONFLICT(token) DO UPDATE SET previewJson = excluded.previewJson, expiresAt = excluded.expiresAt`,
[token, JSON.stringify(preview), expiresAt.toISOString()]
)
}
export async function getImportPreview(token: string) {
if (!getD1()) {
const row = await prisma.importPreview.findUnique({ where: { token } })
if (!row) return null
if (row.expiresAt.getTime() <= Date.now()) {
await prisma.importPreview.deleteMany({ where: { token } })
return null
}
return row.previewJson as unknown as WallosImportInspectResultDto
}
const row = await d1First<ImportPreviewRow>(
`SELECT token, previewJson, expiresAt
FROM ImportPreview
WHERE token = ?
LIMIT 1`,
[token]
)
if (!row) return null
if (new Date(row.expiresAt).getTime() <= Date.now()) {
await deleteImportPreview(token)
return null
}
return parseJsonValue<WallosImportInspectResultDto>(row.previewJson)
}
export async function deleteImportPreview(token: string) {
if (!getD1()) {
await prisma.importPreview.deleteMany({ where: { token } })
return
}
await d1Run('DELETE FROM ImportPreview WHERE token = ?', [token])
}

View File

@@ -122,6 +122,33 @@ const schemaStatements = [
ON "WebhookDelivery"("status", "createdAt")
`,
`
CREATE TABLE IF NOT EXISTS "NotificationDelivery" (
"id" TEXT NOT NULL PRIMARY KEY,
"channel" TEXT NOT NULL,
"eventType" TEXT NOT NULL,
"resourceKey" TEXT NOT NULL,
"periodKey" TEXT NOT NULL,
"createdAt" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
)
`,
`
CREATE UNIQUE INDEX IF NOT EXISTS "NotificationDelivery_channel_eventType_resourceKey_periodKey_key"
ON "NotificationDelivery"("channel", "eventType", "resourceKey", "periodKey")
`,
`
CREATE TABLE IF NOT EXISTS "ImportPreview" (
"token" TEXT NOT NULL PRIMARY KEY,
"previewJson" TEXT NOT NULL,
"expiresAt" DATETIME NOT NULL,
"createdAt" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
)
`,
`
CREATE INDEX IF NOT EXISTS "ImportPreview_expiresAt_idx"
ON "ImportPreview"("expiresAt")
`,
`
CREATE TABLE IF NOT EXISTS "Setting" (
"key" TEXT NOT NULL PRIMARY KEY,
"valueJson" TEXT NOT NULL,

View File

@@ -3,12 +3,6 @@ import { beforeEach, describe, expect, it, vi } from 'vitest'
const tagRouteMocks = vi.hoisted(() => ({
prisma: {
$transaction: vi.fn(async (operations: unknown) => {
if (typeof operations === 'function') {
throw new Error('interactive transaction should not be used for tag deletion')
}
return operations
}),
subscriptionTag: {
deleteMany: vi.fn(async () => ({ count: 1 }))
},
@@ -42,8 +36,12 @@ describe('tag routes D1 compatibility', () => {
})
expect(response.statusCode).toBe(200)
expect(tagRouteMocks.prisma.$transaction).toHaveBeenCalledTimes(1)
expect(typeof tagRouteMocks.prisma.$transaction.mock.calls[0]?.[0]).not.toBe('function')
expect(tagRouteMocks.prisma.subscriptionTag.deleteMany).toHaveBeenCalledWith({
where: { tagId: 'cksubtracker0000000000000tag1' }
})
expect(tagRouteMocks.prisma.tag.delete).toHaveBeenCalledWith({
where: { id: 'cksubtracker0000000000000tag1' }
})
await app.close()
})

View File

@@ -1,9 +1,5 @@
import { beforeEach, describe, expect, it, vi } from 'vitest'
vi.mock('../../src/runtime', () => ({
getWorkerCache: vi.fn(() => undefined)
}))
vi.mock('../../src/config', () => ({
config: {
resendApiUrl: 'https://api.resend.com/emails'
@@ -11,7 +7,7 @@ vi.mock('../../src/config', () => ({
}))
vi.mock('../../src/services/settings.service', () => ({
getAppSettings: vi.fn(async () => ({
getNotificationChannelSettings: vi.fn(async () => ({
emailNotificationsEnabled: false,
pushplusNotificationsEnabled: false,
telegramNotificationsEnabled: false,
@@ -30,9 +26,7 @@ vi.mock('../../src/services/settings.service', () => ({
botToken: '',
chatId: ''
}
})),
getSetting: vi.fn(async () => false),
setSetting: vi.fn(async () => undefined)
}))
}))
vi.mock('../../src/services/webhook.service', () => ({
@@ -43,6 +37,14 @@ vi.mock('../../src/services/webhook.service', () => ({
}))
}))
vi.mock('../../src/services/worker-lite-state.service', () => ({
claimNotificationDelivery: vi.fn(async () => true),
releaseNotificationDelivery: vi.fn(async () => undefined),
storeImportPreview: vi.fn(),
getImportPreview: vi.fn(),
deleteImportPreview: vi.fn()
}))
import { sendTestEmailNotificationWithConfig } from '../../src/services/channel-notification.service'
function jsonResponse(payload: unknown, status = 200) {

View File

@@ -2,18 +2,13 @@ import { beforeEach, describe, expect, it, vi } from 'vitest'
const serviceMocks = vi.hoisted(() => ({
prisma: {
$transaction: vi.fn(async (operations: unknown) => {
if (typeof operations === 'function') {
throw new Error('interactive transaction should not be used in renewSubscription')
}
return Promise.all(operations as Promise<unknown>[])
}),
subscription: {
findUnique: vi.fn(),
update: vi.fn()
},
paymentRecord: {
create: vi.fn()
create: vi.fn(),
delete: vi.fn()
}
},
getBaseCurrency: vi.fn(async () => 'CNY'),
@@ -40,7 +35,7 @@ describe('subscription service D1 compatibility', () => {
vi.clearAllMocks()
})
it('renews subscriptions with batch-style transaction operations instead of interactive transactions', async () => {
it('renews subscriptions without relying on Prisma transactions', async () => {
const nextRenewalDate = new Date('2026-04-22T00:00:00.000Z')
const periodEnd = new Date('2026-05-22T00:00:00.000Z')
@@ -67,7 +62,30 @@ describe('subscription service D1 compatibility', () => {
id: 'sub_1',
status: 'active'
})
expect(serviceMocks.prisma.$transaction).toHaveBeenCalledTimes(1)
expect(typeof serviceMocks.prisma.$transaction.mock.calls[0]?.[0]).not.toBe('function')
expect(serviceMocks.prisma.paymentRecord.create).toHaveBeenCalledTimes(1)
expect(serviceMocks.prisma.subscription.update).toHaveBeenCalledTimes(1)
})
it('rolls back the payment record when subscription update fails', async () => {
const nextRenewalDate = new Date('2026-04-22T00:00:00.000Z')
serviceMocks.prisma.subscription.findUnique.mockResolvedValue({
id: 'sub_1',
amount: 25,
currency: 'CNY',
billingIntervalCount: 1,
billingIntervalUnit: 'month',
nextRenewalDate
})
serviceMocks.prisma.paymentRecord.create.mockResolvedValue({ id: 'payment_1' })
serviceMocks.prisma.subscription.update.mockRejectedValue(new Error('update failed'))
serviceMocks.prisma.paymentRecord.delete.mockResolvedValue({ id: 'payment_1' })
const { renewSubscription } = await import('../../src/services/subscription.service')
await expect(renewSubscription('sub_1')).rejects.toThrow('update failed')
expect(serviceMocks.prisma.paymentRecord.delete).toHaveBeenCalledWith({
where: { id: 'payment_1' }
})
})
})

View File

@@ -1,7 +1,5 @@
import { beforeEach, describe, expect, it, vi } from 'vitest'
const settingsStore = new Map<string, unknown>()
const { prismaMock, appendSubscriptionOrders } = vi.hoisted(() => ({
prismaMock: {
tag: {
@@ -22,18 +20,8 @@ vi.mock('../../src/db', () => ({
prisma: prismaMock
}))
vi.mock('../../src/runtime', () => ({
getWorkerCache: vi.fn(() => undefined)
}))
vi.mock('../../src/services/settings.service', () => ({
getAppSettings: vi.fn(),
getSetting: vi.fn(async <T>(key: string, fallbackValue: T) =>
(settingsStore.has(key) ? (settingsStore.get(key) as T) : fallbackValue)
),
setSetting: vi.fn(async (key: string, value: unknown) => {
settingsStore.set(key, value)
})
getAppSettings: vi.fn()
}))
vi.mock('../../src/services/subscription-order.service', () => ({
@@ -41,88 +29,93 @@ vi.mock('../../src/services/subscription-order.service', () => ({
appendSubscriptionOrders
}))
const previewState = vi.hoisted(() => ({
getImportPreview: vi.fn(),
storeImportPreview: vi.fn(),
deleteImportPreview: vi.fn()
}))
vi.mock('../../src/services/worker-lite-state.service', () => previewState)
import { commitWallosImport } from '../../src/services/wallos-import.service'
describe('commitWallosImport', () => {
beforeEach(() => {
settingsStore.clear()
prismaMock.tag.findMany.mockReset()
prismaMock.tag.createMany.mockReset()
prismaMock.subscription.createMany.mockReset()
prismaMock.subscriptionTag.createMany.mockReset()
appendSubscriptionOrders.mockClear()
previewState.getImportPreview.mockReset()
previewState.deleteImportPreview.mockReset()
previewState.getImportPreview.mockResolvedValue({
importToken: 'token-1',
isWallos: true,
summary: {
fileType: 'json',
subscriptionsTotal: 2,
tagsTotal: 2,
usedTagsTotal: 2,
supportedSubscriptions: 2,
skippedSubscriptions: 0,
globalNotifyDays: 3,
zipLogoMatched: 0,
zipLogoMissing: 0
},
usedTags: [
{ sourceId: 1, name: 'Video', sortOrder: 1 },
{ sourceId: 2, name: 'Music', sortOrder: 2 }
],
tags: [],
subscriptionsPreview: [
{
sourceId: 1,
name: 'Netflix',
amount: 10,
currency: 'USD',
status: 'active',
autoRenew: true,
billingIntervalCount: 1,
billingIntervalUnit: 'month',
startDate: '2026-04-01',
nextRenewalDate: '2026-05-01',
notifyDaysBefore: 3,
webhookEnabled: true,
notes: '',
description: '',
websiteUrl: 'https://netflix.com',
tagNames: ['Video'],
logoRef: null,
logoImportStatus: 'none',
warnings: []
},
{
sourceId: 2,
name: 'Spotify',
amount: 15,
currency: 'USD',
status: 'active',
autoRenew: true,
billingIntervalCount: 1,
billingIntervalUnit: 'month',
startDate: '2026-04-02',
nextRenewalDate: '2026-05-02',
notifyDaysBefore: 3,
webhookEnabled: true,
notes: '',
description: '',
websiteUrl: 'https://spotify.com',
tagNames: ['Music'],
logoRef: null,
logoImportStatus: 'none',
warnings: []
}
],
warnings: []
})
})
it('batches imported tags, subscription tags and subscription order writes', async () => {
settingsStore.set('wallosImportPreview:token-1', {
expiresAt: Date.now() + 60_000,
preview: {
importToken: 'token-1',
isWallos: true,
summary: {
fileType: 'json',
subscriptionsTotal: 2,
tagsTotal: 2,
usedTagsTotal: 2,
supportedSubscriptions: 2,
skippedSubscriptions: 0,
globalNotifyDays: 3,
zipLogoMatched: 0,
zipLogoMissing: 0
},
usedTags: [
{ sourceId: 1, name: 'Video', sortOrder: 1 },
{ sourceId: 2, name: 'Music', sortOrder: 2 }
],
tags: [],
subscriptionsPreview: [
{
sourceId: 1,
name: 'Netflix',
amount: 10,
currency: 'USD',
status: 'active',
autoRenew: true,
billingIntervalCount: 1,
billingIntervalUnit: 'month',
startDate: '2026-04-01',
nextRenewalDate: '2026-05-01',
notifyDaysBefore: 3,
webhookEnabled: true,
notes: '',
description: '',
websiteUrl: 'https://netflix.com',
tagNames: ['Video'],
logoRef: null,
logoImportStatus: 'none',
warnings: []
},
{
sourceId: 2,
name: 'Spotify',
amount: 15,
currency: 'USD',
status: 'active',
autoRenew: true,
billingIntervalCount: 1,
billingIntervalUnit: 'month',
startDate: '2026-04-02',
nextRenewalDate: '2026-05-02',
notifyDaysBefore: 3,
webhookEnabled: true,
notes: '',
description: '',
websiteUrl: 'https://spotify.com',
tagNames: ['Music'],
logoRef: null,
logoImportStatus: 'none',
warnings: []
}
],
warnings: []
}
})
prismaMock.tag.findMany
.mockResolvedValueOnce([])
.mockResolvedValueOnce([
@@ -158,6 +151,6 @@ describe('commitWallosImport', () => {
importedSubscriptions: 2,
skippedSubscriptions: 0
})
expect(settingsStore.get('wallosImportPreview:token-1')).toBeNull()
expect(previewState.deleteImportPreview).toHaveBeenCalledWith('token-1')
})
})

View File

@@ -0,0 +1,40 @@
import { beforeEach, describe, expect, it, vi } from 'vitest'
describe('worker-lite memory cache', () => {
beforeEach(() => {
vi.resetModules()
})
it('reuses in-memory values within ttl and inflight window', async () => {
const { withWorkerLiteCache } = await import('../../src/services/worker-lite-cache.service')
const loader = vi.fn(async () => 'cached-value')
const [first, second] = await Promise.all([
withWorkerLiteCache('settings', 'app-settings', loader, 30),
withWorkerLiteCache('settings', 'app-settings', loader, 30)
])
expect(first).toBe('cached-value')
expect(second).toBe('cached-value')
expect(loader).toHaveBeenCalledTimes(1)
const third = await withWorkerLiteCache('settings', 'app-settings', loader, 30)
expect(third).toBe('cached-value')
expect(loader).toHaveBeenCalledTimes(1)
})
it('clears current-isolate entries on namespace invalidation', async () => {
const { invalidateWorkerLiteCache, withWorkerLiteCache } = await import('../../src/services/worker-lite-cache.service')
let version = 0
const loader = vi.fn(async () => ({ version: ++version }))
const first = await withWorkerLiteCache('subscriptions', 'all', loader, 30)
expect(first).toEqual({ version: 1 })
await invalidateWorkerLiteCache(['subscriptions'])
const second = await withWorkerLiteCache('subscriptions', 'all', loader, 30)
expect(second).toEqual({ version: 2 })
expect(loader).toHaveBeenCalledTimes(2)
})
})

View File

@@ -0,0 +1,66 @@
import { beforeEach, describe, expect, it, vi } from 'vitest'
const notificationDelivery = {
create: vi.fn(),
deleteMany: vi.fn()
}
const importPreview = {
upsert: vi.fn(),
findUnique: vi.fn(),
deleteMany: vi.fn()
}
vi.mock('../../src/db', () => ({
prisma: {
notificationDelivery,
importPreview
}
}))
vi.mock('../../src/runtime', () => ({
getRuntimeD1Database: vi.fn(() => undefined),
isWorkerRuntime: vi.fn(() => false)
}))
describe('worker-lite state service', () => {
beforeEach(() => {
vi.clearAllMocks()
})
it('treats unique notification-delivery conflicts as already claimed', async () => {
const { Prisma } = await import('@prisma/client')
const { claimNotificationDelivery } = await import('../../src/services/worker-lite-state.service')
notificationDelivery.create.mockRejectedValueOnce(
new Prisma.PrismaClientKnownRequestError('duplicate', {
code: 'P2002',
clientVersion: 'test'
})
)
await expect(
claimNotificationDelivery({
channel: 'email',
eventType: 'subscription.reminder_due',
resourceKey: 'subscription:1',
periodKey: '2026-04-23:upcoming'
})
).resolves.toBe(false)
})
it('expires stored import previews on read', async () => {
const { getImportPreview } = await import('../../src/services/worker-lite-state.service')
importPreview.findUnique.mockResolvedValueOnce({
token: 'preview-token',
previewJson: { importToken: 'preview-token' },
expiresAt: new Date(Date.now() - 1000)
})
await expect(getImportPreview('preview-token')).resolves.toBeNull()
expect(importPreview.deleteMany).toHaveBeenCalledWith({
where: { token: 'preview-token' }
})
})
})

View File

@@ -968,6 +968,11 @@ async function exportSubscriptions(format: 'csv' | 'json') {
}
function handleWallosImported() {
queryClient.removeQueries({ queryKey: ['subscriptions'] })
queryClient.removeQueries({ queryKey: ['tags'] })
queryClient.removeQueries({ queryKey: ['statistics-overview'] })
queryClient.removeQueries({ queryKey: ['statistics-budgets'] })
queryClient.removeQueries({ queryKey: ['calendar-events'] })
showWallosImportModal.value = false
message.success('Wallos 数据已导入')
}

View File

@@ -7,7 +7,6 @@ const cwd = process.cwd()
const cliArgs = process.argv.slice(2)
const args = new Set(cliArgs)
const withR2 = args.has('--with-r2')
const withoutKv = args.has('--without-kv')
const skipBuild = args.has('--skip-build')
const configPath = path.resolve(cwd, 'wrangler.jsonc')
const generatedConfigPath = path.resolve(cwd, '.wrangler.generated.jsonc')
@@ -15,7 +14,6 @@ const generatedConfigPath = path.resolve(cwd, '.wrangler.generated.jsonc')
function bindingResourceName(workerName, binding) {
const suffixMap = {
DB: 'db',
SUBTRACKER_CACHE: 'cache',
SUBTRACKER_LOGOS: 'logos'
}
@@ -51,13 +49,9 @@ function resolveBooleanFlag({
return defaultValue
}
function applyOptionalBindings(config, { enableKv, enableR2 }) {
function applyOptionalBindings(config, { enableR2 }) {
const next = { ...config }
if (!enableKv) {
delete next.kv_namespaces
}
if (enableR2) {
return withProvisionedR2Binding(next)
}
@@ -268,13 +262,6 @@ async function buildGeneratedConfig() {
let config = JSON.parse(raw)
const packageJson = JSON.parse(await readFile(path.resolve(cwd, 'package.json'), 'utf8'))
const gitSha = (await run('git', ['rev-parse', '--short', 'HEAD'], { captureOutput: true })).stdout.trim()
const enableKv = withoutKv
? false
: resolveBooleanFlag({
envName: 'ENABLE_KV',
defaultValue: true
})
config.name = resolveWorkerName({
defaultName: config.name || 'subtracker',
cliArgs
@@ -289,13 +276,12 @@ async function buildGeneratedConfig() {
}
config = applyOptionalBindings(config, {
enableKv,
enableR2: withR2
})
config = syncCronTriggers(config)
const inventory = await discoverExistingResources({
includeKv: enableKv
includeKv: false
})
attachExistingResources(config, inventory)

View File

@@ -14,7 +14,6 @@ import {
} from './bootstrap-cloudflare.mjs'
test('bindingResourceName uses worker-prefixed kebab-case names', () => {
assert.equal(bindingResourceName('subtracker', 'SUBTRACKER_CACHE'), 'subtracker-cache')
assert.equal(bindingResourceName('subtracker', 'DB'), 'subtracker-db')
assert.equal(bindingResourceName('subtracker', 'SUBTRACKER_LOGOS'), 'subtracker-logos')
})
@@ -64,22 +63,19 @@ test('resolveAppVersion prefers explicit app version and falls back to package v
)
})
test('attachExistingResources reuses existing kv, d1 and r2 resources', () => {
test('attachExistingResources reuses existing d1 and r2 resources', () => {
const config = {
name: 'subtracker',
kv_namespaces: [{ binding: 'SUBTRACKER_CACHE' }],
d1_databases: [{ binding: 'DB' }],
r2_buckets: [{ binding: 'SUBTRACKER_LOGOS', bucket_name: 'subtracker-logos' }]
}
const next = attachExistingResources(config, {
kv: [{ title: 'subtracker-cache', id: 'kv-123' }],
kv: [],
d1: [{ name: 'subtracker-db', uuid: 'd1-123' }],
r2: []
})
assert.equal(next.kv_namespaces[0].id, 'kv-123')
assert.equal(next.kv_namespaces[0].preview_id, 'kv-123')
assert.equal(next.d1_databases[0].database_id, 'd1-123')
assert.equal(next.d1_databases[0].database_name, 'subtracker-db')
assert.equal(next.r2_buckets[0].bucket_name, 'subtracker-logos')
@@ -88,7 +84,6 @@ test('attachExistingResources reuses existing kv, d1 and r2 resources', () => {
test('attachExistingResources leaves missing resources unresolved for auto provisioning', () => {
const config = {
name: 'subtracker',
kv_namespaces: [{ binding: 'SUBTRACKER_CACHE' }],
d1_databases: [{ binding: 'DB' }]
}
@@ -98,13 +93,11 @@ test('attachExistingResources leaves missing resources unresolved for auto provi
r2: []
})
assert.equal(next.kv_namespaces[0].id, undefined)
assert.equal(next.d1_databases[0].database_id, undefined)
})
test('getInventoryCommands matches wrangler 4 command capabilities', () => {
const commands = getInventoryCommands()
assert.deepEqual(commands.kv, ['npx', ['wrangler', 'kv', 'namespace', 'list']])
assert.deepEqual(commands.d1, ['npx', ['wrangler', 'd1', 'list', '--json']])
})
@@ -144,19 +137,16 @@ test('withProvisionedR2Binding sets deterministic bucket name', () => {
])
})
test('applyOptionalBindings can disable kv while preserving d1', () => {
test('applyOptionalBindings leaves d1 untouched when r2 is disabled', () => {
const config = applyOptionalBindings(
{
kv_namespaces: [{ binding: 'SUBTRACKER_CACHE' }],
d1_databases: [{ binding: 'DB' }]
},
{
enableKv: false,
enableR2: false
}
)
assert.equal(config.kv_namespaces, undefined)
assert.deepEqual(config.d1_databases, [{ binding: 'DB' }])
})

View File

@@ -42,10 +42,5 @@
{
"binding": "DB"
}
],
"kv_namespaces": [
{
"binding": "SUBTRACKER_CACHE"
}
]
}