fix(usage): prevent double-counting between proxy and session-log sources

Proxy writes and session-log sync wrote to proxy_request_logs with
mismatched request_ids: only Claude on a native Anthropic backend used the
shared `session:{message_id}` key. Codex/Gemini and Claude-through-OpenAI
providers always produced distinct ids, so primary-key dedup never fired
and every real request was recorded twice.

Adds a 7-dim fingerprint dedup (app_type, 4 token counts, 2xx status,
model with case-insensitive match, ±10min window) wired into three layers:

- Write path: should_skip_session_insert() blocks duplicate session rows
  before INSERT, unifying the previously-divergent Claude/Codex/Gemini
  paths through a single DedupKey-based helper.
- Read path: effective_usage_log_filter() excludes already-covered session
  rows from every aggregation query.
- Rollup path: same filter applied so usage_daily_rollups never absorbs
  duplicates.

Also adds a covering index (idx_request_logs_dedup_lookup) so the EXISTS
subquery stays index-only, and a transform.rs regression test that pins
openai_to_anthropic id preservation - the missing piece that lets
Claude+OpenAI-compatible providers reuse the session: id scheme.
This commit is contained in:
Jason
2026-04-29 09:35:42 +08:00
parent bcf8434c1f
commit 2ee7cb4101
7 changed files with 917 additions and 92 deletions

View File

@@ -4,6 +4,7 @@
use crate::database::{lock_conn, Database};
use crate::error::AppError;
use crate::services::usage_stats::effective_usage_log_filter;
use chrono::{Duration, Local, TimeZone};
/// Compute the rollup/prune cutoff aligned to a local-day boundary.
@@ -101,7 +102,8 @@ impl Database {
fn do_rollup_and_prune(conn: &rusqlite::Connection, cutoff: i64) -> Result<u64, AppError> {
// Aggregate old logs, merging with any pre-existing rollup rows via LEFT JOIN.
conn.execute(
let effective_filter = effective_usage_log_filter("l");
let aggregation_sql = format!(
"INSERT OR REPLACE INTO usage_daily_rollups
(date, app_type, provider_id, model,
request_count, success_count,
@@ -124,27 +126,30 @@ impl Database {
ELSE 0 END
FROM (
SELECT
date(created_at, 'unixepoch', 'localtime') as d,
app_type as a, provider_id as p, model as m,
date(l.created_at, 'unixepoch', 'localtime') as d,
l.app_type as a, l.provider_id as p, l.model as m,
COUNT(*) as new_req,
SUM(CASE WHEN status_code >= 200 AND status_code < 300 THEN 1 ELSE 0 END) as new_succ,
COALESCE(SUM(input_tokens), 0) as new_in,
COALESCE(SUM(output_tokens), 0) as new_out,
COALESCE(SUM(cache_read_tokens), 0) as new_cr,
COALESCE(SUM(cache_creation_tokens), 0) as new_cc,
COALESCE(SUM(CAST(total_cost_usd AS REAL)), 0) as new_cost,
COALESCE(AVG(latency_ms), 0) as new_lat
FROM proxy_request_logs WHERE created_at < ?1
SUM(CASE WHEN l.status_code >= 200 AND l.status_code < 300 THEN 1 ELSE 0 END) as new_succ,
COALESCE(SUM(l.input_tokens), 0) as new_in,
COALESCE(SUM(l.output_tokens), 0) as new_out,
COALESCE(SUM(l.cache_read_tokens), 0) as new_cr,
COALESCE(SUM(l.cache_creation_tokens), 0) as new_cc,
COALESCE(SUM(CAST(l.total_cost_usd AS REAL)), 0) as new_cost,
COALESCE(AVG(l.latency_ms), 0) as new_lat
FROM proxy_request_logs l
WHERE l.created_at < ?1 AND {effective_filter}
GROUP BY d, a, p, m
) agg
LEFT JOIN usage_daily_rollups old
ON old.date = agg.d AND old.app_type = agg.a
AND old.provider_id = agg.p AND old.model = agg.m",
[cutoff],
)
.map_err(|e| AppError::Database(format!("Rollup aggregation failed: {e}")))?;
AND old.provider_id = agg.p AND old.model = agg.m"
);
// Delete the aggregated detail rows
conn.execute(&aggregation_sql, [cutoff])
.map_err(|e| AppError::Database(format!("Rollup aggregation failed: {e}")))?;
// INSERT uses the effective-log filter to exclude duplicate session rows.
// DELETE intentionally prunes all old details so those duplicates are discarded.
let deleted = conn
.execute(
"DELETE FROM proxy_request_logs WHERE created_at < ?1",
@@ -254,6 +259,69 @@ mod tests {
Ok(())
}
#[test]
fn test_rollup_uses_effective_usage_logs() -> Result<(), AppError> {
    // Rollup must fold a proxy row and its session-sourced duplicate (same
    // fingerprint, 60s apart, distinct request_ids) into a single rollup row
    // attributed to the proxy side, while pruning both detail rows.
    let db = Database::memory()?;
    let now = chrono::Utc::now().timestamp();
    // 40 days old: well past the 30-day cutoff passed to rollup_and_prune.
    let old_ts = now - 40 * 86400;
    {
        let conn = crate::database::lock_conn!(db.conn);
        // Proxy-side record of the request.
        conn.execute(
            "INSERT INTO proxy_request_logs (
                request_id, provider_id, app_type, model, request_model,
                input_tokens, output_tokens, cache_read_tokens, cache_creation_tokens,
                total_cost_usd, latency_ms, status_code, created_at, data_source
            ) VALUES (?1, 'openai', 'codex', 'gpt-5.4', 'gpt-5.4', 100, 20, 10, 0, '0.10', 100, 200, ?2, 'proxy')",
            rusqlite::params!["codex-proxy-old", old_ts],
        )?;
        // Session-log record of the same request: identical token counts and
        // model, 60s later, different request_id.
        conn.execute(
            "INSERT INTO proxy_request_logs (
                request_id, provider_id, app_type, model, request_model,
                input_tokens, output_tokens, cache_read_tokens, cache_creation_tokens,
                total_cost_usd, latency_ms, status_code, created_at, data_source
            ) VALUES (?1, '_codex_session', 'codex', 'gpt-5.4', 'gpt-5.4', 100, 20, 10, 0, '0.10', 0, 200, ?2, 'codex_session')",
            rusqlite::params!["codex-session-old-dup", old_ts + 60],
        )?;
    }
    let deleted = db.rollup_and_prune(30)?;
    // Both detail rows are pruned, the duplicate included.
    assert_eq!(deleted, 2);
    let conn = crate::database::lock_conn!(db.conn);
    let mut stmt = conn.prepare(
        "SELECT provider_id, request_count, input_tokens, output_tokens, cache_read_tokens
         FROM usage_daily_rollups WHERE app_type = 'codex'",
    )?;
    let rows = stmt
        .query_map([], |row| {
            Ok((
                row.get::<_, String>(0)?,
                row.get::<_, i64>(1)?,
                row.get::<_, i64>(2)?,
                row.get::<_, i64>(3)?,
                row.get::<_, i64>(4)?,
            ))
        })?
        .collect::<Result<Vec<_>, _>>()?;
    // Exactly one rollup row: the session duplicate was excluded by the
    // effective-usage filter, so only the proxy row was aggregated.
    assert_eq!(rows.len(), 1);
    let (provider_id, request_count, input_tokens, output_tokens, cache_read_tokens) = &rows[0];
    assert_eq!(provider_id, "openai");
    assert_eq!(*request_count, 1);
    assert_eq!(*input_tokens, 100);
    assert_eq!(*output_tokens, 20);
    assert_eq!(*cache_read_tokens, 10);
    // Detail table is empty after pruning.
    let remaining: i64 =
        conn.query_row("SELECT COUNT(*) FROM proxy_request_logs", [], |row| {
            row.get(0)
        })?;
    assert_eq!(remaining, 0);
    Ok(())
}
#[test]
fn test_rollup_noop_when_no_old_data() -> Result<(), AppError> {
let db = Database::memory()?;

View File

@@ -214,6 +214,7 @@ impl Database {
[],
)
.map_err(|e| AppError::Database(e.to_string()))?;
Self::create_request_logs_dedup_index_if_supported(conn)?;
// 11. Model Pricing 表
conn.execute(
@@ -1107,6 +1108,7 @@ impl Database {
"data_source",
"TEXT NOT NULL DEFAULT 'proxy'",
)?;
Self::create_request_logs_dedup_index_if_supported(conn)?;
}
// 2. 创建会话日志同步状态表
@@ -1908,6 +1910,36 @@ impl Database {
Ok(())
}
/// Creates the covering index backing the dedup EXISTS lookup, but only on
/// schemas that already carry every fingerprint column.
///
/// Older databases may lack `proxy_request_logs` entirely, or miss some of
/// the columns added by later migrations; in that case this is a silent
/// no-op and the index is created once the schema catches up.
fn create_request_logs_dedup_index_if_supported(conn: &Connection) -> Result<(), AppError> {
    if !Self::table_exists(conn, "proxy_request_logs")? {
        return Ok(());
    }
    // Every column referenced by the index must exist before CREATE INDEX.
    let fingerprint_columns = [
        "app_type",
        "data_source",
        "input_tokens",
        "output_tokens",
        "cache_read_tokens",
        "created_at",
        "cache_creation_tokens",
    ];
    for &column in fingerprint_columns.iter() {
        if !Self::has_column(conn, "proxy_request_logs", column)? {
            // Missing column: schema predates dedup support, skip quietly.
            return Ok(());
        }
    }
    conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_request_logs_dedup_lookup
         ON proxy_request_logs(app_type, data_source, input_tokens, output_tokens,
         cache_read_tokens, created_at, cache_creation_tokens)",
        [],
    )
    .map_err(|e| AppError::Database(format!("创建使用量去重索引失败: {e}")))?;
    Ok(())
}
fn validate_identifier(s: &str, kind: &str) -> Result<(), AppError> {
if s.is_empty() {
return Err(AppError::Database(format!("{kind} 不能为空")));

View File

@@ -869,6 +869,34 @@ mod tests {
assert_eq!(result["usage"]["output_tokens"], 5);
}
#[test]
fn test_openai_to_anthropic_preserves_id_for_usage_dedup() {
    // An OpenAI-compatible provider serving a Claude model must keep the
    // upstream completion id through conversion, so usage parsing can derive
    // the shared `session:{id}` dedup key.
    let openai_response = json!({
        "id": "chatcmpl-claude-compatible",
        "object": "chat.completion",
        "model": "claude-sonnet-4-5",
        "choices": [{
            "index": 0,
            "message": {"role": "assistant", "content": "Hello!"},
            "finish_reason": "stop"
        }],
        "usage": {"prompt_tokens": 10, "completion_tokens": 5, "total_tokens": 15}
    });

    let converted = openai_to_anthropic(openai_response).unwrap();
    let usage = crate::proxy::usage::parser::TokenUsage::from_claude_response(&converted)
        .expect("converted Anthropic response should parse usage");

    let expected_id = "chatcmpl-claude-compatible";
    assert_eq!(usage.message_id.as_deref(), Some(expected_id));
    assert_eq!(usage.dedup_request_id(), format!("session:{expected_id}"));
}
#[test]
fn test_openai_to_anthropic_with_tool_calls() {
let input = json!({

View File

@@ -13,6 +13,9 @@ use crate::database::{lock_conn, Database};
use crate::error::AppError;
use crate::proxy::usage::calculator::{CostCalculator, ModelPricing};
use crate::proxy::usage::parser::TokenUsage;
use crate::services::usage_stats::{
effective_usage_log_filter, should_skip_session_insert, DedupKey,
};
use rust_decimal::Decimal;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
@@ -346,25 +349,10 @@ fn insert_session_log_entry(
) -> Result<bool, AppError> {
let conn = lock_conn!(db.conn);
// 检查是否已存在
let exists: bool = conn
.query_row(
"SELECT COUNT(*) FROM proxy_request_logs WHERE request_id = ?1",
rusqlite::params![request_id],
|row| row.get::<_, i64>(0).map(|c| c > 0),
)
.unwrap_or(false);
if exists {
return Ok(false);
}
// 解析时间戳
let created_at = msg
.timestamp
.as_ref()
.and_then(|ts| {
// 尝试解析 ISO 8601 时间戳
chrono::DateTime::parse_from_rfc3339(ts)
.ok()
.map(|dt| dt.timestamp())
@@ -376,6 +364,19 @@ fn insert_session_log_entry(
.unwrap_or(0)
});
let dedup_key = DedupKey {
app_type: "claude",
model: &msg.model,
input_tokens: msg.input_tokens,
output_tokens: msg.output_tokens,
cache_read_tokens: msg.cache_read_tokens,
cache_creation_tokens: msg.cache_creation_tokens,
created_at,
};
if should_skip_session_insert(&conn, request_id, &dedup_key)? {
return Ok(false);
}
// 计算费用
let usage = TokenUsage {
input_tokens: msg.input_tokens,
@@ -531,13 +532,17 @@ fn try_find_pricing(
pub fn get_data_source_breakdown(db: &Database) -> Result<Vec<DataSourceSummary>, AppError> {
let conn = lock_conn!(db.conn);
let mut stmt = conn.prepare(
"SELECT COALESCE(data_source, 'proxy') as ds, COUNT(*) as cnt,
COALESCE(SUM(CAST(total_cost_usd AS REAL)), 0) as cost
FROM proxy_request_logs
let effective_filter = effective_usage_log_filter("l");
let sql = format!(
"SELECT COALESCE(l.data_source, 'proxy') as ds, COUNT(*) as cnt,
COALESCE(SUM(CAST(l.total_cost_usd AS REAL)), 0) as cost
FROM proxy_request_logs l
WHERE {effective_filter}
GROUP BY ds
ORDER BY cnt DESC",
)?;
ORDER BY cnt DESC"
);
let mut stmt = conn.prepare(&sql)?;
let rows = stmt.query_map([], |row| {
Ok(DataSourceSummary {
@@ -636,4 +641,58 @@ mod tests {
messages.insert("msg_1".to_string(), final_entry);
assert_eq!(messages.get("msg_1").unwrap().output_tokens, 1349);
}
#[test]
fn test_insert_claude_session_skips_matching_proxy_log() -> Result<(), AppError> {
    // A Claude session entry whose fingerprint matches an existing 2xx proxy
    // row (even under a completely different request_id) must be skipped by
    // the write-path dedup.
    let db = Database::memory()?;
    {
        let conn = lock_conn!(db.conn);
        // Proxy row: created_at 1000 = 1970-01-01T00:16:40Z.
        conn.execute(
            "INSERT INTO proxy_request_logs (
                request_id, provider_id, app_type, model, request_model,
                input_tokens, output_tokens, cache_read_tokens, cache_creation_tokens,
                total_cost_usd, latency_ms, status_code, created_at, data_source
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
            rusqlite::params![
                "proxy-different-id",
                "openai-compatible",
                "claude",
                "claude-sonnet-4-5",
                "claude-sonnet-4-5",
                100,
                20,
                10,
                5,
                "0.10",
                100,
                200,
                1000,
                "proxy"
            ],
        )?;
    }
    // Same token counts (including cache_creation=5, which Claude session
    // logs do report) and a timestamp 5s after the proxy row — well inside
    // the dedup window.
    let msg = ParsedAssistantUsage {
        message_id: "msg_1".to_string(),
        model: "claude-sonnet-4-5".to_string(),
        input_tokens: 100,
        output_tokens: 20,
        cache_read_tokens: 10,
        cache_creation_tokens: 5,
        stop_reason: Some("end_turn".to_string()),
        timestamp: Some("1970-01-01T00:16:45Z".to_string()),
        session_id: Some("session-1".to_string()),
    };
    let inserted = insert_session_log_entry(&db, "session:msg_1", &msg)?;
    assert!(!inserted);
    // Only the original proxy row remains in the table.
    let conn = lock_conn!(db.conn);
    let count: i64 = conn.query_row("SELECT COUNT(*) FROM proxy_request_logs", [], |row| {
        row.get(0)
    })?;
    assert_eq!(count, 1);
    Ok(())
}
}

View File

@@ -19,6 +19,7 @@ use crate::error::AppError;
use crate::proxy::usage::calculator::{CostCalculator, ModelPricing};
use crate::proxy::usage::parser::TokenUsage;
use crate::services::session_usage::SessionSyncResult;
use crate::services::usage_stats::{should_skip_session_insert, DedupKey};
use rust_decimal::Decimal;
use std::fs;
use std::io::{BufRead, BufReader};
@@ -438,20 +439,6 @@ fn insert_codex_session_entry(
) -> Result<bool, AppError> {
let conn = lock_conn!(db.conn);
// 检查是否已存在
let exists: bool = conn
.query_row(
"SELECT COUNT(*) FROM proxy_request_logs WHERE request_id = ?1",
rusqlite::params![request_id],
|row| row.get::<_, i64>(0).map(|c| c > 0),
)
.unwrap_or(false);
if exists {
return Ok(false);
}
// 解析时间戳
let created_at = timestamp
.and_then(|ts| {
chrono::DateTime::parse_from_rfc3339(ts)
@@ -465,6 +452,19 @@ fn insert_codex_session_entry(
.unwrap_or(0)
});
let dedup_key = DedupKey {
app_type: "codex",
model,
input_tokens: delta.input,
output_tokens: delta.output,
cache_read_tokens: delta.cached_input,
cache_creation_tokens: 0,
created_at,
};
if should_skip_session_insert(&conn, request_id, &dedup_key)? {
return Ok(false);
}
// 计算费用
let usage = TokenUsage {
input_tokens: delta.input,
@@ -733,6 +733,60 @@ mod tests {
assert!(files.is_empty());
}
#[test]
fn test_insert_codex_session_skips_matching_proxy_log() -> Result<(), AppError> {
    // A Codex session entry matching a 2xx proxy row by fingerprint must be
    // skipped. Codex sessions never report cache_creation_tokens, so the
    // dedup key carries 0 there and still matches the proxy row's value of 7
    // via the "unknown" wildcard.
    let db = Database::memory()?;
    {
        let conn = lock_conn!(db.conn);
        // Proxy row: created_at 1000 = 1970-01-01T00:16:40Z.
        conn.execute(
            "INSERT INTO proxy_request_logs (
                request_id, provider_id, app_type, model, request_model,
                input_tokens, output_tokens, cache_read_tokens, cache_creation_tokens,
                total_cost_usd, latency_ms, status_code, created_at, data_source
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
            rusqlite::params![
                "codex-proxy",
                "openai",
                "codex",
                "gpt-5.4",
                "gpt-5.4",
                10,
                2,
                1,
                7,
                "0.01",
                100,
                200,
                1000,
                "proxy"
            ],
        )?;
    }
    // Same input/output/cached counts; timestamp 5s after the proxy row.
    let delta = DeltaTokens {
        input: 10,
        cached_input: 1,
        output: 2,
    };
    let inserted = insert_codex_session_entry(
        &db,
        "codex-session-dup",
        &delta,
        "gpt-5.4",
        Some("session-1"),
        Some("1970-01-01T00:16:45Z"),
    )?;
    assert!(!inserted);
    // Only the original proxy row remains.
    let conn = lock_conn!(db.conn);
    let count: i64 = conn.query_row("SELECT COUNT(*) FROM proxy_request_logs", [], |row| {
        row.get(0)
    })?;
    assert_eq!(count, 1);
    Ok(())
}
// ── 模型名归一化测试 ──
#[test]

View File

@@ -19,6 +19,7 @@ use crate::gemini_config::get_gemini_dir;
use crate::proxy::usage::calculator::{CostCalculator, ModelPricing};
use crate::proxy::usage::parser::TokenUsage;
use crate::services::session_usage::SessionSyncResult;
use crate::services::usage_stats::{should_skip_session_insert, DedupKey};
use rust_decimal::Decimal;
use std::fs;
use std::path::{Path, PathBuf};
@@ -237,7 +238,6 @@ fn insert_gemini_session_entry(
) -> Result<bool, AppError> {
let conn = lock_conn!(db.conn);
// 解析时间戳
let created_at = timestamp
.and_then(|ts| {
chrono::DateTime::parse_from_rfc3339(ts)
@@ -254,6 +254,19 @@ fn insert_gemini_session_entry(
// 合并 thoughts 到 output思考 token 按输出计费)
let output_tokens = tokens.output + tokens.thoughts;
let dedup_key = DedupKey {
app_type: "gemini",
model,
input_tokens: tokens.input,
output_tokens,
cache_read_tokens: tokens.cached,
cache_creation_tokens: 0,
created_at,
};
if should_skip_session_insert(&conn, request_id, &dedup_key)? {
return Ok(false);
}
// 计算费用
let usage = TokenUsage {
input_tokens: tokens.input,
@@ -433,6 +446,61 @@ mod tests {
assert!(files.is_empty());
}
#[test]
fn test_insert_gemini_session_skips_matching_proxy_log() -> Result<(), AppError> {
    // A Gemini session entry matching a 2xx proxy row by fingerprint must be
    // skipped. Gemini merges thought tokens into output before building the
    // dedup key: output 2 + thoughts 5 = 7, matching the proxy row below.
    let db = Database::memory()?;
    {
        let conn = lock_conn!(db.conn);
        // Proxy row: created_at 1000 = 1970-01-01T00:16:40Z.
        conn.execute(
            "INSERT INTO proxy_request_logs (
                request_id, provider_id, app_type, model, request_model,
                input_tokens, output_tokens, cache_read_tokens, cache_creation_tokens,
                total_cost_usd, latency_ms, status_code, created_at, data_source
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
            rusqlite::params![
                "gemini-proxy",
                "google",
                "gemini",
                "gemini-2.5-pro",
                "gemini-2.5-pro",
                10,
                7,
                1,
                0,
                "0.01",
                100,
                200,
                1000,
                "proxy"
            ],
        )?;
    }
    let tokens = GeminiTokens {
        input: 10,
        output: 2,
        cached: 1,
        thoughts: 5,
    };
    let inserted = insert_gemini_session_entry(
        &db,
        "gemini-session-dup",
        &tokens,
        "gemini-2.5-pro",
        Some("session-1"),
        Some("1970-01-01T00:16:45Z"),
    )?;
    assert!(!inserted);
    // Only the original proxy row remains.
    let conn = lock_conn!(db.conn);
    let count: i64 = conn.query_row("SELECT COUNT(*) FROM proxy_request_logs", [], |row| {
        row.get(0)
    })?;
    assert_eq!(count, 1);
    Ok(())
}
#[test]
fn test_parse_gemini_tokens() {
let json: serde_json::Value = serde_json::json!({

View File

@@ -130,6 +130,123 @@ fn provider_name_coalesce(log_alias: &str, provider_alias: &str) -> String {
)
}
// Session rows within ±10 minutes of a matching proxy row are treated as the
// same underlying request (session-log timestamps can drift from the
// proxy-side clock).
pub(crate) const SESSION_PROXY_DEDUP_WINDOW_SECONDS: i64 = 10 * 60;

/// Builds the shared WHERE predicate that hides session-sourced duplicate
/// rows from read-path aggregations.
///
/// `log_alias` is the alias used for `proxy_request_logs` in the caller's
/// query. The predicate keeps a row unless it is session-sourced AND a
/// successful (2xx) proxy row exists with the same fingerprint: equal
/// app_type and input/output/cache_read token counts, a cache_creation match
/// (wildcarded when a codex/gemini session row reports 0, since those
/// sources never record the field), a created_at within the ±10-minute
/// window, and a case-insensitive model match where 'unknown' on either side
/// acts as a wildcard.
///
/// Mirrors the write-path check in `has_matching_proxy_usage_log`; keep the
/// two in sync.
pub(crate) fn effective_usage_log_filter(log_alias: &str) -> String {
    format!(
        "NOT (
            {log_alias}.data_source IN ('session_log', 'codex_session', 'gemini_session')
            AND EXISTS (
                SELECT 1
                FROM proxy_request_logs proxy_dedup
                WHERE proxy_dedup.data_source = 'proxy'
                AND proxy_dedup.app_type = {log_alias}.app_type
                AND proxy_dedup.status_code >= 200
                AND proxy_dedup.status_code < 300
                AND proxy_dedup.input_tokens = {log_alias}.input_tokens
                AND proxy_dedup.output_tokens = {log_alias}.output_tokens
                AND proxy_dedup.cache_read_tokens = {log_alias}.cache_read_tokens
                AND (
                    proxy_dedup.cache_creation_tokens = {log_alias}.cache_creation_tokens
                    OR (
                        {log_alias}.cache_creation_tokens = 0
                        AND {log_alias}.data_source IN ('codex_session', 'gemini_session')
                    )
                )
                AND proxy_dedup.created_at BETWEEN
                    {log_alias}.created_at - {SESSION_PROXY_DEDUP_WINDOW_SECONDS}
                    AND {log_alias}.created_at + {SESSION_PROXY_DEDUP_WINDOW_SECONDS}
                AND (
                    LOWER(proxy_dedup.model) = LOWER({log_alias}.model)
                    OR LOWER(proxy_dedup.model) = 'unknown'
                    OR LOWER({log_alias}.model) = 'unknown'
                )
            )
        )"
    )
}
/// Cross-source dedup fingerprint key.
///
/// `cache_creation_tokens`: Codex/Gemini session logs do not expose this
/// field, so callers pass 0 to mean "unknown" — the matcher then accepts any
/// proxy-side cache_creation_tokens value.
#[derive(Debug, Clone, Copy)]
pub(crate) struct DedupKey<'a> {
    /// App the entry belongs to: "claude", "codex" or "gemini".
    pub app_type: &'a str,
    /// Model name; matched case-insensitively, with "unknown" as a wildcard.
    pub model: &'a str,
    pub input_tokens: u32,
    pub output_tokens: u32,
    pub cache_read_tokens: u32,
    /// 0 means "unknown" for codex/gemini sources (see type-level docs).
    pub cache_creation_tokens: u32,
    /// Unix timestamp of the session entry; matched within the ±10min window.
    pub created_at: i64,
}
/// Unified pre-insert dedup check for session-log writes.
///
/// Returns `Ok(true)` (skip the insert) when either condition holds:
/// ① `request_id` already exists in `proxy_request_logs`;
/// ② a successful proxy-side row within the dedup window matches the
///    fingerprint in `key`.
pub(crate) fn should_skip_session_insert(
    conn: &Connection,
    request_id: &str,
    key: &DedupKey,
) -> Result<bool, AppError> {
    // `||` short-circuits, so the fingerprint scan only runs when the
    // request_id lookup misses.
    Ok(proxy_request_id_exists(conn, request_id)? || has_matching_proxy_usage_log(conn, key)?)
}
/// Probes `proxy_request_logs` for an exact `request_id` match.
fn proxy_request_id_exists(conn: &Connection, request_id: &str) -> Result<bool, AppError> {
    // EXISTS lets SQLite stop at the first matching row.
    let found: bool = conn
        .query_row(
            "SELECT EXISTS(SELECT 1 FROM proxy_request_logs WHERE request_id = ?1)",
            params![request_id],
            |row| row.get(0),
        )
        .map_err(|e| AppError::Database(format!("查询 request_id 失败: {e}")))?;
    Ok(found)
}
/// Returns `true` when a successful proxy-side log already covers `key`.
///
/// Write-path twin of `effective_usage_log_filter`: the same fingerprint
/// (app_type, four token counts, 2xx status, case-insensitive model with
/// 'unknown' as wildcard) inside the ±10-minute window.
pub(crate) fn has_matching_proxy_usage_log(
    conn: &Connection,
    key: &DedupKey,
) -> Result<bool, AppError> {
    // Codex/Gemini session logs never report cache_creation_tokens; 0 there
    // means "unknown", so the ?9 flag lets any proxy-side value match.
    let allow_missing_cache_creation =
        matches!(key.app_type, "codex" | "gemini") && key.cache_creation_tokens == 0;
    conn.query_row(
        "SELECT EXISTS (
            SELECT 1
            FROM proxy_request_logs l
            WHERE l.data_source = 'proxy'
            AND l.app_type = ?1
            AND l.status_code >= 200
            AND l.status_code < 300
            AND l.input_tokens = ?3
            AND l.output_tokens = ?4
            AND l.cache_read_tokens = ?5
            AND (l.cache_creation_tokens = ?6 OR ?9 = 1)
            AND l.created_at BETWEEN ?7 - ?8 AND ?7 + ?8
            AND (
                LOWER(l.model) = LOWER(?2)
                OR LOWER(l.model) = 'unknown'
                OR LOWER(?2) = 'unknown'
            )
        )",
        params![
            key.app_type,
            key.model,
            key.input_tokens as i64,
            key.output_tokens as i64,
            key.cache_read_tokens as i64,
            key.cache_creation_tokens as i64,
            key.created_at,
            SESSION_PROXY_DEDUP_WINDOW_SECONDS,
            // ?9: 1 enables the cache_creation wildcard branch above.
            allow_missing_cache_creation as i64,
        ],
        |row| row.get::<_, bool>(0),
    )
    .map_err(|e| AppError::Database(format!("查询重复代理用量日志失败: {e}")))
}
#[derive(Debug, Clone, Default)]
struct RollupDateBounds {
start: Option<String>,
@@ -231,19 +348,19 @@ impl Database {
let conn = lock_conn!(self.conn);
// Build detail WHERE clause
let mut conditions = Vec::new();
let mut conditions = vec![effective_usage_log_filter("l")];
let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
if let Some(start) = start_date {
conditions.push("created_at >= ?");
conditions.push("l.created_at >= ?".to_string());
params_vec.push(Box::new(start));
}
if let Some(end) = end_date {
conditions.push("created_at <= ?");
conditions.push("l.created_at <= ?".to_string());
params_vec.push(Box::new(end));
}
if let Some(at) = app_type {
conditions.push("app_type = ?");
conditions.push("l.app_type = ?".to_string());
params_vec.push(Box::new(at.to_string()));
}
@@ -293,7 +410,7 @@ impl Database {
COALESCE(SUM(cache_creation_tokens), 0) as total_cache_creation_tokens,
COALESCE(SUM(cache_read_tokens), 0) as total_cache_read_tokens,
COALESCE(SUM(CASE WHEN status_code >= 200 AND status_code < 300 THEN 1 ELSE 0 END), 0) as success_count
FROM proxy_request_logs {where_clause}) d,
FROM proxy_request_logs l {where_clause}) d,
(SELECT
COALESCE(SUM(request_count), 0) as total_requests,
COALESCE(SUM(CAST(total_cost_usd AS REAL)), 0) as total_cost,
@@ -369,23 +486,25 @@ impl Database {
}
let app_type_filter = if app_type.is_some() {
"AND app_type = ?4"
"AND l.app_type = ?4"
} else {
""
};
let effective_filter = effective_usage_log_filter("l");
let sql = format!(
"SELECT
CAST((created_at - ?1) / ?3 AS INTEGER) as bucket_idx,
CAST((l.created_at - ?1) / ?3 AS INTEGER) as bucket_idx,
COUNT(*) as request_count,
COALESCE(SUM(CAST(total_cost_usd AS REAL)), 0) as total_cost,
COALESCE(SUM(input_tokens + output_tokens), 0) as total_tokens,
COALESCE(SUM(input_tokens), 0) as total_input_tokens,
COALESCE(SUM(output_tokens), 0) as total_output_tokens,
COALESCE(SUM(cache_creation_tokens), 0) as total_cache_creation_tokens,
COALESCE(SUM(cache_read_tokens), 0) as total_cache_read_tokens
FROM proxy_request_logs
WHERE created_at >= ?1 AND created_at <= ?2 {app_type_filter}
COALESCE(SUM(CAST(l.total_cost_usd AS REAL)), 0) as total_cost,
COALESCE(SUM(l.input_tokens + l.output_tokens), 0) as total_tokens,
COALESCE(SUM(l.input_tokens), 0) as total_input_tokens,
COALESCE(SUM(l.output_tokens), 0) as total_output_tokens,
COALESCE(SUM(l.cache_creation_tokens), 0) as total_cache_creation_tokens,
COALESCE(SUM(l.cache_read_tokens), 0) as total_cache_read_tokens
FROM proxy_request_logs l
WHERE l.created_at >= ?1 AND l.created_at <= ?2
AND {effective_filter} {app_type_filter}
GROUP BY bucket_idx
ORDER BY bucket_idx ASC"
);
@@ -456,23 +575,25 @@ impl Database {
let bucket_count = (end_day.signed_duration_since(start_day).num_days() + 1) as usize;
let app_type_filter = if app_type.is_some() {
"AND app_type = ?3"
"AND l.app_type = ?3"
} else {
""
};
let effective_filter = effective_usage_log_filter("l");
let detail_sql = format!(
"SELECT
date(created_at, 'unixepoch', 'localtime') as bucket_date,
date(l.created_at, 'unixepoch', 'localtime') as bucket_date,
COUNT(*) as request_count,
COALESCE(SUM(CAST(total_cost_usd AS REAL)), 0) as total_cost,
COALESCE(SUM(input_tokens + output_tokens), 0) as total_tokens,
COALESCE(SUM(input_tokens), 0) as total_input_tokens,
COALESCE(SUM(output_tokens), 0) as total_output_tokens,
COALESCE(SUM(cache_creation_tokens), 0) as total_cache_creation_tokens,
COALESCE(SUM(cache_read_tokens), 0) as total_cache_read_tokens
FROM proxy_request_logs
WHERE created_at >= ?1 AND created_at <= ?2 {app_type_filter}
COALESCE(SUM(CAST(l.total_cost_usd AS REAL)), 0) as total_cost,
COALESCE(SUM(l.input_tokens + l.output_tokens), 0) as total_tokens,
COALESCE(SUM(l.input_tokens), 0) as total_input_tokens,
COALESCE(SUM(l.output_tokens), 0) as total_output_tokens,
COALESCE(SUM(l.cache_creation_tokens), 0) as total_cache_creation_tokens,
COALESCE(SUM(l.cache_read_tokens), 0) as total_cache_read_tokens
FROM proxy_request_logs l
WHERE l.created_at >= ?1 AND l.created_at <= ?2
AND {effective_filter} {app_type_filter}
GROUP BY bucket_date
ORDER BY bucket_date ASC"
);
@@ -623,18 +744,18 @@ impl Database {
) -> Result<Vec<ProviderStats>, AppError> {
let conn = lock_conn!(self.conn);
let mut detail_conditions = Vec::new();
let mut detail_conditions = vec![effective_usage_log_filter("l")];
let mut detail_params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
if let Some(start) = start_date {
detail_conditions.push("l.created_at >= ?");
detail_conditions.push("l.created_at >= ?".to_string());
detail_params.push(Box::new(start));
}
if let Some(end) = end_date {
detail_conditions.push("l.created_at <= ?");
detail_conditions.push("l.created_at <= ?".to_string());
detail_params.push(Box::new(end));
}
if let Some(at) = app_type {
detail_conditions.push("l.app_type = ?");
detail_conditions.push("l.app_type = ?".to_string());
detail_params.push(Box::new(at.to_string()));
}
let detail_where = if detail_conditions.is_empty() {
@@ -747,18 +868,18 @@ impl Database {
) -> Result<Vec<ModelStats>, AppError> {
let conn = lock_conn!(self.conn);
let mut detail_conditions = Vec::new();
let mut detail_conditions = vec![effective_usage_log_filter("l")];
let mut detail_params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
if let Some(start) = start_date {
detail_conditions.push("l.created_at >= ?");
detail_conditions.push("l.created_at >= ?".to_string());
detail_params.push(Box::new(start));
}
if let Some(end) = end_date {
detail_conditions.push("l.created_at <= ?");
detail_conditions.push("l.created_at <= ?".to_string());
detail_params.push(Box::new(end));
}
if let Some(at) = app_type {
detail_conditions.push("l.app_type = ?");
detail_conditions.push("l.app_type = ?".to_string());
detail_params.push(Box::new(at.to_string()));
}
let detail_where = if detail_conditions.is_empty() {
@@ -855,31 +976,31 @@ impl Database {
) -> Result<PaginatedLogs, AppError> {
let conn = lock_conn!(self.conn);
let mut conditions = Vec::new();
let mut conditions = vec![effective_usage_log_filter("l")];
let mut params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
if let Some(ref app_type) = filters.app_type {
conditions.push("l.app_type = ?");
conditions.push("l.app_type = ?".to_string());
params.push(Box::new(app_type.clone()));
}
if let Some(ref provider_name) = filters.provider_name {
conditions.push("p.name LIKE ?");
conditions.push("p.name LIKE ?".to_string());
params.push(Box::new(format!("%{provider_name}%")));
}
if let Some(ref model) = filters.model {
conditions.push("l.model LIKE ?");
conditions.push("l.model LIKE ?".to_string());
params.push(Box::new(format!("%{model}%")));
}
if let Some(status) = filters.status_code {
conditions.push("l.status_code = ?");
conditions.push("l.status_code = ?".to_string());
params.push(Box::new(status as i64));
}
if let Some(start) = filters.start_date {
conditions.push("l.created_at >= ?");
conditions.push("l.created_at >= ?".to_string());
params.push(Box::new(start));
}
if let Some(end) = filters.end_date {
conditions.push("l.created_at <= ?");
conditions.push("l.created_at <= ?".to_string());
params.push(Box::new(end));
}
@@ -1351,6 +1472,48 @@ mod tests {
}
}
// Test helper: insert one proxy_request_logs row carrying exactly the fields
// the dedup fingerprint cares about. Per-token cost columns are zeroed and
// latency is a fixed 100ms because the dedup tests never read them.
#[allow(clippy::too_many_arguments)]
fn insert_usage_log(
    conn: &Connection,
    request_id: &str,
    app_type: &str,
    provider_id: &str,
    model: &str,
    data_source: &str,
    created_at: i64,
    input_tokens: i64,
    output_tokens: i64,
    cache_read_tokens: i64,
    cache_creation_tokens: i64,
    status_code: i64,
    total_cost_usd: &str,
) -> Result<(), AppError> {
    conn.execute(
        "INSERT INTO proxy_request_logs (
            request_id, provider_id, app_type, model, request_model,
            input_tokens, output_tokens, cache_read_tokens, cache_creation_tokens,
            input_cost_usd, output_cost_usd, cache_read_cost_usd, cache_creation_cost_usd,
            total_cost_usd, latency_ms, status_code, created_at, data_source
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, '0', '0', '0', '0', ?, 100, ?, ?, ?)",
        params![
            request_id,
            provider_id,
            app_type,
            model,
            // request_model mirrors model for these fixtures.
            model,
            input_tokens,
            output_tokens,
            cache_read_tokens,
            cache_creation_tokens,
            total_cost_usd,
            status_code,
            created_at,
            data_source
        ],
    )?;
    Ok(())
}
#[test]
fn test_get_usage_summary() -> Result<(), AppError> {
let db = Database::memory()?;
@@ -1525,6 +1688,359 @@ mod tests {
Ok(())
}
#[test]
fn test_effective_usage_dedup_prefers_proxy_for_session_sources() -> Result<(), AppError> {
    // Three proxy rows, each shadowed by a session-sourced duplicate 60s
    // later, plus one session-only row. Every aggregation surface must count
    // exactly 4 requests: the 3 proxy rows win over their duplicates, and
    // the unmatched session row survives.
    let db = Database::memory()?;
    {
        let conn = lock_conn!(db.conn);
        // codex: proxy model is "GPT-5.4" — model match is case-insensitive.
        insert_usage_log(
            &conn,
            "codex-proxy",
            "codex",
            "openai",
            "GPT-5.4",
            "proxy",
            10_000,
            100,
            20,
            10,
            7,
            200,
            "0.10",
        )?;
        // codex session duplicate: cache_creation=0 ("unknown") still matches
        // the proxy row's 7 via the wildcard.
        insert_usage_log(
            &conn,
            "codex-session-dup",
            "codex",
            "_codex_session",
            "gpt-5.4",
            "codex_session",
            10_060,
            100,
            20,
            10,
            0,
            200,
            "0.10",
        )?;
        // claude: session logs do report cache_creation, so it matches exactly.
        insert_usage_log(
            &conn,
            "claude-proxy",
            "claude",
            "openai-compatible",
            "claude-sonnet-4-5",
            "proxy",
            25_000,
            300,
            60,
            20,
            5,
            200,
            "0.30",
        )?;
        insert_usage_log(
            &conn,
            "claude-session-dup",
            "claude",
            "_session",
            "claude-sonnet-4-5",
            "session_log",
            25_060,
            300,
            60,
            20,
            5,
            200,
            "0.30",
        )?;
        // gemini proxy row plus its session duplicate.
        insert_usage_log(
            &conn,
            "gemini-proxy",
            "gemini",
            "google",
            "gemini-2.5-pro",
            "proxy",
            20_000,
            200,
            40,
            30,
            0,
            200,
            "0.20",
        )?;
        insert_usage_log(
            &conn,
            "gemini-session-dup",
            "gemini",
            "_gemini_session",
            "gemini-2.5-pro",
            "gemini_session",
            20_060,
            200,
            40,
            30,
            0,
            200,
            "0.20",
        )?;
        // Session row with no proxy counterpart: must stay visible.
        insert_usage_log(
            &conn,
            "codex-session-only",
            "codex",
            "_codex_session",
            "gpt-5.4",
            "codex_session",
            30_000,
            50,
            5,
            0,
            0,
            200,
            "0.02",
        )?;
    }
    // Summary totals come from the 3 proxy rows + the session-only row:
    // input 100+300+200+50, output 20+60+40+5, cache_read 10+20+30+0,
    // cache_creation 7+5+0+0.
    let summary = db.get_usage_summary(None, None, None)?;
    assert_eq!(summary.total_requests, 4);
    assert_eq!(summary.total_input_tokens, 650);
    assert_eq!(summary.total_output_tokens, 125);
    assert_eq!(summary.total_cache_read_tokens, 60);
    assert_eq!(summary.total_cache_creation_tokens, 12);
    let trends = db.get_daily_trends(Some(0), Some(40_000), None)?;
    assert_eq!(trends.iter().map(|stat| stat.request_count).sum::<u64>(), 4);
    let provider_stats = db.get_provider_stats(None, None, None)?;
    assert_eq!(
        provider_stats
            .iter()
            .map(|stat| stat.request_count)
            .sum::<u64>(),
        4
    );
    // Only the unmatched codex session provider remains; the deduplicated
    // session providers disappear from the per-provider stats entirely.
    assert!(provider_stats
        .iter()
        .any(|stat| stat.provider_id == "_codex_session" && stat.request_count == 1));
    assert!(!provider_stats
        .iter()
        .any(|stat| stat.provider_id == "_gemini_session"));
    assert!(!provider_stats
        .iter()
        .any(|stat| stat.provider_id == "_session"));
    let model_stats = db.get_model_stats(None, None, None)?;
    assert_eq!(
        model_stats
            .iter()
            .map(|stat| stat.request_count)
            .sum::<u64>(),
        4
    );
    // The request-log listing hides the duplicates as well.
    let logs = db.get_request_logs(&LogFilters::default(), 0, 10)?;
    let request_ids: Vec<&str> = logs
        .data
        .iter()
        .map(|log| log.request_id.as_str())
        .collect();
    assert_eq!(logs.total, 4);
    assert!(request_ids.contains(&"codex-proxy"));
    assert!(request_ids.contains(&"claude-proxy"));
    assert!(request_ids.contains(&"gemini-proxy"));
    assert!(request_ids.contains(&"codex-session-only"));
    assert!(!request_ids.contains(&"codex-session-dup"));
    assert!(!request_ids.contains(&"claude-session-dup"));
    assert!(!request_ids.contains(&"gemini-session-dup"));
    // Per-data-source breakdown: 3 proxy + 1 codex_session; the other
    // session sources vanish entirely once their duplicates are filtered.
    let breakdown = crate::services::session_usage::get_data_source_breakdown(&db)?;
    let proxy_count = breakdown
        .iter()
        .find(|item| item.data_source == "proxy")
        .map(|item| item.request_count);
    let codex_session_count = breakdown
        .iter()
        .find(|item| item.data_source == "codex_session")
        .map(|item| item.request_count);
    let gemini_session_count = breakdown
        .iter()
        .find(|item| item.data_source == "gemini_session")
        .map(|item| item.request_count);
    let session_log_count = breakdown
        .iter()
        .find(|item| item.data_source == "session_log")
        .map(|item| item.request_count);
    assert_eq!(proxy_count, Some(3));
    assert_eq!(codex_session_count, Some(1));
    assert_eq!(gemini_session_count, None);
    assert_eq!(session_log_count, None);
    Ok(())
}
#[test]
fn test_effective_usage_dedup_keeps_non_matching_session_rows() -> Result<(), AppError> {
    // Negative cases: a session row is hidden only when EVERY fingerprint
    // dimension matches a successful proxy row. Each session row below
    // breaks exactly one dimension, so all 9 rows must remain visible.
    let db = Database::memory()?;
    {
        let conn = lock_conn!(db.conn);
        // Baseline successful proxy row at t=10_000.
        insert_usage_log(
            &conn,
            "proxy-base",
            "codex",
            "openai",
            "gpt-5.4",
            "proxy",
            10_000,
            100,
            20,
            10,
            0,
            200,
            "0.10",
        )?;
        // 601s after proxy-base: one second outside the ±600s window.
        insert_usage_log(
            &conn,
            "session-outside-window",
            "codex",
            "_codex_session",
            "gpt-5.4",
            "codex_session",
            10_601,
            100,
            20,
            10,
            0,
            200,
            "0.10",
        )?;
        // input_tokens 101 vs the proxy row's 100.
        insert_usage_log(
            &conn,
            "session-token-mismatch",
            "codex",
            "_codex_session",
            "gpt-5.4",
            "codex_session",
            10_060,
            101,
            20,
            10,
            0,
            200,
            "0.10",
        )?;
        // app_type gemini vs the proxy row's codex.
        insert_usage_log(
            &conn,
            "session-app-mismatch",
            "gemini",
            "_gemini_session",
            "gpt-5.4",
            "gemini_session",
            10_060,
            100,
            20,
            10,
            0,
            200,
            "0.10",
        )?;
        // Model differs (and neither side is 'unknown').
        insert_usage_log(
            &conn,
            "session-model-mismatch",
            "codex",
            "_codex_session",
            "different-model",
            "codex_session",
            10_060,
            100,
            20,
            10,
            0,
            200,
            "0.10",
        )?;
        // Proxy row with a 5xx status: never counts as a dedup anchor.
        insert_usage_log(
            &conn,
            "proxy-error",
            "codex",
            "openai",
            "gpt-5.4",
            "proxy",
            20_000,
            300,
            60,
            0,
            0,
            500,
            "0.00",
        )?;
        // Fingerprint-identical to proxy-error, but the anchor is non-2xx,
        // so this session row must be kept.
        insert_usage_log(
            &conn,
            "session-matches-error-proxy",
            "codex",
            "_codex_session",
            "gpt-5.4",
            "codex_session",
            20_060,
            300,
            60,
            0,
            0,
            200,
            "0.30",
        )?;
        // Claude pair: session_log rows get NO cache_creation wildcard, so
        // 0 vs the proxy row's 5 is a real mismatch and the row is kept.
        insert_usage_log(
            &conn,
            "claude-proxy-cache-creation",
            "claude",
            "anthropic",
            "claude-sonnet-4-5",
            "proxy",
            30_000,
            100,
            20,
            10,
            5,
            200,
            "0.10",
        )?;
        insert_usage_log(
            &conn,
            "claude-session-cache-creation-mismatch",
            "claude",
            "_session",
            "claude-sonnet-4-5",
            "session_log",
            30_060,
            100,
            20,
            10,
            0,
            200,
            "0.10",
        )?;
    }
    // Nothing was filtered: all 9 rows are effective.
    let summary = db.get_usage_summary(None, None, None)?;
    assert_eq!(summary.total_requests, 9);
    let logs = db.get_request_logs(&LogFilters::default(), 0, 10)?;
    let request_ids: Vec<&str> = logs
        .data
        .iter()
        .map(|log| log.request_id.as_str())
        .collect();
    assert_eq!(logs.total, 9);
    assert!(request_ids.contains(&"session-outside-window"));
    assert!(request_ids.contains(&"session-token-mismatch"));
    assert!(request_ids.contains(&"session-app-mismatch"));
    assert!(request_ids.contains(&"session-model-mismatch"));
    assert!(request_ids.contains(&"session-matches-error-proxy"));
    assert!(request_ids.contains(&"claude-session-cache-creation-mismatch"));
    Ok(())
}
#[test]
fn test_get_model_stats() -> Result<(), AppError> {
let db = Database::memory()?;