fix(filemeta): harden and optimize metacache path (#2724)

Co-authored-by: houseme <housemecn@gmail.com>
This commit is contained in:
GatewayJ
2026-04-30 15:47:31 +08:00
committed by GitHub
parent 403997f2f8
commit c29c8a5a1e
3 changed files with 248 additions and 58 deletions

1
Cargo.lock generated
View File

@@ -8146,6 +8146,7 @@ dependencies = [
name = "rustfs-filemeta"
version = "1.0.0-beta.1"
dependencies = [
"arc-swap",
"byteorder",
"bytes",
"crc-fast",

View File

@@ -41,6 +41,7 @@ tracing.workspace = true
thiserror.workspace = true
s3s.workspace = true
regex.workspace = true
arc-swap.workspace = true
[dev-dependencies]
criterion = { workspace = true }

View File

@@ -16,6 +16,7 @@ use crate::{
Error, FileInfo, FileInfoOpts, FileInfoVersions, FileMeta, FileMetaShallowVersion, Result, VersionType, get_file_info,
merge_file_meta_versions,
};
use arc_swap::ArcSwapOption;
use rmp::Marker;
use serde::{Deserialize, Serialize};
use std::cmp::Ordering;
@@ -24,10 +25,9 @@ use std::{
fmt::Debug,
future::Future,
pin::Pin,
ptr,
sync::{
Arc,
atomic::{AtomicPtr, AtomicU64, Ordering as AtomicOrdering},
atomic::{AtomicU64, Ordering as AtomicOrdering},
},
time::{Duration, SystemTime, UNIX_EPOCH},
};
@@ -767,100 +767,74 @@ pub struct Cache<T: Clone + Debug + Send> {
update_fn: UpdateFn<T>,
ttl: Duration,
opts: Opts,
val: AtomicPtr<T>,
last_update_ms: AtomicU64,
updating: Arc<Mutex<bool>>,
val: ArcSwapOption<T>,
last_update_secs: AtomicU64,
updating: Arc<Mutex<()>>,
}
impl<T: Clone + Debug + Send + 'static> Cache<T> {
impl<T: Clone + Debug + Send + Sync + 'static> Cache<T> {
pub fn new(update_fn: UpdateFn<T>, ttl: Duration, opts: Opts) -> Self {
let val = AtomicPtr::new(ptr::null_mut());
Self {
update_fn,
ttl,
opts,
val,
last_update_ms: AtomicU64::new(0),
updating: Arc::new(Mutex::new(false)),
val: ArcSwapOption::from(None),
last_update_secs: AtomicU64::new(0),
updating: Arc::new(Mutex::new(())),
}
}
#[allow(unsafe_code)]
pub async fn get(self: Arc<Self>) -> std::io::Result<T> {
let v_ptr = self.val.load(AtomicOrdering::SeqCst);
let v = if v_ptr.is_null() {
None
} else {
Some(unsafe { (*v_ptr).clone() })
};
let value = self.get_shared().await?;
Ok(value.as_ref().clone())
}
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_secs();
if now - self.last_update_ms.load(AtomicOrdering::SeqCst) < self.ttl.as_secs()
&& let Some(v) = v
pub async fn get_shared(self: Arc<Self>) -> std::io::Result<Arc<T>> {
let now = Self::current_unix_secs();
let current = self.cached_value();
if self.age_since_last_update(now) < self.ttl.as_secs()
&& let Some(value) = current.clone()
{
return Ok(v);
return Ok(value);
}
if self.opts.no_wait
&& now - self.last_update_ms.load(AtomicOrdering::SeqCst) < self.ttl.as_secs() * 2
&& let Some(value) = v
&& self.age_since_last_update(now) < self.ttl.as_secs().saturating_mul(2)
&& let Some(value) = current
{
if self.updating.try_lock().is_ok() {
if let Ok(update_guard) = Arc::clone(&self.updating).try_lock_owned() {
let this = Arc::clone(&self);
spawn(async move {
let _guard = update_guard;
let _ = this.update().await;
});
}
return Ok(value);
}
let _ = self.updating.lock().await;
let _guard = self.updating.lock().await;
if let (Ok(duration), Some(value)) = (
SystemTime::now().duration_since(UNIX_EPOCH + Duration::from_secs(self.last_update_ms.load(AtomicOrdering::SeqCst))),
v,
) && duration < self.ttl
let now = Self::current_unix_secs();
if self.age_since_last_update(now) < self.ttl.as_secs()
&& let Some(value) = self.cached_value()
{
return Ok(value);
}
match self.update().await {
Ok(_) => {
let v_ptr = self.val.load(AtomicOrdering::SeqCst);
let v = if v_ptr.is_null() {
None
} else {
Some(unsafe { (*v_ptr).clone() })
};
Ok(v.unwrap())
}
Err(err) => Err(err),
}
self.update().await?;
self.cached_value()
.ok_or_else(|| std::io::Error::other("cache update completed without a value"))
}
#[allow(unsafe_code)]
async fn update(&self) -> std::io::Result<()> {
match (self.update_fn)().await {
Ok(val) => {
let old = self.val.swap(Box::into_raw(Box::new(val)), AtomicOrdering::SeqCst);
if !old.is_null() {
unsafe {
drop(Box::from_raw(old));
}
}
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_secs();
self.last_update_ms.store(now, AtomicOrdering::SeqCst);
self.val.store(Some(Arc::new(val)));
self.last_update_secs.store(Self::current_unix_secs(), AtomicOrdering::SeqCst);
Ok(())
}
Err(err) => {
let v_ptr = self.val.load(AtomicOrdering::SeqCst);
if self.opts.return_last_good && !v_ptr.is_null() {
if self.opts.return_last_good && self.cached_value().is_some() {
return Ok(());
}
@@ -868,6 +842,23 @@ impl<T: Clone + Debug + Send + 'static> Cache<T> {
}
}
}
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Panics only if the system clock reports a time before the epoch,
/// which indicates a broken clock rather than a recoverable condition.
fn current_unix_secs() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards")
        .as_secs()
}
/// Seconds elapsed since the last successful update.
///
/// If the stored timestamp is in the future relative to `now_secs`
/// (e.g. clock skew or a backwards clock step), the subtraction would
/// underflow, so the age saturates to `u64::MAX` — the entry is then
/// treated as maximally stale and will be refreshed.
fn age_since_last_update(&self, now_secs: u64) -> u64 {
    now_secs
        .checked_sub(self.last_update_secs.load(AtomicOrdering::SeqCst))
        .unwrap_or(u64::MAX)
}
/// Lock-free snapshot of the currently cached value, if one has been
/// stored; `None` until the first successful update.
fn cached_value(&self) -> Option<Arc<T>> {
    self.val.load_full()
}
}
#[cfg(test)]
@@ -877,6 +868,11 @@ mod tests {
use crate::{FileMetaVersion, MetaDeleteMarker};
use std::collections::HashMap;
use std::io::Cursor;
use std::sync::{
Arc, Mutex as StdMutex,
atomic::{AtomicUsize, Ordering},
};
use tokio::sync::oneshot;
use uuid::Uuid;
#[tokio::test]
@@ -964,4 +960,196 @@ mod tests {
assert_eq!(decoded.versions, cached.versions);
assert_ne!(extended_versions, cached.versions.len());
}
/// Builds a cache whose refresh function produces a `update_size`-entry map.
/// A generation counter is folded into every value (`key ^ generation`) so
/// consecutive refreshes yield observably different maps; a zero TTL makes
/// every `get()` treat the entry as expired.
fn build_hashmap_cache(update_size: usize) -> Arc<Cache<HashMap<usize, usize>>> {
    let generation = Arc::new(AtomicUsize::new(0));
    Arc::new(Cache::new(
        Box::new(move || {
            let generation = Arc::clone(&generation);
            Box::pin(async move {
                // Bump the generation once per refresh.
                let current = generation.fetch_add(1, Ordering::SeqCst);
                let map: HashMap<usize, usize> =
                    (0..update_size).map(|key| (key, key ^ current)).collect();
                Ok(map)
            })
        }),
        Duration::ZERO,
        Opts::default(),
    ))
}
/// Hammers `cache` from `workers` concurrent tasks, each performing `rounds`
/// sequential `get()` calls and asserting that a deterministically chosen
/// probe key (always `< probe_mod`) is present in the returned map.
async fn run_cache_workload(cache: Arc<Cache<HashMap<usize, usize>>>, workers: usize, rounds: usize, probe_mod: usize) {
    let handles: Vec<_> = (0..workers)
        .map(|worker| {
            let cache = Arc::clone(&cache);
            tokio::spawn(async move {
                for round in 0..rounds {
                    let snapshot = Arc::clone(&cache).get().await.expect("cache get should succeed");
                    // Deterministic per-(worker, round) probe key in [0, probe_mod).
                    let key = (worker.wrapping_mul(17).wrapping_add(round)) % probe_mod;
                    assert!(snapshot.contains_key(&key), "expected key {key} to exist");
                }
            })
        })
        .collect();
    // Propagate any worker panic to the calling test.
    for handle in handles {
        handle.await.expect("worker task should not panic");
    }
}
/// Smoke test: 32 tasks each issue 120 gets against a zero-TTL cache of a
/// 2048-entry map, exercising concurrent refresh and lock-free reads.
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn test_cache_concurrency_smoke() {
    run_cache_workload(build_hashmap_cache(2048), 32, 120, 2048).await;
}
/// `no_wait` path: a stale-but-within-2*TTL entry must be returned
/// immediately while the refresh runs in a spawned background task.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_cache_no_wait_returns_stale_and_refreshes_in_background() {
    let calls = Arc::new(AtomicUsize::new(0));
    // Handshake channels: `bg_started` tells the test the background refresh
    // has begun; `release_bg` holds that refresh open until the test says so.
    let (bg_started_tx, bg_started_rx) = oneshot::channel::<()>();
    let (release_bg_tx, release_bg_rx) = oneshot::channel::<()>();
    // Wrapped in Mutex<Option<..>> so the (re-callable) update closure can
    // take() each one-shot endpoint exactly once.
    let bg_started_tx = Arc::new(StdMutex::new(Some(bg_started_tx)));
    let release_bg_rx = Arc::new(StdMutex::new(Some(release_bg_rx)));
    let cache = Arc::new(Cache::new(
        Box::new({
            let calls = Arc::clone(&calls);
            let bg_started_tx = Arc::clone(&bg_started_tx);
            let release_bg_rx = Arc::clone(&release_bg_rx);
            move || {
                let calls = Arc::clone(&calls);
                let bg_started_tx = Arc::clone(&bg_started_tx);
                let release_bg_rx = Arc::clone(&release_bg_rx);
                Box::pin(async move {
                    // The cached value is simply the call index (0, 1, ...).
                    let call = calls.fetch_add(1, Ordering::SeqCst);
                    // Second invocation is the background refresh: announce it,
                    // then block until the test releases it.
                    if call == 1 {
                        let tx = { bg_started_tx.lock().expect("start sender lock should not poison").take() };
                        if let Some(tx) = tx {
                            let _ = tx.send(());
                        }
                        let rx = { release_bg_rx.lock().expect("release receiver lock should not poison").take() };
                        if let Some(rx) = rx {
                            let _ = rx.await;
                        }
                    }
                    Ok(call)
                })
            }
        }),
        Duration::from_secs(1),
        Opts {
            return_last_good: true,
            no_wait: true,
        },
    ));
    // Prime: first get() stores value 0.
    let prime = Arc::clone(&cache).get().await.expect("prime cache should succeed");
    assert_eq!(prime, 0);
    // Backdate the timestamp by exactly the TTL: the entry is now expired
    // (age >= ttl) but still within the 2*TTL window where no_wait serves
    // the stale value and refreshes asynchronously.
    let now = Cache::<usize>::current_unix_secs();
    cache.last_update_secs.store(now.saturating_sub(1), AtomicOrdering::SeqCst);
    // The stale value must come back without waiting on the (blocked) refresh.
    let stale = tokio::time::timeout(Duration::from_millis(200), Arc::clone(&cache).get())
        .await
        .expect("no_wait path should return without waiting for refresh")
        .expect("stale get should succeed");
    assert_eq!(stale, 0);
    // The background refresh must actually have been spawned...
    tokio::time::timeout(Duration::from_millis(200), bg_started_rx)
        .await
        .expect("background refresh should start")
        .expect("background start signal should be delivered");
    // ...and, once released, must eventually publish the new value (1).
    release_bg_tx.send(()).expect("release signal should be delivered");
    tokio::time::timeout(Duration::from_secs(1), async {
        loop {
            if cache.cached_value().as_deref() == Some(&1) {
                break;
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    })
    .await
    .expect("background refresh should complete");
}
/// With `return_last_good` set, a failing refresh must keep serving the
/// previously cached value instead of surfacing the error.
#[tokio::test]
async fn test_cache_return_last_good_on_refresh_error() {
    let calls = Arc::new(AtomicUsize::new(0));
    let update_calls = Arc::clone(&calls);
    let cache = Arc::new(Cache::new(
        Box::new(move || {
            let update_calls = Arc::clone(&update_calls);
            Box::pin(async move {
                // First refresh succeeds; every subsequent one fails.
                match update_calls.fetch_add(1, Ordering::SeqCst) {
                    0 => Ok(42usize),
                    _ => Err(std::io::Error::other("refresh failed")),
                }
            })
        }),
        Duration::from_secs(1),
        Opts {
            return_last_good: true,
            no_wait: false,
        },
    ));
    // Prime the cache with the successful first value.
    let prime = Arc::clone(&cache).get().await.expect("prime cache should succeed");
    assert_eq!(prime, 42);
    // Backdate the timestamp so the entry is past its TTL.
    let now = Cache::<usize>::current_unix_secs();
    cache.last_update_secs.store(now.saturating_sub(2), AtomicOrdering::SeqCst);
    // The refresh fails, but the stale 42 is still returned.
    let stale = Arc::clone(&cache)
        .get()
        .await
        .expect("return_last_good should keep stale value");
    assert_eq!(stale, 42);
    // Exactly two update attempts: the prime and the failed refresh.
    assert_eq!(calls.load(Ordering::SeqCst), 2);
}
/// Without `return_last_good`, a failing refresh of an expired entry must
/// propagate the error to the caller rather than serve the stale value.
#[tokio::test]
async fn test_cache_refresh_error_without_return_last_good() {
    let calls = Arc::new(AtomicUsize::new(0));
    let update_calls = Arc::clone(&calls);
    let cache = Arc::new(Cache::new(
        Box::new(move || {
            let update_calls = Arc::clone(&update_calls);
            Box::pin(async move {
                // First refresh succeeds; every subsequent one fails.
                match update_calls.fetch_add(1, Ordering::SeqCst) {
                    0 => Ok(7usize),
                    _ => Err(std::io::Error::other("refresh failed")),
                }
            })
        }),
        Duration::from_secs(1),
        Opts {
            return_last_good: false,
            no_wait: false,
        },
    ));
    // Prime the cache with the successful first value.
    let prime = Arc::clone(&cache).get().await.expect("prime cache should succeed");
    assert_eq!(prime, 7);
    // Backdate the timestamp so the entry is past its TTL.
    let now = Cache::<usize>::current_unix_secs();
    cache.last_update_secs.store(now.saturating_sub(2), AtomicOrdering::SeqCst);
    // The failed refresh must surface as an error.
    let err = Arc::clone(&cache)
        .get()
        .await
        .expect_err("refresh error should be propagated when return_last_good is false");
    assert_eq!(err.kind(), std::io::ErrorKind::Other);
    // Exactly two update attempts: the prime and the failed refresh.
    assert_eq!(calls.load(Ordering::SeqCst), 2);
}
}