mirror of
https://github.com/rustfs/rustfs.git
synced 2026-05-06 14:12:29 +08:00
test(lock): cover shared waiter abort cleanup (#2811)
This commit is contained in:
@@ -882,4 +882,69 @@ mod tests {
|
||||
);
|
||||
assert!(shard.release_lock(&key, &reader_owner, LockMode::Shared));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_shared_waiter_abort_does_not_block_following_exclusive_lock() {
|
||||
let shard = Arc::new(LockShard::new(0));
|
||||
let key = ObjectKey::new("bucket", "abort-reader-waiter-key");
|
||||
|
||||
let writer_owner: Arc<str> = Arc::from("writer-owner");
|
||||
let reader_owner: Arc<str> = Arc::from("reader-owner");
|
||||
let followup_owner: Arc<str> = Arc::from("followup-writer-owner");
|
||||
|
||||
let hold_writer = ObjectLockRequest {
|
||||
key: key.clone(),
|
||||
mode: LockMode::Exclusive,
|
||||
owner: writer_owner.clone(),
|
||||
acquire_timeout: Duration::from_secs(1),
|
||||
lock_timeout: Duration::from_secs(30),
|
||||
priority: LockPriority::Normal,
|
||||
};
|
||||
|
||||
assert!(shard.acquire_lock(&hold_writer).await.is_ok());
|
||||
|
||||
let contended_reader = ObjectLockRequest {
|
||||
key: key.clone(),
|
||||
mode: LockMode::Shared,
|
||||
owner: reader_owner.clone(),
|
||||
acquire_timeout: Duration::from_secs(5),
|
||||
lock_timeout: Duration::from_secs(30),
|
||||
priority: LockPriority::Normal,
|
||||
};
|
||||
|
||||
let shard_for_waiter = shard.clone();
|
||||
let waiter_handle = tokio::spawn(async move { shard_for_waiter.acquire_lock(&contended_reader).await });
|
||||
|
||||
tokio::time::timeout(Duration::from_secs(3), async {
|
||||
loop {
|
||||
if let Some(state) = shard.objects.read().get(&key).cloned()
|
||||
&& state.atomic_state.readers_waiting_count() > 0
|
||||
{
|
||||
break;
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
}
|
||||
})
|
||||
.await
|
||||
.expect("timed out waiting for contended reader to register as waiting");
|
||||
waiter_handle.abort();
|
||||
let _ = waiter_handle.await;
|
||||
|
||||
assert!(shard.release_lock(&key, &writer_owner, LockMode::Exclusive));
|
||||
|
||||
let followup_writer = ObjectLockRequest {
|
||||
key: key.clone(),
|
||||
mode: LockMode::Exclusive,
|
||||
owner: followup_owner.clone(),
|
||||
acquire_timeout: Duration::from_millis(200),
|
||||
lock_timeout: Duration::from_secs(30),
|
||||
priority: LockPriority::Normal,
|
||||
};
|
||||
|
||||
assert!(
|
||||
shard.acquire_lock(&followup_writer).await.is_ok(),
|
||||
"exclusive lock should succeed after reader waiter task is aborted"
|
||||
);
|
||||
assert!(shard.release_lock(&key, &followup_owner, LockMode::Exclusive));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -289,6 +289,12 @@ impl AtomicLockState {
|
||||
((state & WRITERS_WAITING_MASK) >> WRITERS_WAITING_SHIFT) as u16
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn readers_waiting_count(&self) -> u16 {
|
||||
let state = self.state.load(Ordering::Acquire);
|
||||
self.readers_waiting(state)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn writers_waiting_count(&self) -> u16 {
|
||||
let state = self.state.load(Ordering::Acquire);
|
||||
|
||||
Reference in New Issue
Block a user