From 2ee7cb41013d717dc5e35a452394425ba1c5c9dd Mon Sep 17 00:00:00 2001 From: Jason Date: Wed, 29 Apr 2026 09:35:42 +0800 Subject: [PATCH] fix(usage): prevent double-counting between proxy and session-log sources MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Proxy writes and session-log sync wrote to proxy_request_logs with mismatched request_ids: only Claude on a native Anthropic backend used the shared `session:{message_id}` key. Codex/Gemini and Claude-through-OpenAI providers always produced distinct ids, so primary-key dedup never fired and every real request was recorded twice. Adds a 7-dim fingerprint dedup (app_type, 4 token counts, 2xx status, model with case-insensitive match, ±10min window) wired into three layers: - Write path: should_skip_session_insert() blocks duplicate session rows before INSERT, unifying the previously-divergent Claude/Codex/Gemini paths through a single DedupKey-based helper. - Read path: effective_usage_log_filter() excludes already-covered session rows from every aggregation query. - Rollup path: same filter applied so usage_daily_rollups never absorbs duplicates. Also adds a lookup index (idx_request_logs_dedup_lookup) so the EXISTS subquery can seek on its app_type, data_source, token-count and created_at predicates (status_code and model are not part of the index, so matching candidates are still checked against the table row), and a transform.rs regression test that pins openai_to_anthropic id preservation - the missing piece that lets Claude+OpenAI-compatible providers reuse the session: id scheme. 
--- src-tauri/src/database/dao/usage_rollup.rs | 100 ++- src-tauri/src/database/schema.rs | 32 + src-tauri/src/proxy/providers/transform.rs | 28 + src-tauri/src/services/session_usage.rs | 101 ++- src-tauri/src/services/session_usage_codex.rs | 82 ++- .../src/services/session_usage_gemini.rs | 70 +- src-tauri/src/services/usage_stats.rs | 596 ++++++++++++++++-- 7 files changed, 917 insertions(+), 92 deletions(-) diff --git a/src-tauri/src/database/dao/usage_rollup.rs b/src-tauri/src/database/dao/usage_rollup.rs index fffb8a3bf..fc5fdb371 100644 --- a/src-tauri/src/database/dao/usage_rollup.rs +++ b/src-tauri/src/database/dao/usage_rollup.rs @@ -4,6 +4,7 @@ use crate::database::{lock_conn, Database}; use crate::error::AppError; +use crate::services::usage_stats::effective_usage_log_filter; use chrono::{Duration, Local, TimeZone}; /// Compute the rollup/prune cutoff aligned to a local-day boundary. @@ -101,7 +102,8 @@ impl Database { fn do_rollup_and_prune(conn: &rusqlite::Connection, cutoff: i64) -> Result { // Aggregate old logs, merging with any pre-existing rollup rows via LEFT JOIN. 
- conn.execute( + let effective_filter = effective_usage_log_filter("l"); + let aggregation_sql = format!( "INSERT OR REPLACE INTO usage_daily_rollups (date, app_type, provider_id, model, request_count, success_count, @@ -124,27 +126,30 @@ impl Database { ELSE 0 END FROM ( SELECT - date(created_at, 'unixepoch', 'localtime') as d, - app_type as a, provider_id as p, model as m, + date(l.created_at, 'unixepoch', 'localtime') as d, + l.app_type as a, l.provider_id as p, l.model as m, COUNT(*) as new_req, - SUM(CASE WHEN status_code >= 200 AND status_code < 300 THEN 1 ELSE 0 END) as new_succ, - COALESCE(SUM(input_tokens), 0) as new_in, - COALESCE(SUM(output_tokens), 0) as new_out, - COALESCE(SUM(cache_read_tokens), 0) as new_cr, - COALESCE(SUM(cache_creation_tokens), 0) as new_cc, - COALESCE(SUM(CAST(total_cost_usd AS REAL)), 0) as new_cost, - COALESCE(AVG(latency_ms), 0) as new_lat - FROM proxy_request_logs WHERE created_at < ?1 + SUM(CASE WHEN l.status_code >= 200 AND l.status_code < 300 THEN 1 ELSE 0 END) as new_succ, + COALESCE(SUM(l.input_tokens), 0) as new_in, + COALESCE(SUM(l.output_tokens), 0) as new_out, + COALESCE(SUM(l.cache_read_tokens), 0) as new_cr, + COALESCE(SUM(l.cache_creation_tokens), 0) as new_cc, + COALESCE(SUM(CAST(l.total_cost_usd AS REAL)), 0) as new_cost, + COALESCE(AVG(l.latency_ms), 0) as new_lat + FROM proxy_request_logs l + WHERE l.created_at < ?1 AND {effective_filter} GROUP BY d, a, p, m ) agg LEFT JOIN usage_daily_rollups old ON old.date = agg.d AND old.app_type = agg.a - AND old.provider_id = agg.p AND old.model = agg.m", - [cutoff], - ) - .map_err(|e| AppError::Database(format!("Rollup aggregation failed: {e}")))?; + AND old.provider_id = agg.p AND old.model = agg.m" + ); - // Delete the aggregated detail rows + conn.execute(&aggregation_sql, [cutoff]) + .map_err(|e| AppError::Database(format!("Rollup aggregation failed: {e}")))?; + + // INSERT uses the effective-log filter to exclude duplicate session rows. 
+ // DELETE intentionally prunes all old details so those duplicates are discarded. let deleted = conn .execute( "DELETE FROM proxy_request_logs WHERE created_at < ?1", @@ -254,6 +259,69 @@ mod tests { Ok(()) } + #[test] + fn test_rollup_uses_effective_usage_logs() -> Result<(), AppError> { + let db = Database::memory()?; + let now = chrono::Utc::now().timestamp(); + let old_ts = now - 40 * 86400; + + { + let conn = crate::database::lock_conn!(db.conn); + conn.execute( + "INSERT INTO proxy_request_logs ( + request_id, provider_id, app_type, model, request_model, + input_tokens, output_tokens, cache_read_tokens, cache_creation_tokens, + total_cost_usd, latency_ms, status_code, created_at, data_source + ) VALUES (?1, 'openai', 'codex', 'gpt-5.4', 'gpt-5.4', 100, 20, 10, 0, '0.10', 100, 200, ?2, 'proxy')", + rusqlite::params!["codex-proxy-old", old_ts], + )?; + conn.execute( + "INSERT INTO proxy_request_logs ( + request_id, provider_id, app_type, model, request_model, + input_tokens, output_tokens, cache_read_tokens, cache_creation_tokens, + total_cost_usd, latency_ms, status_code, created_at, data_source + ) VALUES (?1, '_codex_session', 'codex', 'gpt-5.4', 'gpt-5.4', 100, 20, 10, 0, '0.10', 0, 200, ?2, 'codex_session')", + rusqlite::params!["codex-session-old-dup", old_ts + 60], + )?; + } + + let deleted = db.rollup_and_prune(30)?; + assert_eq!(deleted, 2); + + let conn = crate::database::lock_conn!(db.conn); + let mut stmt = conn.prepare( + "SELECT provider_id, request_count, input_tokens, output_tokens, cache_read_tokens + FROM usage_daily_rollups WHERE app_type = 'codex'", + )?; + let rows = stmt + .query_map([], |row| { + Ok(( + row.get::<_, String>(0)?, + row.get::<_, i64>(1)?, + row.get::<_, i64>(2)?, + row.get::<_, i64>(3)?, + row.get::<_, i64>(4)?, + )) + })? 
+ .collect::, _>>()?; + + assert_eq!(rows.len(), 1); + let (provider_id, request_count, input_tokens, output_tokens, cache_read_tokens) = &rows[0]; + assert_eq!(provider_id, "openai"); + assert_eq!(*request_count, 1); + assert_eq!(*input_tokens, 100); + assert_eq!(*output_tokens, 20); + assert_eq!(*cache_read_tokens, 10); + + let remaining: i64 = + conn.query_row("SELECT COUNT(*) FROM proxy_request_logs", [], |row| { + row.get(0) + })?; + assert_eq!(remaining, 0); + + Ok(()) + } + #[test] fn test_rollup_noop_when_no_old_data() -> Result<(), AppError> { let db = Database::memory()?; diff --git a/src-tauri/src/database/schema.rs b/src-tauri/src/database/schema.rs index 61c2f28e6..208f79413 100644 --- a/src-tauri/src/database/schema.rs +++ b/src-tauri/src/database/schema.rs @@ -214,6 +214,7 @@ impl Database { [], ) .map_err(|e| AppError::Database(e.to_string()))?; + Self::create_request_logs_dedup_index_if_supported(conn)?; // 11. Model Pricing 表 conn.execute( @@ -1107,6 +1108,7 @@ impl Database { "data_source", "TEXT NOT NULL DEFAULT 'proxy'", )?; + Self::create_request_logs_dedup_index_if_supported(conn)?; } // 2. 创建会话日志同步状态表 @@ -1908,6 +1910,36 @@ impl Database { Ok(()) } + fn create_request_logs_dedup_index_if_supported(conn: &Connection) -> Result<(), AppError> { + if !Self::table_exists(conn, "proxy_request_logs")? { + return Ok(()); + } + + let required_columns = [ + "app_type", + "data_source", + "input_tokens", + "output_tokens", + "cache_read_tokens", + "created_at", + "cache_creation_tokens", + ]; + for column in required_columns { + if !Self::has_column(conn, "proxy_request_logs", column)? 
{ + return Ok(()); + } + } + + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_request_logs_dedup_lookup + ON proxy_request_logs(app_type, data_source, input_tokens, output_tokens, + cache_read_tokens, created_at, cache_creation_tokens)", + [], + ) + .map_err(|e| AppError::Database(format!("创建使用量去重索引失败: {e}")))?; + Ok(()) + } + fn validate_identifier(s: &str, kind: &str) -> Result<(), AppError> { if s.is_empty() { return Err(AppError::Database(format!("{kind} 不能为空"))); diff --git a/src-tauri/src/proxy/providers/transform.rs b/src-tauri/src/proxy/providers/transform.rs index 5e43a1869..d4eafd243 100644 --- a/src-tauri/src/proxy/providers/transform.rs +++ b/src-tauri/src/proxy/providers/transform.rs @@ -869,6 +869,34 @@ mod tests { assert_eq!(result["usage"]["output_tokens"], 5); } + #[test] + fn test_openai_to_anthropic_preserves_id_for_usage_dedup() { + let input = json!({ + "id": "chatcmpl-claude-compatible", + "object": "chat.completion", + "model": "claude-sonnet-4-5", + "choices": [{ + "index": 0, + "message": {"role": "assistant", "content": "Hello!"}, + "finish_reason": "stop" + }], + "usage": {"prompt_tokens": 10, "completion_tokens": 5, "total_tokens": 15} + }); + + let result = openai_to_anthropic(input).unwrap(); + let usage = crate::proxy::usage::parser::TokenUsage::from_claude_response(&result) + .expect("converted Anthropic response should parse usage"); + + assert_eq!( + usage.message_id.as_deref(), + Some("chatcmpl-claude-compatible") + ); + assert_eq!( + usage.dedup_request_id(), + "session:chatcmpl-claude-compatible" + ); + } + #[test] fn test_openai_to_anthropic_with_tool_calls() { let input = json!({ diff --git a/src-tauri/src/services/session_usage.rs b/src-tauri/src/services/session_usage.rs index 2b9c5af9f..902b78cbc 100644 --- a/src-tauri/src/services/session_usage.rs +++ b/src-tauri/src/services/session_usage.rs @@ -13,6 +13,9 @@ use crate::database::{lock_conn, Database}; use crate::error::AppError; use 
crate::proxy::usage::calculator::{CostCalculator, ModelPricing}; use crate::proxy::usage::parser::TokenUsage; +use crate::services::usage_stats::{ + effective_usage_log_filter, should_skip_session_insert, DedupKey, +}; use rust_decimal::Decimal; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -346,25 +349,10 @@ fn insert_session_log_entry( ) -> Result { let conn = lock_conn!(db.conn); - // 检查是否已存在 - let exists: bool = conn - .query_row( - "SELECT COUNT(*) FROM proxy_request_logs WHERE request_id = ?1", - rusqlite::params![request_id], - |row| row.get::<_, i64>(0).map(|c| c > 0), - ) - .unwrap_or(false); - - if exists { - return Ok(false); - } - - // 解析时间戳 let created_at = msg .timestamp .as_ref() .and_then(|ts| { - // 尝试解析 ISO 8601 时间戳 chrono::DateTime::parse_from_rfc3339(ts) .ok() .map(|dt| dt.timestamp()) @@ -376,6 +364,19 @@ fn insert_session_log_entry( .unwrap_or(0) }); + let dedup_key = DedupKey { + app_type: "claude", + model: &msg.model, + input_tokens: msg.input_tokens, + output_tokens: msg.output_tokens, + cache_read_tokens: msg.cache_read_tokens, + cache_creation_tokens: msg.cache_creation_tokens, + created_at, + }; + if should_skip_session_insert(&conn, request_id, &dedup_key)? 
{ + return Ok(false); + } + // 计算费用 let usage = TokenUsage { input_tokens: msg.input_tokens, @@ -531,13 +532,17 @@ fn try_find_pricing( pub fn get_data_source_breakdown(db: &Database) -> Result, AppError> { let conn = lock_conn!(db.conn); - let mut stmt = conn.prepare( - "SELECT COALESCE(data_source, 'proxy') as ds, COUNT(*) as cnt, - COALESCE(SUM(CAST(total_cost_usd AS REAL)), 0) as cost - FROM proxy_request_logs + let effective_filter = effective_usage_log_filter("l"); + let sql = format!( + "SELECT COALESCE(l.data_source, 'proxy') as ds, COUNT(*) as cnt, + COALESCE(SUM(CAST(l.total_cost_usd AS REAL)), 0) as cost + FROM proxy_request_logs l + WHERE {effective_filter} GROUP BY ds - ORDER BY cnt DESC", - )?; + ORDER BY cnt DESC" + ); + + let mut stmt = conn.prepare(&sql)?; let rows = stmt.query_map([], |row| { Ok(DataSourceSummary { @@ -636,4 +641,58 @@ mod tests { messages.insert("msg_1".to_string(), final_entry); assert_eq!(messages.get("msg_1").unwrap().output_tokens, 1349); } + + #[test] + fn test_insert_claude_session_skips_matching_proxy_log() -> Result<(), AppError> { + let db = Database::memory()?; + { + let conn = lock_conn!(db.conn); + conn.execute( + "INSERT INTO proxy_request_logs ( + request_id, provider_id, app_type, model, request_model, + input_tokens, output_tokens, cache_read_tokens, cache_creation_tokens, + total_cost_usd, latency_ms, status_code, created_at, data_source + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + rusqlite::params![ + "proxy-different-id", + "openai-compatible", + "claude", + "claude-sonnet-4-5", + "claude-sonnet-4-5", + 100, + 20, + 10, + 5, + "0.10", + 100, + 200, + 1000, + "proxy" + ], + )?; + } + + let msg = ParsedAssistantUsage { + message_id: "msg_1".to_string(), + model: "claude-sonnet-4-5".to_string(), + input_tokens: 100, + output_tokens: 20, + cache_read_tokens: 10, + cache_creation_tokens: 5, + stop_reason: Some("end_turn".to_string()), + timestamp: Some("1970-01-01T00:16:45Z".to_string()), + session_id: 
Some("session-1".to_string()), + }; + + let inserted = insert_session_log_entry(&db, "session:msg_1", &msg)?; + assert!(!inserted); + + let conn = lock_conn!(db.conn); + let count: i64 = conn.query_row("SELECT COUNT(*) FROM proxy_request_logs", [], |row| { + row.get(0) + })?; + assert_eq!(count, 1); + + Ok(()) + } } diff --git a/src-tauri/src/services/session_usage_codex.rs b/src-tauri/src/services/session_usage_codex.rs index 8360c7ecf..b666634ff 100644 --- a/src-tauri/src/services/session_usage_codex.rs +++ b/src-tauri/src/services/session_usage_codex.rs @@ -19,6 +19,7 @@ use crate::error::AppError; use crate::proxy::usage::calculator::{CostCalculator, ModelPricing}; use crate::proxy::usage::parser::TokenUsage; use crate::services::session_usage::SessionSyncResult; +use crate::services::usage_stats::{should_skip_session_insert, DedupKey}; use rust_decimal::Decimal; use std::fs; use std::io::{BufRead, BufReader}; @@ -438,20 +439,6 @@ fn insert_codex_session_entry( ) -> Result { let conn = lock_conn!(db.conn); - // 检查是否已存在 - let exists: bool = conn - .query_row( - "SELECT COUNT(*) FROM proxy_request_logs WHERE request_id = ?1", - rusqlite::params![request_id], - |row| row.get::<_, i64>(0).map(|c| c > 0), - ) - .unwrap_or(false); - - if exists { - return Ok(false); - } - - // 解析时间戳 let created_at = timestamp .and_then(|ts| { chrono::DateTime::parse_from_rfc3339(ts) @@ -465,6 +452,19 @@ fn insert_codex_session_entry( .unwrap_or(0) }); + let dedup_key = DedupKey { + app_type: "codex", + model, + input_tokens: delta.input, + output_tokens: delta.output, + cache_read_tokens: delta.cached_input, + cache_creation_tokens: 0, + created_at, + }; + if should_skip_session_insert(&conn, request_id, &dedup_key)? 
{ + return Ok(false); + } + // 计算费用 let usage = TokenUsage { input_tokens: delta.input, @@ -733,6 +733,60 @@ mod tests { assert!(files.is_empty()); } + #[test] + fn test_insert_codex_session_skips_matching_proxy_log() -> Result<(), AppError> { + let db = Database::memory()?; + { + let conn = lock_conn!(db.conn); + conn.execute( + "INSERT INTO proxy_request_logs ( + request_id, provider_id, app_type, model, request_model, + input_tokens, output_tokens, cache_read_tokens, cache_creation_tokens, + total_cost_usd, latency_ms, status_code, created_at, data_source + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + rusqlite::params![ + "codex-proxy", + "openai", + "codex", + "gpt-5.4", + "gpt-5.4", + 10, + 2, + 1, + 7, + "0.01", + 100, + 200, + 1000, + "proxy" + ], + )?; + } + + let delta = DeltaTokens { + input: 10, + cached_input: 1, + output: 2, + }; + let inserted = insert_codex_session_entry( + &db, + "codex-session-dup", + &delta, + "gpt-5.4", + Some("session-1"), + Some("1970-01-01T00:16:45Z"), + )?; + assert!(!inserted); + + let conn = lock_conn!(db.conn); + let count: i64 = conn.query_row("SELECT COUNT(*) FROM proxy_request_logs", [], |row| { + row.get(0) + })?; + assert_eq!(count, 1); + + Ok(()) + } + // ── 模型名归一化测试 ── #[test] diff --git a/src-tauri/src/services/session_usage_gemini.rs b/src-tauri/src/services/session_usage_gemini.rs index 24ff25c37..e0049f91b 100644 --- a/src-tauri/src/services/session_usage_gemini.rs +++ b/src-tauri/src/services/session_usage_gemini.rs @@ -19,6 +19,7 @@ use crate::gemini_config::get_gemini_dir; use crate::proxy::usage::calculator::{CostCalculator, ModelPricing}; use crate::proxy::usage::parser::TokenUsage; use crate::services::session_usage::SessionSyncResult; +use crate::services::usage_stats::{should_skip_session_insert, DedupKey}; use rust_decimal::Decimal; use std::fs; use std::path::{Path, PathBuf}; @@ -237,7 +238,6 @@ fn insert_gemini_session_entry( ) -> Result { let conn = lock_conn!(db.conn); - // 解析时间戳 let 
created_at = timestamp .and_then(|ts| { chrono::DateTime::parse_from_rfc3339(ts) @@ -254,6 +254,19 @@ fn insert_gemini_session_entry( // 合并 thoughts 到 output(思考 token 按输出计费) let output_tokens = tokens.output + tokens.thoughts; + let dedup_key = DedupKey { + app_type: "gemini", + model, + input_tokens: tokens.input, + output_tokens, + cache_read_tokens: tokens.cached, + cache_creation_tokens: 0, + created_at, + }; + if should_skip_session_insert(&conn, request_id, &dedup_key)? { + return Ok(false); + } + // 计算费用 let usage = TokenUsage { input_tokens: tokens.input, @@ -433,6 +446,61 @@ mod tests { assert!(files.is_empty()); } + #[test] + fn test_insert_gemini_session_skips_matching_proxy_log() -> Result<(), AppError> { + let db = Database::memory()?; + { + let conn = lock_conn!(db.conn); + conn.execute( + "INSERT INTO proxy_request_logs ( + request_id, provider_id, app_type, model, request_model, + input_tokens, output_tokens, cache_read_tokens, cache_creation_tokens, + total_cost_usd, latency_ms, status_code, created_at, data_source + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + rusqlite::params![ + "gemini-proxy", + "google", + "gemini", + "gemini-2.5-pro", + "gemini-2.5-pro", + 10, + 7, + 1, + 0, + "0.01", + 100, + 200, + 1000, + "proxy" + ], + )?; + } + + let tokens = GeminiTokens { + input: 10, + output: 2, + cached: 1, + thoughts: 5, + }; + let inserted = insert_gemini_session_entry( + &db, + "gemini-session-dup", + &tokens, + "gemini-2.5-pro", + Some("session-1"), + Some("1970-01-01T00:16:45Z"), + )?; + assert!(!inserted); + + let conn = lock_conn!(db.conn); + let count: i64 = conn.query_row("SELECT COUNT(*) FROM proxy_request_logs", [], |row| { + row.get(0) + })?; + assert_eq!(count, 1); + + Ok(()) + } + #[test] fn test_parse_gemini_tokens() { let json: serde_json::Value = serde_json::json!({ diff --git a/src-tauri/src/services/usage_stats.rs b/src-tauri/src/services/usage_stats.rs index 706fd9f09..9faef593b 100644 --- 
a/src-tauri/src/services/usage_stats.rs +++ b/src-tauri/src/services/usage_stats.rs @@ -130,6 +130,123 @@ fn provider_name_coalesce(log_alias: &str, provider_alias: &str) -> String { ) } +pub(crate) const SESSION_PROXY_DEDUP_WINDOW_SECONDS: i64 = 10 * 60; + +pub(crate) fn effective_usage_log_filter(log_alias: &str) -> String { + format!( + "NOT ( + {log_alias}.data_source IN ('session_log', 'codex_session', 'gemini_session') + AND EXISTS ( + SELECT 1 + FROM proxy_request_logs proxy_dedup + WHERE proxy_dedup.data_source = 'proxy' + AND proxy_dedup.app_type = {log_alias}.app_type + AND proxy_dedup.status_code >= 200 + AND proxy_dedup.status_code < 300 + AND proxy_dedup.input_tokens = {log_alias}.input_tokens + AND proxy_dedup.output_tokens = {log_alias}.output_tokens + AND proxy_dedup.cache_read_tokens = {log_alias}.cache_read_tokens + AND ( + proxy_dedup.cache_creation_tokens = {log_alias}.cache_creation_tokens + OR ( + {log_alias}.cache_creation_tokens = 0 + AND {log_alias}.data_source IN ('codex_session', 'gemini_session') + ) + ) + AND proxy_dedup.created_at BETWEEN + {log_alias}.created_at - {SESSION_PROXY_DEDUP_WINDOW_SECONDS} + AND {log_alias}.created_at + {SESSION_PROXY_DEDUP_WINDOW_SECONDS} + AND ( + LOWER(proxy_dedup.model) = LOWER({log_alias}.model) + OR LOWER(proxy_dedup.model) = 'unknown' + OR LOWER({log_alias}.model) = 'unknown' + ) + ) + )" + ) +} + +/// 跨源去重指纹键。 +/// +/// `cache_creation_tokens`:Codex/Gemini session 日志不暴露该字段,调用方传 0 +/// 表示"未知",匹配器会放行 proxy 侧任意 cache_creation_tokens 值。 +#[derive(Debug, Clone, Copy)] +pub(crate) struct DedupKey<'a> { + pub app_type: &'a str, + pub model: &'a str, + pub input_tokens: u32, + pub output_tokens: u32, + pub cache_read_tokens: u32, + pub cache_creation_tokens: u32, + pub created_at: i64, +} + +/// session 日志写入前的统一去重判定。 +/// +/// 命中以下任一条件即跳过插入:① `request_id` 已存在;② 时间窗口内存在 +/// 与 `key` 匹配的 proxy 日志(指纹去重)。 +pub(crate) fn should_skip_session_insert( + conn: &Connection, + request_id: &str, + key: &DedupKey, +) -> 
Result { + if proxy_request_id_exists(conn, request_id)? { + return Ok(true); + } + has_matching_proxy_usage_log(conn, key) +} + +fn proxy_request_id_exists(conn: &Connection, request_id: &str) -> Result { + conn.query_row( + "SELECT EXISTS(SELECT 1 FROM proxy_request_logs WHERE request_id = ?1)", + params![request_id], + |row| row.get::<_, bool>(0), + ) + .map_err(|e| AppError::Database(format!("查询 request_id 失败: {e}"))) +} + +pub(crate) fn has_matching_proxy_usage_log( + conn: &Connection, + key: &DedupKey, +) -> Result { + let allow_missing_cache_creation = + matches!(key.app_type, "codex" | "gemini") && key.cache_creation_tokens == 0; + + conn.query_row( + "SELECT EXISTS ( + SELECT 1 + FROM proxy_request_logs l + WHERE l.data_source = 'proxy' + AND l.app_type = ?1 + AND l.status_code >= 200 + AND l.status_code < 300 + AND l.input_tokens = ?3 + AND l.output_tokens = ?4 + AND l.cache_read_tokens = ?5 + AND (l.cache_creation_tokens = ?6 OR ?9 = 1) + AND l.created_at BETWEEN ?7 - ?8 AND ?7 + ?8 + AND ( + LOWER(l.model) = LOWER(?2) + OR LOWER(l.model) = 'unknown' + OR LOWER(?2) = 'unknown' + ) + )", + params![ + key.app_type, + key.model, + key.input_tokens as i64, + key.output_tokens as i64, + key.cache_read_tokens as i64, + key.cache_creation_tokens as i64, + key.created_at, + SESSION_PROXY_DEDUP_WINDOW_SECONDS, + allow_missing_cache_creation as i64, + ], + |row| row.get::<_, bool>(0), + ) + .map_err(|e| AppError::Database(format!("查询重复代理用量日志失败: {e}"))) +} + #[derive(Debug, Clone, Default)] struct RollupDateBounds { start: Option, @@ -231,19 +348,19 @@ impl Database { let conn = lock_conn!(self.conn); // Build detail WHERE clause - let mut conditions = Vec::new(); + let mut conditions = vec![effective_usage_log_filter("l")]; let mut params_vec: Vec> = Vec::new(); if let Some(start) = start_date { - conditions.push("created_at >= ?"); + conditions.push("l.created_at >= ?".to_string()); params_vec.push(Box::new(start)); } if let Some(end) = end_date { - 
conditions.push("created_at <= ?"); + conditions.push("l.created_at <= ?".to_string()); params_vec.push(Box::new(end)); } if let Some(at) = app_type { - conditions.push("app_type = ?"); + conditions.push("l.app_type = ?".to_string()); params_vec.push(Box::new(at.to_string())); } @@ -293,7 +410,7 @@ impl Database { COALESCE(SUM(cache_creation_tokens), 0) as total_cache_creation_tokens, COALESCE(SUM(cache_read_tokens), 0) as total_cache_read_tokens, COALESCE(SUM(CASE WHEN status_code >= 200 AND status_code < 300 THEN 1 ELSE 0 END), 0) as success_count - FROM proxy_request_logs {where_clause}) d, + FROM proxy_request_logs l {where_clause}) d, (SELECT COALESCE(SUM(request_count), 0) as total_requests, COALESCE(SUM(CAST(total_cost_usd AS REAL)), 0) as total_cost, @@ -369,23 +486,25 @@ impl Database { } let app_type_filter = if app_type.is_some() { - "AND app_type = ?4" + "AND l.app_type = ?4" } else { "" }; + let effective_filter = effective_usage_log_filter("l"); let sql = format!( "SELECT - CAST((created_at - ?1) / ?3 AS INTEGER) as bucket_idx, + CAST((l.created_at - ?1) / ?3 AS INTEGER) as bucket_idx, COUNT(*) as request_count, - COALESCE(SUM(CAST(total_cost_usd AS REAL)), 0) as total_cost, - COALESCE(SUM(input_tokens + output_tokens), 0) as total_tokens, - COALESCE(SUM(input_tokens), 0) as total_input_tokens, - COALESCE(SUM(output_tokens), 0) as total_output_tokens, - COALESCE(SUM(cache_creation_tokens), 0) as total_cache_creation_tokens, - COALESCE(SUM(cache_read_tokens), 0) as total_cache_read_tokens - FROM proxy_request_logs - WHERE created_at >= ?1 AND created_at <= ?2 {app_type_filter} + COALESCE(SUM(CAST(l.total_cost_usd AS REAL)), 0) as total_cost, + COALESCE(SUM(l.input_tokens + l.output_tokens), 0) as total_tokens, + COALESCE(SUM(l.input_tokens), 0) as total_input_tokens, + COALESCE(SUM(l.output_tokens), 0) as total_output_tokens, + COALESCE(SUM(l.cache_creation_tokens), 0) as total_cache_creation_tokens, + COALESCE(SUM(l.cache_read_tokens), 0) as 
total_cache_read_tokens + FROM proxy_request_logs l + WHERE l.created_at >= ?1 AND l.created_at <= ?2 + AND {effective_filter} {app_type_filter} GROUP BY bucket_idx ORDER BY bucket_idx ASC" ); @@ -456,23 +575,25 @@ impl Database { let bucket_count = (end_day.signed_duration_since(start_day).num_days() + 1) as usize; let app_type_filter = if app_type.is_some() { - "AND app_type = ?3" + "AND l.app_type = ?3" } else { "" }; + let effective_filter = effective_usage_log_filter("l"); let detail_sql = format!( "SELECT - date(created_at, 'unixepoch', 'localtime') as bucket_date, + date(l.created_at, 'unixepoch', 'localtime') as bucket_date, COUNT(*) as request_count, - COALESCE(SUM(CAST(total_cost_usd AS REAL)), 0) as total_cost, - COALESCE(SUM(input_tokens + output_tokens), 0) as total_tokens, - COALESCE(SUM(input_tokens), 0) as total_input_tokens, - COALESCE(SUM(output_tokens), 0) as total_output_tokens, - COALESCE(SUM(cache_creation_tokens), 0) as total_cache_creation_tokens, - COALESCE(SUM(cache_read_tokens), 0) as total_cache_read_tokens - FROM proxy_request_logs - WHERE created_at >= ?1 AND created_at <= ?2 {app_type_filter} + COALESCE(SUM(CAST(l.total_cost_usd AS REAL)), 0) as total_cost, + COALESCE(SUM(l.input_tokens + l.output_tokens), 0) as total_tokens, + COALESCE(SUM(l.input_tokens), 0) as total_input_tokens, + COALESCE(SUM(l.output_tokens), 0) as total_output_tokens, + COALESCE(SUM(l.cache_creation_tokens), 0) as total_cache_creation_tokens, + COALESCE(SUM(l.cache_read_tokens), 0) as total_cache_read_tokens + FROM proxy_request_logs l + WHERE l.created_at >= ?1 AND l.created_at <= ?2 + AND {effective_filter} {app_type_filter} GROUP BY bucket_date ORDER BY bucket_date ASC" ); @@ -623,18 +744,18 @@ impl Database { ) -> Result, AppError> { let conn = lock_conn!(self.conn); - let mut detail_conditions = Vec::new(); + let mut detail_conditions = vec![effective_usage_log_filter("l")]; let mut detail_params: Vec> = Vec::new(); if let Some(start) = start_date { - 
detail_conditions.push("l.created_at >= ?"); + detail_conditions.push("l.created_at >= ?".to_string()); detail_params.push(Box::new(start)); } if let Some(end) = end_date { - detail_conditions.push("l.created_at <= ?"); + detail_conditions.push("l.created_at <= ?".to_string()); detail_params.push(Box::new(end)); } if let Some(at) = app_type { - detail_conditions.push("l.app_type = ?"); + detail_conditions.push("l.app_type = ?".to_string()); detail_params.push(Box::new(at.to_string())); } let detail_where = if detail_conditions.is_empty() { @@ -747,18 +868,18 @@ impl Database { ) -> Result, AppError> { let conn = lock_conn!(self.conn); - let mut detail_conditions = Vec::new(); + let mut detail_conditions = vec![effective_usage_log_filter("l")]; let mut detail_params: Vec> = Vec::new(); if let Some(start) = start_date { - detail_conditions.push("l.created_at >= ?"); + detail_conditions.push("l.created_at >= ?".to_string()); detail_params.push(Box::new(start)); } if let Some(end) = end_date { - detail_conditions.push("l.created_at <= ?"); + detail_conditions.push("l.created_at <= ?".to_string()); detail_params.push(Box::new(end)); } if let Some(at) = app_type { - detail_conditions.push("l.app_type = ?"); + detail_conditions.push("l.app_type = ?".to_string()); detail_params.push(Box::new(at.to_string())); } let detail_where = if detail_conditions.is_empty() { @@ -855,31 +976,31 @@ impl Database { ) -> Result { let conn = lock_conn!(self.conn); - let mut conditions = Vec::new(); + let mut conditions = vec![effective_usage_log_filter("l")]; let mut params: Vec> = Vec::new(); if let Some(ref app_type) = filters.app_type { - conditions.push("l.app_type = ?"); + conditions.push("l.app_type = ?".to_string()); params.push(Box::new(app_type.clone())); } if let Some(ref provider_name) = filters.provider_name { - conditions.push("p.name LIKE ?"); + conditions.push("p.name LIKE ?".to_string()); params.push(Box::new(format!("%{provider_name}%"))); } if let Some(ref model) = 
filters.model { - conditions.push("l.model LIKE ?"); + conditions.push("l.model LIKE ?".to_string()); params.push(Box::new(format!("%{model}%"))); } if let Some(status) = filters.status_code { - conditions.push("l.status_code = ?"); + conditions.push("l.status_code = ?".to_string()); params.push(Box::new(status as i64)); } if let Some(start) = filters.start_date { - conditions.push("l.created_at >= ?"); + conditions.push("l.created_at >= ?".to_string()); params.push(Box::new(start)); } if let Some(end) = filters.end_date { - conditions.push("l.created_at <= ?"); + conditions.push("l.created_at <= ?".to_string()); params.push(Box::new(end)); } @@ -1351,6 +1472,48 @@ mod tests { } } + #[allow(clippy::too_many_arguments)] + fn insert_usage_log( + conn: &Connection, + request_id: &str, + app_type: &str, + provider_id: &str, + model: &str, + data_source: &str, + created_at: i64, + input_tokens: i64, + output_tokens: i64, + cache_read_tokens: i64, + cache_creation_tokens: i64, + status_code: i64, + total_cost_usd: &str, + ) -> Result<(), AppError> { + conn.execute( + "INSERT INTO proxy_request_logs ( + request_id, provider_id, app_type, model, request_model, + input_tokens, output_tokens, cache_read_tokens, cache_creation_tokens, + input_cost_usd, output_cost_usd, cache_read_cost_usd, cache_creation_cost_usd, + total_cost_usd, latency_ms, status_code, created_at, data_source + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, '0', '0', '0', '0', ?, 100, ?, ?, ?)", + params![ + request_id, + provider_id, + app_type, + model, + model, + input_tokens, + output_tokens, + cache_read_tokens, + cache_creation_tokens, + total_cost_usd, + status_code, + created_at, + data_source + ], + )?; + Ok(()) + } + #[test] fn test_get_usage_summary() -> Result<(), AppError> { let db = Database::memory()?; @@ -1525,6 +1688,359 @@ mod tests { Ok(()) } + #[test] + fn test_effective_usage_dedup_prefers_proxy_for_session_sources() -> Result<(), AppError> { + let db = Database::memory()?; + + { + let conn = 
lock_conn!(db.conn); + insert_usage_log( + &conn, + "codex-proxy", + "codex", + "openai", + "GPT-5.4", + "proxy", + 10_000, + 100, + 20, + 10, + 7, + 200, + "0.10", + )?; + insert_usage_log( + &conn, + "codex-session-dup", + "codex", + "_codex_session", + "gpt-5.4", + "codex_session", + 10_060, + 100, + 20, + 10, + 0, + 200, + "0.10", + )?; + insert_usage_log( + &conn, + "claude-proxy", + "claude", + "openai-compatible", + "claude-sonnet-4-5", + "proxy", + 25_000, + 300, + 60, + 20, + 5, + 200, + "0.30", + )?; + insert_usage_log( + &conn, + "claude-session-dup", + "claude", + "_session", + "claude-sonnet-4-5", + "session_log", + 25_060, + 300, + 60, + 20, + 5, + 200, + "0.30", + )?; + insert_usage_log( + &conn, + "gemini-proxy", + "gemini", + "google", + "gemini-2.5-pro", + "proxy", + 20_000, + 200, + 40, + 30, + 0, + 200, + "0.20", + )?; + insert_usage_log( + &conn, + "gemini-session-dup", + "gemini", + "_gemini_session", + "gemini-2.5-pro", + "gemini_session", + 20_060, + 200, + 40, + 30, + 0, + 200, + "0.20", + )?; + insert_usage_log( + &conn, + "codex-session-only", + "codex", + "_codex_session", + "gpt-5.4", + "codex_session", + 30_000, + 50, + 5, + 0, + 0, + 200, + "0.02", + )?; + } + + let summary = db.get_usage_summary(None, None, None)?; + assert_eq!(summary.total_requests, 4); + assert_eq!(summary.total_input_tokens, 650); + assert_eq!(summary.total_output_tokens, 125); + assert_eq!(summary.total_cache_read_tokens, 60); + assert_eq!(summary.total_cache_creation_tokens, 12); + + let trends = db.get_daily_trends(Some(0), Some(40_000), None)?; + assert_eq!(trends.iter().map(|stat| stat.request_count).sum::<u64>(), 4); + + let provider_stats = db.get_provider_stats(None, None, None)?; + assert_eq!( + provider_stats + .iter() + .map(|stat| stat.request_count) + .sum::<u64>(), + 4 + ); + assert!(provider_stats + .iter() + .any(|stat| stat.provider_id == "_codex_session" && stat.request_count == 1)); + assert!(!provider_stats + .iter() + .any(|stat| stat.provider_id == 
"_gemini_session")); + assert!(!provider_stats + .iter() + .any(|stat| stat.provider_id == "_session")); + + let model_stats = db.get_model_stats(None, None, None)?; + assert_eq!( + model_stats + .iter() + .map(|stat| stat.request_count) + .sum::<u64>(), + 4 + ); + + let logs = db.get_request_logs(&LogFilters::default(), 0, 10)?; + let request_ids: Vec<&str> = logs + .data + .iter() + .map(|log| log.request_id.as_str()) + .collect(); + assert_eq!(logs.total, 4); + assert!(request_ids.contains(&"codex-proxy")); + assert!(request_ids.contains(&"claude-proxy")); + assert!(request_ids.contains(&"gemini-proxy")); + assert!(request_ids.contains(&"codex-session-only")); + assert!(!request_ids.contains(&"codex-session-dup")); + assert!(!request_ids.contains(&"claude-session-dup")); + assert!(!request_ids.contains(&"gemini-session-dup")); + + let breakdown = crate::services::session_usage::get_data_source_breakdown(&db)?; + let proxy_count = breakdown + .iter() + .find(|item| item.data_source == "proxy") + .map(|item| item.request_count); + let codex_session_count = breakdown + .iter() + .find(|item| item.data_source == "codex_session") + .map(|item| item.request_count); + let gemini_session_count = breakdown + .iter() + .find(|item| item.data_source == "gemini_session") + .map(|item| item.request_count); + let session_log_count = breakdown + .iter() + .find(|item| item.data_source == "session_log") + .map(|item| item.request_count); + assert_eq!(proxy_count, Some(3)); + assert_eq!(codex_session_count, Some(1)); + assert_eq!(gemini_session_count, None); + assert_eq!(session_log_count, None); + + Ok(()) + } + + #[test] + fn test_effective_usage_dedup_keeps_non_matching_session_rows() -> Result<(), AppError> { + let db = Database::memory()?; + + { + let conn = lock_conn!(db.conn); + insert_usage_log( + &conn, + "proxy-base", + "codex", + "openai", + "gpt-5.4", + "proxy", + 10_000, + 100, + 20, + 10, + 0, + 200, + "0.10", + )?; + insert_usage_log( + &conn, + 
"session-outside-window", + "codex", + "_codex_session", + "gpt-5.4", + "codex_session", + 10_601, + 100, + 20, + 10, + 0, + 200, + "0.10", + )?; + insert_usage_log( + &conn, + "session-token-mismatch", + "codex", + "_codex_session", + "gpt-5.4", + "codex_session", + 10_060, + 101, + 20, + 10, + 0, + 200, + "0.10", + )?; + insert_usage_log( + &conn, + "session-app-mismatch", + "gemini", + "_gemini_session", + "gpt-5.4", + "gemini_session", + 10_060, + 100, + 20, + 10, + 0, + 200, + "0.10", + )?; + insert_usage_log( + &conn, + "session-model-mismatch", + "codex", + "_codex_session", + "different-model", + "codex_session", + 10_060, + 100, + 20, + 10, + 0, + 200, + "0.10", + )?; + insert_usage_log( + &conn, + "proxy-error", + "codex", + "openai", + "gpt-5.4", + "proxy", + 20_000, + 300, + 60, + 0, + 0, + 500, + "0.00", + )?; + insert_usage_log( + &conn, + "session-matches-error-proxy", + "codex", + "_codex_session", + "gpt-5.4", + "codex_session", + 20_060, + 300, + 60, + 0, + 0, + 200, + "0.30", + )?; + insert_usage_log( + &conn, + "claude-proxy-cache-creation", + "claude", + "anthropic", + "claude-sonnet-4-5", + "proxy", + 30_000, + 100, + 20, + 10, + 5, + 200, + "0.10", + )?; + insert_usage_log( + &conn, + "claude-session-cache-creation-mismatch", + "claude", + "_session", + "claude-sonnet-4-5", + "session_log", + 30_060, + 100, + 20, + 10, + 0, + 200, + "0.10", + )?; + } + + let summary = db.get_usage_summary(None, None, None)?; + assert_eq!(summary.total_requests, 9); + + let logs = db.get_request_logs(&LogFilters::default(), 0, 10)?; + let request_ids: Vec<&str> = logs + .data + .iter() + .map(|log| log.request_id.as_str()) + .collect(); + assert_eq!(logs.total, 9); + assert!(request_ids.contains(&"session-outside-window")); + assert!(request_ids.contains(&"session-token-mismatch")); + assert!(request_ids.contains(&"session-app-mismatch")); + assert!(request_ids.contains(&"session-model-mismatch")); + 
assert!(request_ids.contains(&"session-matches-error-proxy")); + assert!(request_ids.contains(&"claude-session-cache-creation-mismatch")); + + Ok(()) + } + #[test] fn test_get_model_stats() -> Result<(), AppError> { let db = Database::memory()?;