diff --git a/Cargo.lock b/Cargo.lock
index 2f034cf87..31071cd94 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8146,6 +8146,7 @@ dependencies = [
 name = "rustfs-filemeta"
 version = "1.0.0-beta.1"
 dependencies = [
+ "arc-swap",
  "byteorder",
  "bytes",
  "crc-fast",
diff --git a/crates/filemeta/Cargo.toml b/crates/filemeta/Cargo.toml
index 3ac6c96d6..ba4846463 100644
--- a/crates/filemeta/Cargo.toml
+++ b/crates/filemeta/Cargo.toml
@@ -41,6 +41,7 @@ tracing.workspace = true
 thiserror.workspace = true
 s3s.workspace = true
 regex.workspace = true
+arc-swap.workspace = true
 
 [dev-dependencies]
 criterion = { workspace = true }
diff --git a/crates/filemeta/src/metacache.rs b/crates/filemeta/src/metacache.rs
index 80c857c47..90cacbc45 100644
--- a/crates/filemeta/src/metacache.rs
+++ b/crates/filemeta/src/metacache.rs
@@ -16,6 +16,7 @@ use crate::{
     Error, FileInfo, FileInfoOpts, FileInfoVersions, FileMeta, FileMetaShallowVersion, Result, VersionType, get_file_info,
     merge_file_meta_versions,
 };
+use arc_swap::ArcSwapOption;
 use rmp::Marker;
 use serde::{Deserialize, Serialize};
 use std::cmp::Ordering;
@@ -24,10 +25,9 @@ use std::{
     fmt::Debug,
     future::Future,
     pin::Pin,
-    ptr,
     sync::{
         Arc,
-        atomic::{AtomicPtr, AtomicU64, Ordering as AtomicOrdering},
+        atomic::{AtomicU64, Ordering as AtomicOrdering},
     },
     time::{Duration, SystemTime, UNIX_EPOCH},
 };
@@ -767,100 +767,74 @@ pub struct Cache {
     update_fn: UpdateFn<T>,
     ttl: Duration,
     opts: Opts,
-    val: AtomicPtr<T>,
-    last_update_ms: AtomicU64,
-    updating: Arc<Mutex<bool>>,
+    val: ArcSwapOption<T>,
+    last_update_secs: AtomicU64,
+    updating: Arc<Mutex<()>>,
 }
 
-impl Cache {
+impl Cache {
     pub fn new(update_fn: UpdateFn<T>, ttl: Duration, opts: Opts) -> Self {
-        let val = AtomicPtr::new(ptr::null_mut());
         Self {
             update_fn,
             ttl,
             opts,
-            val,
-            last_update_ms: AtomicU64::new(0),
-            updating: Arc::new(Mutex::new(false)),
+            val: ArcSwapOption::from(None),
+            last_update_secs: AtomicU64::new(0),
+            updating: Arc::new(Mutex::new(())),
         }
     }
 
-    #[allow(unsafe_code)]
     pub async fn get(self: Arc<Self>) -> std::io::Result<T> {
-        let v_ptr = self.val.load(AtomicOrdering::SeqCst);
-        let v = if v_ptr.is_null() {
-            None
-        } else {
-            Some(unsafe { (*v_ptr).clone() })
-        };
+        let value = self.get_shared().await?;
+        Ok(value.as_ref().clone())
+    }
 
-        let now = SystemTime::now()
-            .duration_since(UNIX_EPOCH)
-            .expect("Time went backwards")
-            .as_secs();
-        if now - self.last_update_ms.load(AtomicOrdering::SeqCst) < self.ttl.as_secs()
-            && let Some(v) = v
+    pub async fn get_shared(self: Arc<Self>) -> std::io::Result<Arc<T>> {
+        let now = Self::current_unix_secs();
+        let current = self.cached_value();
+        if self.age_since_last_update(now) < self.ttl.as_secs()
+            && let Some(value) = current.clone()
         {
-            return Ok(v);
+            return Ok(value);
         }
 
         if self.opts.no_wait
-            && now - self.last_update_ms.load(AtomicOrdering::SeqCst) < self.ttl.as_secs() * 2
-            && let Some(value) = v
+            && self.age_since_last_update(now) < self.ttl.as_secs().saturating_mul(2)
+            && let Some(value) = current
         {
-            if self.updating.try_lock().is_ok() {
+            if let Ok(update_guard) = Arc::clone(&self.updating).try_lock_owned() {
                 let this = Arc::clone(&self);
                 spawn(async move {
+                    let _guard = update_guard;
                     let _ = this.update().await;
                 });
             }
             return Ok(value);
         }
 
-        let _ = self.updating.lock().await;
+        let _guard = self.updating.lock().await;
 
-        if let (Ok(duration), Some(value)) = (
-            SystemTime::now().duration_since(UNIX_EPOCH + Duration::from_secs(self.last_update_ms.load(AtomicOrdering::SeqCst))),
-            v,
-        ) && duration < self.ttl
+        let now = Self::current_unix_secs();
+        if self.age_since_last_update(now) < self.ttl.as_secs()
+            && let Some(value) = self.cached_value()
         {
             return Ok(value);
         }
 
-        match self.update().await {
-            Ok(_) => {
-                let v_ptr = self.val.load(AtomicOrdering::SeqCst);
-                let v = if v_ptr.is_null() {
-                    None
-                } else {
-                    Some(unsafe { (*v_ptr).clone() })
-                };
-                Ok(v.unwrap())
-            }
-            Err(err) => Err(err),
-        }
+        self.update().await?;
+        self.cached_value()
+            .ok_or_else(|| std::io::Error::other("cache update completed without a value"))
     }
 
-    #[allow(unsafe_code)]
     async fn update(&self) -> std::io::Result<()> {
         match (self.update_fn)().await {
             Ok(val) => {
-                let old = self.val.swap(Box::into_raw(Box::new(val)), AtomicOrdering::SeqCst);
-                if !old.is_null() {
-                    unsafe {
-                        drop(Box::from_raw(old));
-                    }
-                }
-                let now = SystemTime::now()
-                    .duration_since(UNIX_EPOCH)
-                    .expect("Time went backwards")
-                    .as_secs();
-                self.last_update_ms.store(now, AtomicOrdering::SeqCst);
+                self.val.store(Some(Arc::new(val)));
+                self.last_update_secs.store(Self::current_unix_secs(), AtomicOrdering::SeqCst);
                 Ok(())
             }
             Err(err) => {
-                let v_ptr = self.val.load(AtomicOrdering::SeqCst);
-                if self.opts.return_last_good && !v_ptr.is_null() {
+                if self.opts.return_last_good && self.cached_value().is_some() {
                     return Ok(());
                 }
 
@@ -868,6 +842,23 @@ impl Cache {
             }
         }
     }
+
+    fn current_unix_secs() -> u64 {
+        SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .expect("Time went backwards")
+            .as_secs()
+    }
+
+    fn age_since_last_update(&self, now_secs: u64) -> u64 {
+        now_secs
+            .checked_sub(self.last_update_secs.load(AtomicOrdering::SeqCst))
+            .unwrap_or(u64::MAX)
+    }
+
+    fn cached_value(&self) -> Option<Arc<T>> {
+        self.val.load_full()
+    }
 }
 
 #[cfg(test)]
@@ -877,6 +868,11 @@ mod tests {
     use crate::{FileMetaVersion, MetaDeleteMarker};
     use std::collections::HashMap;
     use std::io::Cursor;
+    use std::sync::{
+        Arc, Mutex as StdMutex,
+        atomic::{AtomicUsize, Ordering},
+    };
+    use tokio::sync::oneshot;
     use uuid::Uuid;
 
     #[tokio::test]
@@ -964,4 +960,196 @@
         assert_eq!(decoded.versions, cached.versions);
         assert_ne!(extended_versions, cached.versions.len());
     }
+
+    fn build_hashmap_cache(update_size: usize) -> Arc<Cache<HashMap<usize, usize>>> {
+        let generation = Arc::new(AtomicUsize::new(0));
+        Arc::new(Cache::new(
+            Box::new(move || {
+                let generation = Arc::clone(&generation);
+                Box::pin(async move {
+                    let v = generation.fetch_add(1, Ordering::SeqCst);
+                    let mut m = HashMap::with_capacity(update_size);
+                    for i in 0..update_size {
+                        m.insert(i, i ^ v);
+                    }
+                    Ok(m)
+                })
+            }),
+            Duration::ZERO,
+            Opts::default(),
+        ))
+    }
+
+    async fn run_cache_workload(cache: Arc<Cache<HashMap<usize, usize>>>, workers: usize, rounds: usize, probe_mod: usize) {
+        let mut tasks = Vec::with_capacity(workers);
+        for worker in 0..workers {
+            let cache = Arc::clone(&cache);
+            tasks.push(tokio::spawn(async move {
+                for round in 0..rounds {
+                    let m = Arc::clone(&cache).get().await.expect("cache get should succeed");
+                    let key = (worker.wrapping_mul(17).wrapping_add(round)) % probe_mod;
+                    assert!(m.contains_key(&key), "expected key {key} to exist");
+                }
+            }));
+        }
+
+        for task in tasks {
+            task.await.expect("worker task should not panic");
+        }
+    }
+
+    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
+    async fn test_cache_concurrency_smoke() {
+        let cache = build_hashmap_cache(2048);
+        run_cache_workload(cache, 32, 120, 2048).await;
+    }
+
+    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+    async fn test_cache_no_wait_returns_stale_and_refreshes_in_background() {
+        let calls = Arc::new(AtomicUsize::new(0));
+        let (bg_started_tx, bg_started_rx) = oneshot::channel::<()>();
+        let (release_bg_tx, release_bg_rx) = oneshot::channel::<()>();
+        let bg_started_tx = Arc::new(StdMutex::new(Some(bg_started_tx)));
+        let release_bg_rx = Arc::new(StdMutex::new(Some(release_bg_rx)));
+
+        let cache = Arc::new(Cache::new(
+            Box::new({
+                let calls = Arc::clone(&calls);
+                let bg_started_tx = Arc::clone(&bg_started_tx);
+                let release_bg_rx = Arc::clone(&release_bg_rx);
+                move || {
+                    let calls = Arc::clone(&calls);
+                    let bg_started_tx = Arc::clone(&bg_started_tx);
+                    let release_bg_rx = Arc::clone(&release_bg_rx);
+                    Box::pin(async move {
+                        let call = calls.fetch_add(1, Ordering::SeqCst);
+                        if call == 1 {
+                            let tx = { bg_started_tx.lock().expect("start sender lock should not poison").take() };
+                            if let Some(tx) = tx {
+                                let _ = tx.send(());
+                            }
+                            let rx = { release_bg_rx.lock().expect("release receiver lock should not poison").take() };
+                            if let Some(rx) = rx {
+                                let _ = rx.await;
+                            }
+                        }
+                        Ok(call)
+                    })
+                }
+            }),
+            Duration::from_secs(1),
+            Opts {
+                return_last_good: true,
+                no_wait: true,
+            },
+        ));
+
+        let prime = Arc::clone(&cache).get().await.expect("prime cache should succeed");
+        assert_eq!(prime, 0);
+
+        let now = Cache::<usize>::current_unix_secs();
+        cache.last_update_secs.store(now.saturating_sub(1), AtomicOrdering::SeqCst);
+
+        let stale = tokio::time::timeout(Duration::from_millis(200), Arc::clone(&cache).get())
+            .await
+            .expect("no_wait path should return without waiting for refresh")
+            .expect("stale get should succeed");
+        assert_eq!(stale, 0);
+
+        tokio::time::timeout(Duration::from_millis(200), bg_started_rx)
+            .await
+            .expect("background refresh should start")
+            .expect("background start signal should be delivered");
+
+        release_bg_tx.send(()).expect("release signal should be delivered");
+
+        tokio::time::timeout(Duration::from_secs(1), async {
+            loop {
+                if cache.cached_value().as_deref() == Some(&1) {
+                    break;
+                }
+                tokio::time::sleep(Duration::from_millis(10)).await;
+            }
+        })
+        .await
+        .expect("background refresh should complete");
+    }
+
+    #[tokio::test]
+    async fn test_cache_return_last_good_on_refresh_error() {
+        let calls = Arc::new(AtomicUsize::new(0));
+        let cache = Arc::new(Cache::new(
+            Box::new({
+                let calls = Arc::clone(&calls);
+                move || {
+                    let calls = Arc::clone(&calls);
+                    Box::pin(async move {
+                        let call = calls.fetch_add(1, Ordering::SeqCst);
+                        if call == 0 {
+                            Ok(42usize)
+                        } else {
+                            Err(std::io::Error::other("refresh failed"))
+                        }
+                    })
+                }
+            }),
+            Duration::from_secs(1),
+            Opts {
+                return_last_good: true,
+                no_wait: false,
+            },
+        ));
+
+        let prime = Arc::clone(&cache).get().await.expect("prime cache should succeed");
+        assert_eq!(prime, 42);
+
+        let now = Cache::<usize>::current_unix_secs();
+        cache.last_update_secs.store(now.saturating_sub(2), AtomicOrdering::SeqCst);
+
+        let stale = Arc::clone(&cache)
+            .get()
+            .await
+            .expect("return_last_good should keep stale value");
+        assert_eq!(stale, 42);
+        assert_eq!(calls.load(Ordering::SeqCst), 2);
+    }
+
+    #[tokio::test]
+    async fn test_cache_refresh_error_without_return_last_good() {
+        let calls = Arc::new(AtomicUsize::new(0));
+        let cache = Arc::new(Cache::new(
+            Box::new({
+                let calls = Arc::clone(&calls);
+                move || {
+                    let calls = Arc::clone(&calls);
+                    Box::pin(async move {
+                        let call = calls.fetch_add(1, Ordering::SeqCst);
+                        if call == 0 {
+                            Ok(7usize)
+                        } else {
+                            Err(std::io::Error::other("refresh failed"))
+                        }
+                    })
+                }
+            }),
+            Duration::from_secs(1),
+            Opts {
+                return_last_good: false,
+                no_wait: false,
+            },
+        ));
+
+        let prime = Arc::clone(&cache).get().await.expect("prime cache should succeed");
+        assert_eq!(prime, 7);
+
+        let now = Cache::<usize>::current_unix_secs();
+        cache.last_update_secs.store(now.saturating_sub(2), AtomicOrdering::SeqCst);
+
+        let err = Arc::clone(&cache)
+            .get()
+            .await
+            .expect_err("refresh error should be propagated when return_last_good is false");
+        assert_eq!(err.kind(), std::io::ErrorKind::Other);
+        assert_eq!(calls.load(Ordering::SeqCst), 2);
+    }
 }