From a0f1bb4ff0a3d15876347532bd0cbba17c765aaa Mon Sep 17 00:00:00 2001 From: weisd Date: Wed, 22 Apr 2026 10:12:33 +0800 Subject: [PATCH] fix(lock): prevent stale distributed object locks (#2633) --- .../e2e_test/src/reliant/grpc_lock_client.rs | 8 +- .../e2e_test/src/reliant/grpc_lock_server.rs | 47 ++-- crates/e2e_test/src/reliant/lock.rs | 35 +++ crates/ecstore/src/rpc/remote_locker.rs | 8 +- crates/lock/src/distributed_lock.rs | 156 +++++------ crates/lock/src/namespace/tests.rs | 251 +++++++++++++++++- rustfs/src/storage/rpc/lock.rs | 84 ++++-- 7 files changed, 456 insertions(+), 133 deletions(-) diff --git a/crates/e2e_test/src/reliant/grpc_lock_client.rs b/crates/e2e_test/src/reliant/grpc_lock_client.rs index 1045dbf5e..a0c621660 100644 --- a/crates/e2e_test/src/reliant/grpc_lock_client.rs +++ b/crates/e2e_test/src/reliant/grpc_lock_client.rs @@ -120,11 +120,6 @@ impl LockClient for GrpcLockClient { .map_err(|e| LockError::internal(e.to_string()))? .into_inner(); - // Check for explicit error first - if let Some(error_info) = resp.error_info { - return Err(LockError::internal(error_info)); - } - // Check if the lock acquisition was successful if resp.success { Ok(LockResponse::success( @@ -134,7 +129,8 @@ impl LockClient for GrpcLockClient { } else { // Lock acquisition failed Ok(LockResponse::failure( - "Lock acquisition failed on remote server".to_string(), + resp.error_info + .unwrap_or_else(|| "Lock acquisition failed on remote server".to_string()), std::time::Duration::ZERO, )) } diff --git a/crates/e2e_test/src/reliant/grpc_lock_server.rs b/crates/e2e_test/src/reliant/grpc_lock_server.rs index 628152e5b..085d51fe0 100644 --- a/crates/e2e_test/src/reliant/grpc_lock_server.rs +++ b/crates/e2e_test/src/reliant/grpc_lock_server.rs @@ -50,6 +50,18 @@ fn lock_result_from_error(error: impl Into) -> GenerallyLockResult { } } +fn lock_result_from_release(lock_id: &rustfs_lock::LockId, success: bool) -> GenerallyLockResult { + if success { + GenerallyLockResult { + success: true, + error_info: None, + lock_info: None, + } + } else { + lock_result_from_error(format!("lock not found for release: {lock_id}")) + } +} + /// Minimal NodeService implementation that only supports Lock RPCs /// Used for testing distributed lock scenarios with real gRPC #[derive(Debug)] @@ -102,7 +114,7 @@ impl NodeService for MinimalLockNodeService { let lock_info_json = result.lock_info.as_ref().and_then(|info| serde_json::to_string(info).ok()); Ok(Response::new(GenerallyLockResponse { success: result.success, - error_info: None, + error_info: result.error, lock_info: lock_info_json, })) } @@ -131,11 +143,14 @@ impl NodeService for MinimalLockNodeService { }; match self.lock_client.release(&args.lock_id).await { - Ok(success) => Ok(Response::new(GenerallyLockResponse { - success, - error_info: None, - lock_info: None, - })), + Ok(success) => { + let result = lock_result_from_release(&args.lock_id, success); + Ok(Response::new(GenerallyLockResponse { + success: result.success, + error_info: result.error_info, + lock_info: None, + })) + } Err(err) => Ok(Response::new(GenerallyLockResponse { success: false, error_info: Some(format!( @@ -161,11 +176,14 @@ impl NodeService for MinimalLockNodeService { }; match self.lock_client.force_release(&args.lock_id).await { - Ok(success) => Ok(Response::new(GenerallyLockResponse { - success, - error_info: None, - lock_info: None, - })), + Ok(success) => { + let result = lock_result_from_release(&args.lock_id, success); + Ok(Response::new(GenerallyLockResponse { + success: result.success, + error_info: result.error_info, + lock_info: None, + })) + } Err(err) => Ok(Response::new(GenerallyLockResponse { success: false, error_info: Some(format!( @@ -271,10 +289,9 @@ impl NodeService for MinimalLockNodeService { Ok(batch_results) => { for (result_idx, success) in batch_results.into_iter().enumerate() { if let Some(request_idx) = valid_indices.get(result_idx) { - results[*request_idx] = GenerallyLockResult { - success, - error_info: None, - lock_info: None, + results[*request_idx] = match lock_ids.get(result_idx) { + Some(lock_id) => lock_result_from_release(lock_id, success), + None => lock_result_from_error(format!("unlock response index out of range: {result_idx}")), }; } } diff --git a/crates/e2e_test/src/reliant/lock.rs b/crates/e2e_test/src/reliant/lock.rs index d7dfc536c..d73dae567 100644 --- a/crates/e2e_test/src/reliant/lock.rs +++ b/crates/e2e_test/src/reliant/lock.rs @@ -275,6 +275,41 @@ async fn test_grpc_lock_client_batch_acquire_and_release() { handle.abort(); } +#[tokio::test] +async fn test_grpc_lock_client_uses_request_lock_id_and_reports_missing_unlock() { + let manager = Arc::new(GlobalLockManager::new()); + let local_client: Arc = Arc::new(LocalClient::with_manager(manager)); + + let (addr, handle) = spawn_lock_server(local_client).await.expect("Failed to spawn server"); + tokio::time::sleep(Duration::from_millis(100)).await; + + let grpc_client = GrpcLockClient::new(addr); + let request = LockRequest::new(test_resource(), LockType::Exclusive, "owner-a").with_acquire_timeout(Duration::from_secs(2)); + + let response = grpc_client.acquire_lock(&request).await.expect("gRPC acquire should succeed"); + let lock_info = response.lock_info.expect("gRPC acquire should include lock info"); + assert_eq!(lock_info.id, request.lock_id); + + assert!( + grpc_client + .release(&request.lock_id) + .await + .expect("gRPC release should succeed"), + "release should find the request lock id" + ); + + let missing_release = grpc_client + .release(&request.lock_id) + .await + .expect_err("second release should report missing lock"); + assert!( + missing_release.to_string().contains("lock not found for release"), + "missing release should preserve server error, got: {missing_release}" + ); + + handle.abort(); +} + #[tokio::test] async fn test_distributed_lock_4_nodes_grpc_read_write_quorum_split_with_two_failed_nodes() { let manager1 = Arc::new(GlobalLockManager::new()); diff --git a/crates/ecstore/src/rpc/remote_locker.rs b/crates/ecstore/src/rpc/remote_locker.rs index 9d020a655..b39c1699a 100644 --- a/crates/ecstore/src/rpc/remote_locker.rs +++ b/crates/ecstore/src/rpc/remote_locker.rs @@ -190,11 +190,6 @@ impl LockClient for RemoteClient { Err(err) => return Ok(Self::rpc_failure_response(request, &err)), }; - // Check for explicit error first - if let Some(error_info) = resp.error_info { - return Err(LockError::internal(error_info)); - } - // Check if the lock acquisition was successful if resp.success { Ok(LockResponse::success( @@ -204,7 +199,8 @@ impl LockClient for RemoteClient { } else { // Lock acquisition failed Ok(LockResponse::failure( - "Lock acquisition failed on remote server".to_string(), + resp.error_info + .unwrap_or_else(|| "Lock acquisition failed on remote server".to_string()), std::time::Duration::ZERO, )) } diff --git a/crates/lock/src/distributed_lock.rs b/crates/lock/src/distributed_lock.rs index 3849106bc..431422c1d 100644 --- a/crates/lock/src/distributed_lock.rs +++ b/crates/lock/src/distributed_lock.rs @@ -22,13 +22,15 @@ use futures::future::join_all; use rustfs_io_metrics::{ record_read_lock_held_acquire, record_read_lock_held_release, record_write_lock_held_acquire, record_write_lock_held_release, }; -use std::sync::{Arc, LazyLock}; +use std::sync::Arc; use std::time::Duration; -use tokio::sync::mpsc; use tokio::task::JoinSet; -use tracing::warn; +use tracing::{debug, warn}; use uuid::Uuid; +const UNLOCK_RETRY_ATTEMPTS: usize = 3; +const UNLOCK_RETRY_BACKOFF: Duration = Duration::from_millis(100); + /// Generate a new aggregate lock ID for multiple client locks fn generate_aggregate_lock_id(resource: &ObjectKey) -> LockId { LockId { @@ -37,45 +39,6 @@ fn generate_aggregate_lock_id(resource: &ObjectKey) -> LockId { } } -#[derive(Debug, Clone)] -struct UnlockJob { - /// Entries to release: each (LockId, client) pair will be released independently. - entries: Vec<(LockId, Arc)>, -} - -#[derive(Debug)] -struct UnlockRuntime { - tx: mpsc::Sender, -} - -// Global unlock runtime with background worker -static UNLOCK_RUNTIME: LazyLock = LazyLock::new(|| { - // Larger buffer to reduce contention during bursts - let (tx, mut rx) = mpsc::channel::(8192); - - // Spawn background worker when first used; assumes a Tokio runtime is available - tokio::spawn(async move { - while let Some(job) = rx.recv().await { - // Best-effort release across all (LockId, client) entries. - let results = join_all( - job.entries - .into_iter() - .map(|(lock_id, client)| async move { client.release(&lock_id).await.unwrap_or(false) }), - ) - .await; - let any_ok = results.into_iter().any(|released| released); - - if !any_ok { - tracing::warn!("DistributedLockGuard background release failed for one or more entries"); - } else { - tracing::debug!("DistributedLockGuard background released one or more entries"); - } - } - }); - - UnlockRuntime { tx } -}); - /// A RAII guard for distributed locks that releases the lock asynchronously when dropped. #[derive(Debug)] pub struct DistributedLockGuard { @@ -126,47 +89,22 @@ impl DistributedLockGuard { } /// Manually release the lock early. - /// This sends a release job to the background worker and then disarms the guard + /// This spawns a background release task and then disarms the guard /// to prevent double-release on drop. - /// Returns true if the lock was released (or was already released), false otherwise. + /// Returns true if release was scheduled or the guard was already disarmed. pub fn release(&mut self) -> bool { if self.disarmed { // Lock was already released, return true to indicate lock is in released state return true; } - let job = UnlockJob { - entries: self.entries.clone(), - }; - - // Try a non-blocking send to avoid panics - let success = if let Err(err) = UNLOCK_RUNTIME.tx.try_send(job) { - // Channel full or closed; best-effort fallback: spawn a detached task - let entries = self.entries.clone(); - tracing::warn!( - "DistributedLockGuard channel send failed ({}), spawning fallback unlock task for {} entries", - err, - entries.len() - ); - - // If runtime is not available, this will panic; but in RustFS we are inside Tokio contexts. - let handle = tokio::spawn(async move { - let futures_iter = entries - .into_iter() - .map(|(lock_id, client)| async move { client.release(&lock_id).await.unwrap_or(false) }); - let _ = join_all(futures_iter).await; - }); - // Explicitly drop the JoinHandle to acknowledge detaching the task. - drop(handle); - true // Consider it successful even if we had to use fallback - } else { - true - }; + let entries = self.entries.clone(); + DistributedLock::spawn_release_cleanup(entries, "distributed_lock_guard_release"); // Disarm to prevent double-release on drop self.disarmed = true; record_lock_held_release(self.lock_type); - success + true } } @@ -349,22 +287,64 @@ impl DistributedLock { pending } - async fn release_entries(entries: &[(LockId, Arc)], context: &'static str) { - let release_results = join_all( - entries - .iter() - .map(|(lock_id, client)| async move { (lock_id, client.release(lock_id).await) }), - ) - .await; + async fn release_entries(entries: Vec<(LockId, Arc)>, context: &'static str) { + let mut pending = entries; - for (lock_id, result) in release_results { - match result { - Ok(true) | Ok(false) => {} - Err(err) => { - tracing::warn!("{context}: failed to release lock {} on client: {}", lock_id, err); + for attempt in 1..=UNLOCK_RETRY_ATTEMPTS { + let release_results = join_all(pending.into_iter().map(|(lock_id, client)| async move { + match client.release(&lock_id).await { + Ok(true) => None, + Ok(false) => { + warn!(%lock_id, attempt, context, "distributed unlock did not find lock on client"); + Some((lock_id, client)) + } + Err(err) => { + warn!(%lock_id, attempt, context, "distributed unlock failed on client: {}", err); + Some((lock_id, client)) + } } + })) + .await; + + pending = release_results.into_iter().flatten().collect(); + if pending.is_empty() { + debug!(attempt, context, "distributed unlock completed"); + return; + } + + if attempt < UNLOCK_RETRY_ATTEMPTS { + tokio::time::sleep(UNLOCK_RETRY_BACKOFF * attempt as u32).await; } } + + warn!( + remaining = pending.len(), + attempts = UNLOCK_RETRY_ATTEMPTS, + context, + "distributed unlock left unreleased entries after retry" + ); + } + + fn spawn_release_cleanup(entries: Vec<(LockId, Arc)>, context: &'static str) { + if entries.is_empty() { + return; + } + + if let Ok(handle) = tokio::runtime::Handle::try_current() { + let join_handle = handle.spawn(async move { + Self::release_entries(entries, context).await; + }); + drop(join_handle); + return; + } + + let join_handle = std::thread::spawn(move || match tokio::runtime::Builder::new_current_thread().enable_all().build() { + Ok(runtime) => runtime.block_on(async move { + Self::release_entries(entries, context).await; + }), + Err(err) => warn!(context, "failed to create fallback unlock runtime: {}", err), + }); + drop(join_handle); } fn spawn_pending_cleanup( @@ -387,9 +367,7 @@ impl DistributedLock { continue; }; - if let Err(err) = client.release(&lock_id).await { - tracing::warn!("{context}: failed to cleanup late lock {} on client {}: {}", lock_id, idx, err); - } + Self::release_entries(vec![(lock_id, client.clone())], context).await; } Ok((idx, Ok(resp))) => { tracing::debug!( @@ -506,7 +484,7 @@ impl DistributedLock { if individual_locks.len() + pending.len() < required_quorum { let rollback_count = individual_locks.len(); - Self::release_entries(&individual_locks, "distributed_lock_quorum_rollback").await; + Self::spawn_release_cleanup(individual_locks.clone(), "distributed_lock_quorum_rollback"); if !pending.is_empty() { Self::spawn_pending_cleanup( pending, @@ -525,7 +503,7 @@ impl DistributedLock { } let rollback_count = individual_locks.len(); - Self::release_entries(&individual_locks, "distributed_lock_quorum_rollback").await; + Self::spawn_release_cleanup(individual_locks.clone(), "distributed_lock_quorum_rollback"); let resp = LockResponse::failure( format!("Failed to acquire quorum: {rollback_count}/{required_quorum} required"), Duration::ZERO, diff --git a/crates/lock/src/namespace/tests.rs b/crates/lock/src/namespace/tests.rs index 5ce5e91f8..b95fbfed5 100644 --- a/crates/lock/src/namespace/tests.rs +++ b/crates/lock/src/namespace/tests.rs @@ -16,7 +16,10 @@ use super::*; use crate::client::{ClientFactory, local::LocalClient}; use crate::types::LockType; use crate::{GlobalLockManager, LockError, LockInfo, LockResponse, LockStats}; -use std::sync::Arc; +use std::sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, +}; use std::time::Duration; #[derive(Debug, Default)] @@ -107,6 +110,75 @@ impl crate::client::LockClient for DelayedClient { } } +#[derive(Debug)] +struct FlakyReleaseClient { + inner: LocalClient, + failed_releases_remaining: AtomicUsize, + release_attempts: AtomicUsize, +} + +impl FlakyReleaseClient { + fn new(manager: Arc, failed_releases: usize) -> Self { + Self { + inner: LocalClient::with_manager(manager), + failed_releases_remaining: AtomicUsize::new(failed_releases), + release_attempts: AtomicUsize::new(0), + } + } + + fn release_attempts(&self) -> usize { + self.release_attempts.load(Ordering::SeqCst) + } +} + +#[async_trait::async_trait] +impl crate::client::LockClient for FlakyReleaseClient { + async fn acquire_lock(&self, request: &LockRequest) -> crate::Result { + self.inner.acquire_lock(request).await + } + + async fn release(&self, lock_id: &LockId) -> crate::Result { + self.release_attempts.fetch_add(1, Ordering::SeqCst); + if self + .failed_releases_remaining + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |remaining| remaining.checked_sub(1)) + .is_ok() + { + return Ok(false); + } + + self.inner.release(lock_id).await + } + + async fn refresh(&self, lock_id: &LockId) -> crate::Result { + self.inner.refresh(lock_id).await + } + + async fn force_release(&self, lock_id: &LockId) -> crate::Result { + self.inner.force_release(lock_id).await + } + + async fn check_status(&self, lock_id: &LockId) -> crate::Result> { + self.inner.check_status(lock_id).await + } + + async fn get_stats(&self) -> crate::Result { + self.inner.get_stats().await + } + + async fn close(&self) -> crate::Result<()> { + self.inner.close().await + } + + async fn is_online(&self) -> bool { + true + } + + async fn is_local(&self) -> bool { + true + } +} + fn create_test_object_key(bucket: &str, object: &str) -> ObjectKey { ObjectKey { bucket: Arc::from(bucket), @@ -115,6 +187,41 @@ fn create_test_object_key(bucket: &str, object: &str) -> ObjectKey { } } +async fn wait_until_all_managers_can_write(managers: &[Arc], resource: ObjectKey) { + let deadline = tokio::time::Instant::now() + Duration::from_secs(2); + + loop { + let mut guards = Vec::with_capacity(managers.len()); + let mut all_available = true; + + for (idx, manager) in managers.iter().enumerate() { + let local_lock = NamespaceLock::with_local_manager(format!("probe-node-{idx}"), manager.clone()); + match local_lock + .get_write_lock(resource.clone(), "probe-owner", Duration::from_millis(20)) + .await + { + Ok(guard) => guards.push(guard), + Err(_) => { + all_available = false; + break; + } + } + } + + drop(guards); + + if all_available { + return; + } + + assert!( + tokio::time::Instant::now() < deadline, + "distributed lock was not released on all simulated nodes" + ); + tokio::time::sleep(Duration::from_millis(20)).await; + } +} + #[tokio::test] async fn test_namespace_lock_new() { let client = ClientFactory::create_local(); @@ -174,6 +281,24 @@ async fn test_lock_client_default_batch_acquire_and_release() { assert_eq!(released, vec![true, true]); } +#[tokio::test] +async fn test_local_client_uses_request_lock_id_for_release() { + let manager = Arc::new(GlobalLockManager::new()); + let client = LocalClient::with_manager(manager); + let resource = create_test_object_key("bucket", "object"); + let request = LockRequest::new(resource.clone(), LockType::Exclusive, "owner-a").with_acquire_timeout(Duration::from_secs(1)); + + let response = client.acquire_lock(&request).await.unwrap(); + let lock_info = response.lock_info.expect("successful acquire should return lock info"); + assert_eq!(lock_info.id, request.lock_id); + + assert!(client.release(&request.lock_id).await.unwrap()); + + let second_request = LockRequest::new(resource, LockType::Exclusive, "owner-b").with_acquire_timeout(Duration::from_secs(1)); + let second_response = client.acquire_lock(&second_request).await.unwrap(); + assert!(second_response.success); +} + #[tokio::test] async fn test_namespace_lock_get_resource_key() { let client = ClientFactory::create_local(); @@ -488,6 +613,97 @@ async fn test_namespace_lock_distributed_with_clients_and_quorum() { drop(guard_b); } +#[tokio::test] +async fn test_namespace_lock_distributed_eight_node_write_releases_all_nodes() { + let managers = (0..8).map(|_| Arc::new(GlobalLockManager::new())).collect::>(); + let clients = managers + .iter() + .map(|manager| Arc::new(LocalClient::with_manager(manager.clone())) as Arc) + .collect::>(); + + let lock = NamespaceLock::with_clients_and_quorum("eight-node".to_string(), clients, 5); + let resource = create_test_object_key("bucket", "object-eight-node"); + + let mut guard = lock + .get_write_lock(resource.clone(), "owner-a", Duration::from_secs(1)) + .await + .expect("owner-a should acquire write lock across eight simulated nodes"); + + let err = lock + .get_write_lock(resource.clone(), "owner-b", Duration::from_millis(100)) + .await + .expect_err("owner-b should not acquire while owner-a holds all node locks"); + let err_str = err.to_string(); + assert!( + err_str.contains("required 5") && err_str.contains("achieved"), + "expected 8-node quorum failure below required write quorum, got: {err}" + ); + + assert!(guard.release(), "distributed guard should enqueue release"); + wait_until_all_managers_can_write(&managers, resource).await; +} + +#[tokio::test] +async fn test_namespace_lock_distributed_unlock_retries_release_false() { + let managers = (0..3).map(|_| Arc::new(GlobalLockManager::new())).collect::>(); + let flaky_clients = managers + .iter() + .map(|manager| Arc::new(FlakyReleaseClient::new(manager.clone(), 1))) + .collect::>(); + let clients = flaky_clients + .iter() + .map(|client| client.clone() as Arc) + .collect::>(); + + let lock = NamespaceLock::with_clients("flaky-release".to_string(), clients); + let resource = create_test_object_key("bucket", "object-flaky-release"); + + let mut guard = lock + .get_write_lock(resource.clone(), "owner-a", Duration::from_secs(1)) + .await + .expect("owner-a should acquire write lock before flaky release"); + + assert!(guard.release(), "distributed guard should enqueue release"); + wait_until_all_managers_can_write(&managers, resource).await; + + assert!( + flaky_clients.iter().all(|client| client.release_attempts() >= 2), + "each simulated node should be retried after an initial false release" + ); +} + +#[test] +fn test_namespace_lock_distributed_drop_without_runtime_does_not_panic() { + let (manager, resource, guard) = { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("test runtime should be created"); + runtime.block_on(async { + let manager = Arc::new(GlobalLockManager::new()); + let resource = create_test_object_key("bucket", "object-drop-no-runtime"); + let lock = NamespaceLock::with_clients( + "drop-no-runtime".to_string(), + vec![Arc::new(LocalClient::with_manager(manager.clone()))], + ); + let guard = lock + .get_write_lock(resource.clone(), "owner-a", Duration::from_secs(1)) + .await + .expect("lock should be acquired"); + (manager, resource, guard) + }) + }; + + let drop_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| drop(guard))); + assert!(drop_result.is_ok(), "dropping distributed guard without runtime should not panic"); + + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("test runtime should be created"); + runtime.block_on(wait_until_all_managers_can_write(&[manager], resource)); +} + #[tokio::test] async fn test_namespace_lock_distributed_read_lock_succeeds_with_two_nodes_one_offline() { let manager = Arc::new(GlobalLockManager::new()); @@ -568,6 +784,39 @@ async fn test_namespace_lock_distributed_quorum_failure_rolls_back_successful_no drop(guard2); } +#[tokio::test] +async fn test_namespace_lock_distributed_quorum_rollback_retries_release_false() { + let managers = (0..2).map(|_| Arc::new(GlobalLockManager::new())).collect::>(); + let flaky_clients = managers + .iter() + .map(|manager| Arc::new(FlakyReleaseClient::new(manager.clone(), 1))) + .collect::>(); + let clients = vec![ + flaky_clients[0].clone() as Arc, + flaky_clients[1].clone() as Arc, + Arc::new(FailingClient) as Arc, + ]; + let resource = create_test_object_key("bucket", "object-rollback-retry"); + let lock = NamespaceLock::with_clients_and_quorum("rollback-retry".to_string(), clients, 3); + + let err = lock + .get_write_lock(resource.clone(), "owner-a", Duration::from_millis(100)) + .await + .expect_err("write lock should fail when quorum requires the offline node"); + + let err_str = err.to_string().to_lowercase(); + assert!( + err_str.contains("quorum") || err_str.contains("not reached"), + "expected quorum error, got: {err}" + ); + wait_until_all_managers_can_write(&managers, resource).await; + + assert!( + flaky_clients.iter().all(|client| client.release_attempts() >= 2), + "rollback should retry node releases that initially returned false" + ); +} + #[tokio::test] async fn test_namespace_lock_distributed_even_node_read_write_quorum_split() { let manager1 = Arc::new(GlobalLockManager::new()); diff --git a/rustfs/src/storage/rpc/lock.rs b/rustfs/src/storage/rpc/lock.rs index 01e547190..ac6986a61 100644 --- a/rustfs/src/storage/rpc/lock.rs +++ b/rustfs/src/storage/rpc/lock.rs @@ -30,6 +30,18 @@ fn lock_result_from_error(error: impl Into) -> GenerallyLockResult { } } +fn lock_result_from_release(lock_id: &rustfs_lock::LockId, success: bool) -> GenerallyLockResult { + if success { + GenerallyLockResult { + success: true, + error_info: None, + lock_info: None, + } + } else { + lock_result_from_error(format!("lock not found for release: {lock_id}")) + } +} + impl NodeService { pub(super) async fn handle_refresh( &self, @@ -71,12 +83,15 @@ impl NodeService { }; let lock_client = self.get_lock_client()?; - match lock_client.release(&args.lock_id).await { - Ok(_) => Ok(Response::new(GenerallyLockResponse { - success: true, - error_info: None, - lock_info: None, - })), + match lock_client.force_release(&args.lock_id).await { + Ok(success) => { + let result = lock_result_from_release(&args.lock_id, success); + Ok(Response::new(GenerallyLockResponse { + success: result.success, + error_info: result.error_info, + lock_info: None, + })) + } Err(err) => Ok(Response::new(GenerallyLockResponse { success: false, error_info: Some(format!( @@ -106,11 +121,14 @@ impl NodeService { let lock_client = self.get_lock_client()?; match lock_client.release(&args.lock_id).await { - Ok(_) => Ok(Response::new(GenerallyLockResponse { - success: true, - error_info: None, - lock_info: None, - })), + Ok(success) => { + let result = lock_result_from_release(&args.lock_id, success); + Ok(Response::new(GenerallyLockResponse { + success: result.success, + error_info: result.error_info, + lock_info: None, + })) + } Err(err) => Ok(Response::new(GenerallyLockResponse { success: false, error_info: Some(format!( @@ -146,7 +164,7 @@ impl NodeService { let lock_info_json = result.lock_info.as_ref().and_then(|info| serde_json::to_string(info).ok()); Ok(Response::new(GenerallyLockResponse { success: result.success, - error_info: None, + error_info: result.error, lock_info: lock_info_json, })) } @@ -230,10 +248,9 @@ impl NodeService { Ok(batch_results) => { for (result_idx, success) in batch_results.into_iter().enumerate() { if let Some(request_idx) = valid_indices.get(result_idx) { - results[*request_idx] = GenerallyLockResult { - success, - error_info: None, - lock_info: None, + results[*request_idx] = match lock_ids.get(result_idx) { + Some(lock_id) => lock_result_from_release(lock_id, success), + None => lock_result_from_error(format!("unlock response index out of range: {result_idx}")), }; } } @@ -249,3 +266,38 @@ impl NodeService { Ok(Response::new(BatchGenerallyLockResponse { results })) } } + +#[cfg(test)] +mod tests { + use super::*; + + fn test_lock_id() -> rustfs_lock::LockId { + rustfs_lock::LockRequest::new(rustfs_lock::ObjectKey::new("bucket", "object"), rustfs_lock::LockType::Exclusive, "owner") + .lock_id + } + + #[test] + fn lock_result_from_release_reports_missing_lock() { + let lock_id = test_lock_id(); + let result = lock_result_from_release(&lock_id, false); + + assert!(!result.success); + assert!(result.lock_info.is_none()); + assert!( + result + .error_info + .expect("missing release should include error") + .contains("lock not found for release") + ); + } + + #[test] + fn lock_result_from_response_preserves_lock_failure_error() { + let response = rustfs_lock::LockResponse::failure("lock conflict", std::time::Duration::ZERO); + let result = lock_result_from_response(response); + + assert!(!result.success); + assert_eq!(result.error_info.as_deref(), Some("lock conflict")); + assert!(result.lock_info.is_none()); + } +}