diff --git a/crates/filemeta/src/metacache.rs b/crates/filemeta/src/metacache.rs index 725342956..e6f475bfe 100644 --- a/crates/filemeta/src/metacache.rs +++ b/crates/filemeta/src/metacache.rs @@ -872,7 +872,7 @@ mod tests { Arc, Mutex as StdMutex, atomic::{AtomicUsize, Ordering}, }; - use tokio::sync::oneshot; + use tokio::sync::{Notify, oneshot}; use uuid::Uuid; #[tokio::test] @@ -1127,6 +1127,74 @@ mod tests { .expect("background refresh should complete"); } + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn test_cache_no_wait_coalesces_background_refreshes() { + let calls = Arc::new(AtomicUsize::new(0)); + let release_refresh = Arc::new(Notify::new()); + let ttl = Duration::from_secs(60); + let cache = Arc::new(Cache::new( + Box::new({ + let calls = Arc::clone(&calls); + let release_refresh = Arc::clone(&release_refresh); + move || { + let calls = Arc::clone(&calls); + let release_refresh = Arc::clone(&release_refresh); + Box::pin(async move { + let call = calls.fetch_add(1, Ordering::SeqCst); + if call > 0 { + release_refresh.notified().await; + } + Ok(call) + }) + } + }), + ttl, + Opts { + return_last_good: true, + no_wait: true, + }, + )); + + let prime = Arc::clone(&cache).get().await.expect("prime cache should succeed"); + assert_eq!(prime, 0); + + let now = Cache::::current_unix_secs(); + cache + .last_update_secs + .store(now.saturating_sub(ttl.as_secs()), AtomicOrdering::SeqCst); + + let stale = Arc::clone(&cache).get().await.expect("stale get should succeed"); + assert_eq!(stale, 0); + + tokio::time::timeout(Duration::from_secs(1), async { + loop { + if calls.load(Ordering::SeqCst) == 2 { + break; + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + }) + .await + .expect("background refresh should start"); + + let mut readers = Vec::new(); + for _ in 0..8 { + let cache = Arc::clone(&cache); + readers.push(tokio::spawn( + async move { Arc::clone(&cache).get().await.expect("stale get should succeed") }, + )); + } + + for reader in readers { + assert_eq!(reader.await.expect("reader task should not panic"), 0); + } + + tokio::time::sleep(Duration::from_millis(50)).await; + assert_eq!(calls.load(Ordering::SeqCst), 2); + + release_refresh.notify_waiters(); + } + #[tokio::test] async fn test_cache_return_last_good_on_refresh_error() { let calls = Arc::new(AtomicUsize::new(0));