From 60d4598562ca5d154e7c1a4c7cba8042016f9c93 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=89=E6=AD=A3=E8=B6=85?= Date: Tue, 5 May 2026 21:41:15 +0800 Subject: [PATCH] test(lock): cover shared waiter abort cleanup (#2811) --- crates/lock/src/fast_lock/shard.rs | 65 ++++++++++++++++++++++++++++++ crates/lock/src/fast_lock/state.rs | 6 +++ 2 files changed, 71 insertions(+) diff --git a/crates/lock/src/fast_lock/shard.rs b/crates/lock/src/fast_lock/shard.rs index 460d6640e..5ce6c4bf2 100644 --- a/crates/lock/src/fast_lock/shard.rs +++ b/crates/lock/src/fast_lock/shard.rs @@ -882,4 +882,69 @@ mod tests { ); assert!(shard.release_lock(&key, &reader_owner, LockMode::Shared)); } + + #[tokio::test] + async fn test_shared_waiter_abort_does_not_block_following_exclusive_lock() { + let shard = Arc::new(LockShard::new(0)); + let key = ObjectKey::new("bucket", "abort-reader-waiter-key"); + + let writer_owner: Arc = Arc::from("writer-owner"); + let reader_owner: Arc = Arc::from("reader-owner"); + let followup_owner: Arc = Arc::from("followup-writer-owner"); + + let hold_writer = ObjectLockRequest { + key: key.clone(), + mode: LockMode::Exclusive, + owner: writer_owner.clone(), + acquire_timeout: Duration::from_secs(1), + lock_timeout: Duration::from_secs(30), + priority: LockPriority::Normal, + }; + + assert!(shard.acquire_lock(&hold_writer).await.is_ok()); + + let contended_reader = ObjectLockRequest { + key: key.clone(), + mode: LockMode::Shared, + owner: reader_owner.clone(), + acquire_timeout: Duration::from_secs(5), + lock_timeout: Duration::from_secs(30), + priority: LockPriority::Normal, + }; + + let shard_for_waiter = shard.clone(); + let waiter_handle = tokio::spawn(async move { shard_for_waiter.acquire_lock(&contended_reader).await }); + + tokio::time::timeout(Duration::from_secs(3), async { + loop { + if let Some(state) = shard.objects.read().get(&key).cloned() + && state.atomic_state.readers_waiting_count() > 0 + { + break; + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + }) + .await + .expect("timed out waiting for contended reader to register as waiting"); + waiter_handle.abort(); + let _ = waiter_handle.await; + + assert!(shard.release_lock(&key, &writer_owner, LockMode::Exclusive)); + + let followup_writer = ObjectLockRequest { + key: key.clone(), + mode: LockMode::Exclusive, + owner: followup_owner.clone(), + acquire_timeout: Duration::from_millis(200), + lock_timeout: Duration::from_secs(30), + priority: LockPriority::Normal, + }; + + assert!( + shard.acquire_lock(&followup_writer).await.is_ok(), + "exclusive lock should succeed after reader waiter task is aborted" + ); + assert!(shard.release_lock(&key, &followup_owner, LockMode::Exclusive)); + } } diff --git a/crates/lock/src/fast_lock/state.rs b/crates/lock/src/fast_lock/state.rs index 847b02c11..c7cb43115 100644 --- a/crates/lock/src/fast_lock/state.rs +++ b/crates/lock/src/fast_lock/state.rs @@ -289,6 +289,12 @@ impl AtomicLockState { ((state & WRITERS_WAITING_MASK) >> WRITERS_WAITING_SHIFT) as u16 } + #[cfg(test)] + pub fn readers_waiting_count(&self) -> u16 { + let state = self.state.load(Ordering::Acquire); + self.readers_waiting(state) + } + #[cfg(test)] pub fn writers_waiting_count(&self) -> u16 { let state = self.state.load(Ordering::Acquire);