mirror of
https://github.com/rustfs/rustfs.git
synced 2026-05-06 14:12:29 +08:00
test(filemeta): cover no-wait refresh coalescing (#2755)
This commit is contained in:
@@ -872,7 +872,7 @@ mod tests {
|
||||
Arc, Mutex as StdMutex,
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
};
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::sync::{Notify, oneshot};
|
||||
use uuid::Uuid;
|
||||
|
||||
#[tokio::test]
|
||||
@@ -1127,6 +1127,74 @@ mod tests {
|
||||
.expect("background refresh should complete");
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
async fn test_cache_no_wait_coalesces_background_refreshes() {
|
||||
let calls = Arc::new(AtomicUsize::new(0));
|
||||
let release_refresh = Arc::new(Notify::new());
|
||||
let ttl = Duration::from_secs(60);
|
||||
let cache = Arc::new(Cache::new(
|
||||
Box::new({
|
||||
let calls = Arc::clone(&calls);
|
||||
let release_refresh = Arc::clone(&release_refresh);
|
||||
move || {
|
||||
let calls = Arc::clone(&calls);
|
||||
let release_refresh = Arc::clone(&release_refresh);
|
||||
Box::pin(async move {
|
||||
let call = calls.fetch_add(1, Ordering::SeqCst);
|
||||
if call > 0 {
|
||||
release_refresh.notified().await;
|
||||
}
|
||||
Ok(call)
|
||||
})
|
||||
}
|
||||
}),
|
||||
ttl,
|
||||
Opts {
|
||||
return_last_good: true,
|
||||
no_wait: true,
|
||||
},
|
||||
));
|
||||
|
||||
let prime = Arc::clone(&cache).get().await.expect("prime cache should succeed");
|
||||
assert_eq!(prime, 0);
|
||||
|
||||
let now = Cache::<usize>::current_unix_secs();
|
||||
cache
|
||||
.last_update_secs
|
||||
.store(now.saturating_sub(ttl.as_secs()), AtomicOrdering::SeqCst);
|
||||
|
||||
let stale = Arc::clone(&cache).get().await.expect("stale get should succeed");
|
||||
assert_eq!(stale, 0);
|
||||
|
||||
tokio::time::timeout(Duration::from_secs(1), async {
|
||||
loop {
|
||||
if calls.load(Ordering::SeqCst) == 2 {
|
||||
break;
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
}
|
||||
})
|
||||
.await
|
||||
.expect("background refresh should start");
|
||||
|
||||
let mut readers = Vec::new();
|
||||
for _ in 0..8 {
|
||||
let cache = Arc::clone(&cache);
|
||||
readers.push(tokio::spawn(
|
||||
async move { Arc::clone(&cache).get().await.expect("stale get should succeed") },
|
||||
));
|
||||
}
|
||||
|
||||
for reader in readers {
|
||||
assert_eq!(reader.await.expect("reader task should not panic"), 0);
|
||||
}
|
||||
|
||||
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||
assert_eq!(calls.load(Ordering::SeqCst), 2);
|
||||
|
||||
release_refresh.notify_waiters();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_cache_return_last_good_on_refresh_error() {
|
||||
let calls = Arc::new(AtomicUsize::new(0));
|
||||
|
||||
Reference in New Issue
Block a user