mirror of
https://github.com/rustfs/rustfs.git
synced 2026-06-20 14:56:10 +08:00
fix(lock): prevent stale distributed object locks (#2633)
This commit is contained in:
@@ -120,11 +120,6 @@ impl LockClient for GrpcLockClient {
|
||||
.map_err(|e| LockError::internal(e.to_string()))?
|
||||
.into_inner();
|
||||
|
||||
// Check for explicit error first
|
||||
if let Some(error_info) = resp.error_info {
|
||||
return Err(LockError::internal(error_info));
|
||||
}
|
||||
|
||||
// Check if the lock acquisition was successful
|
||||
if resp.success {
|
||||
Ok(LockResponse::success(
|
||||
@@ -134,7 +129,8 @@ impl LockClient for GrpcLockClient {
|
||||
} else {
|
||||
// Lock acquisition failed
|
||||
Ok(LockResponse::failure(
|
||||
"Lock acquisition failed on remote server".to_string(),
|
||||
resp.error_info
|
||||
.unwrap_or_else(|| "Lock acquisition failed on remote server".to_string()),
|
||||
std::time::Duration::ZERO,
|
||||
))
|
||||
}
|
||||
|
||||
@@ -50,6 +50,18 @@ fn lock_result_from_error(error: impl Into<String>) -> GenerallyLockResult {
|
||||
}
|
||||
}
|
||||
|
||||
fn lock_result_from_release(lock_id: &rustfs_lock::LockId, success: bool) -> GenerallyLockResult {
|
||||
if success {
|
||||
GenerallyLockResult {
|
||||
success: true,
|
||||
error_info: None,
|
||||
lock_info: None,
|
||||
}
|
||||
} else {
|
||||
lock_result_from_error(format!("lock not found for release: {lock_id}"))
|
||||
}
|
||||
}
|
||||
|
||||
/// Minimal NodeService implementation that only supports Lock RPCs
|
||||
/// Used for testing distributed lock scenarios with real gRPC
|
||||
#[derive(Debug)]
|
||||
@@ -102,7 +114,7 @@ impl NodeService for MinimalLockNodeService {
|
||||
let lock_info_json = result.lock_info.as_ref().and_then(|info| serde_json::to_string(info).ok());
|
||||
Ok(Response::new(GenerallyLockResponse {
|
||||
success: result.success,
|
||||
error_info: None,
|
||||
error_info: result.error,
|
||||
lock_info: lock_info_json,
|
||||
}))
|
||||
}
|
||||
@@ -131,11 +143,14 @@ impl NodeService for MinimalLockNodeService {
|
||||
};
|
||||
|
||||
match self.lock_client.release(&args.lock_id).await {
|
||||
Ok(success) => Ok(Response::new(GenerallyLockResponse {
|
||||
success,
|
||||
error_info: None,
|
||||
lock_info: None,
|
||||
})),
|
||||
Ok(success) => {
|
||||
let result = lock_result_from_release(&args.lock_id, success);
|
||||
Ok(Response::new(GenerallyLockResponse {
|
||||
success: result.success,
|
||||
error_info: result.error_info,
|
||||
lock_info: None,
|
||||
}))
|
||||
}
|
||||
Err(err) => Ok(Response::new(GenerallyLockResponse {
|
||||
success: false,
|
||||
error_info: Some(format!(
|
||||
@@ -161,11 +176,14 @@ impl NodeService for MinimalLockNodeService {
|
||||
};
|
||||
|
||||
match self.lock_client.force_release(&args.lock_id).await {
|
||||
Ok(success) => Ok(Response::new(GenerallyLockResponse {
|
||||
success,
|
||||
error_info: None,
|
||||
lock_info: None,
|
||||
})),
|
||||
Ok(success) => {
|
||||
let result = lock_result_from_release(&args.lock_id, success);
|
||||
Ok(Response::new(GenerallyLockResponse {
|
||||
success: result.success,
|
||||
error_info: result.error_info,
|
||||
lock_info: None,
|
||||
}))
|
||||
}
|
||||
Err(err) => Ok(Response::new(GenerallyLockResponse {
|
||||
success: false,
|
||||
error_info: Some(format!(
|
||||
@@ -271,10 +289,9 @@ impl NodeService for MinimalLockNodeService {
|
||||
Ok(batch_results) => {
|
||||
for (result_idx, success) in batch_results.into_iter().enumerate() {
|
||||
if let Some(request_idx) = valid_indices.get(result_idx) {
|
||||
results[*request_idx] = GenerallyLockResult {
|
||||
success,
|
||||
error_info: None,
|
||||
lock_info: None,
|
||||
results[*request_idx] = match lock_ids.get(result_idx) {
|
||||
Some(lock_id) => lock_result_from_release(lock_id, success),
|
||||
None => lock_result_from_error(format!("unlock response index out of range: {result_idx}")),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -275,6 +275,41 @@ async fn test_grpc_lock_client_batch_acquire_and_release() {
|
||||
handle.abort();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_grpc_lock_client_uses_request_lock_id_and_reports_missing_unlock() {
|
||||
let manager = Arc::new(GlobalLockManager::new());
|
||||
let local_client: Arc<dyn rustfs_lock::LockClient> = Arc::new(LocalClient::with_manager(manager));
|
||||
|
||||
let (addr, handle) = spawn_lock_server(local_client).await.expect("Failed to spawn server");
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
|
||||
let grpc_client = GrpcLockClient::new(addr);
|
||||
let request = LockRequest::new(test_resource(), LockType::Exclusive, "owner-a").with_acquire_timeout(Duration::from_secs(2));
|
||||
|
||||
let response = grpc_client.acquire_lock(&request).await.expect("gRPC acquire should succeed");
|
||||
let lock_info = response.lock_info.expect("gRPC acquire should include lock info");
|
||||
assert_eq!(lock_info.id, request.lock_id);
|
||||
|
||||
assert!(
|
||||
grpc_client
|
||||
.release(&request.lock_id)
|
||||
.await
|
||||
.expect("gRPC release should succeed"),
|
||||
"release should find the request lock id"
|
||||
);
|
||||
|
||||
let missing_release = grpc_client
|
||||
.release(&request.lock_id)
|
||||
.await
|
||||
.expect_err("second release should report missing lock");
|
||||
assert!(
|
||||
missing_release.to_string().contains("lock not found for release"),
|
||||
"missing release should preserve server error, got: {missing_release}"
|
||||
);
|
||||
|
||||
handle.abort();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_distributed_lock_4_nodes_grpc_read_write_quorum_split_with_two_failed_nodes() {
|
||||
let manager1 = Arc::new(GlobalLockManager::new());
|
||||
|
||||
@@ -190,11 +190,6 @@ impl LockClient for RemoteClient {
|
||||
Err(err) => return Ok(Self::rpc_failure_response(request, &err)),
|
||||
};
|
||||
|
||||
// Check for explicit error first
|
||||
if let Some(error_info) = resp.error_info {
|
||||
return Err(LockError::internal(error_info));
|
||||
}
|
||||
|
||||
// Check if the lock acquisition was successful
|
||||
if resp.success {
|
||||
Ok(LockResponse::success(
|
||||
@@ -204,7 +199,8 @@ impl LockClient for RemoteClient {
|
||||
} else {
|
||||
// Lock acquisition failed
|
||||
Ok(LockResponse::failure(
|
||||
"Lock acquisition failed on remote server".to_string(),
|
||||
resp.error_info
|
||||
.unwrap_or_else(|| "Lock acquisition failed on remote server".to_string()),
|
||||
std::time::Duration::ZERO,
|
||||
))
|
||||
}
|
||||
|
||||
@@ -22,13 +22,15 @@ use futures::future::join_all;
|
||||
use rustfs_io_metrics::{
|
||||
record_read_lock_held_acquire, record_read_lock_held_release, record_write_lock_held_acquire, record_write_lock_held_release,
|
||||
};
|
||||
use std::sync::{Arc, LazyLock};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::task::JoinSet;
|
||||
use tracing::warn;
|
||||
use tracing::{debug, warn};
|
||||
use uuid::Uuid;
|
||||
|
||||
const UNLOCK_RETRY_ATTEMPTS: usize = 3;
|
||||
const UNLOCK_RETRY_BACKOFF: Duration = Duration::from_millis(100);
|
||||
|
||||
/// Generate a new aggregate lock ID for multiple client locks
|
||||
fn generate_aggregate_lock_id(resource: &ObjectKey) -> LockId {
|
||||
LockId {
|
||||
@@ -37,45 +39,6 @@ fn generate_aggregate_lock_id(resource: &ObjectKey) -> LockId {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct UnlockJob {
|
||||
/// Entries to release: each (LockId, client) pair will be released independently.
|
||||
entries: Vec<(LockId, Arc<dyn LockClient>)>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct UnlockRuntime {
|
||||
tx: mpsc::Sender<UnlockJob>,
|
||||
}
|
||||
|
||||
// Global unlock runtime with background worker
|
||||
static UNLOCK_RUNTIME: LazyLock<UnlockRuntime> = LazyLock::new(|| {
|
||||
// Larger buffer to reduce contention during bursts
|
||||
let (tx, mut rx) = mpsc::channel::<UnlockJob>(8192);
|
||||
|
||||
// Spawn background worker when first used; assumes a Tokio runtime is available
|
||||
tokio::spawn(async move {
|
||||
while let Some(job) = rx.recv().await {
|
||||
// Best-effort release across all (LockId, client) entries.
|
||||
let results = join_all(
|
||||
job.entries
|
||||
.into_iter()
|
||||
.map(|(lock_id, client)| async move { client.release(&lock_id).await.unwrap_or(false) }),
|
||||
)
|
||||
.await;
|
||||
let any_ok = results.into_iter().any(|released| released);
|
||||
|
||||
if !any_ok {
|
||||
tracing::warn!("DistributedLockGuard background release failed for one or more entries");
|
||||
} else {
|
||||
tracing::debug!("DistributedLockGuard background released one or more entries");
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
UnlockRuntime { tx }
|
||||
});
|
||||
|
||||
/// A RAII guard for distributed locks that releases the lock asynchronously when dropped.
|
||||
#[derive(Debug)]
|
||||
pub struct DistributedLockGuard {
|
||||
@@ -126,47 +89,22 @@ impl DistributedLockGuard {
|
||||
}
|
||||
|
||||
/// Manually release the lock early.
|
||||
/// This sends a release job to the background worker and then disarms the guard
|
||||
/// This spawns a background release task and then disarms the guard
|
||||
/// to prevent double-release on drop.
|
||||
/// Returns true if the lock was released (or was already released), false otherwise.
|
||||
/// Returns true if release was scheduled or the guard was already disarmed.
|
||||
pub fn release(&mut self) -> bool {
|
||||
if self.disarmed {
|
||||
// Lock was already released, return true to indicate lock is in released state
|
||||
return true;
|
||||
}
|
||||
|
||||
let job = UnlockJob {
|
||||
entries: self.entries.clone(),
|
||||
};
|
||||
|
||||
// Try a non-blocking send to avoid panics
|
||||
let success = if let Err(err) = UNLOCK_RUNTIME.tx.try_send(job) {
|
||||
// Channel full or closed; best-effort fallback: spawn a detached task
|
||||
let entries = self.entries.clone();
|
||||
tracing::warn!(
|
||||
"DistributedLockGuard channel send failed ({}), spawning fallback unlock task for {} entries",
|
||||
err,
|
||||
entries.len()
|
||||
);
|
||||
|
||||
// If runtime is not available, this will panic; but in RustFS we are inside Tokio contexts.
|
||||
let handle = tokio::spawn(async move {
|
||||
let futures_iter = entries
|
||||
.into_iter()
|
||||
.map(|(lock_id, client)| async move { client.release(&lock_id).await.unwrap_or(false) });
|
||||
let _ = join_all(futures_iter).await;
|
||||
});
|
||||
// Explicitly drop the JoinHandle to acknowledge detaching the task.
|
||||
drop(handle);
|
||||
true // Consider it successful even if we had to use fallback
|
||||
} else {
|
||||
true
|
||||
};
|
||||
let entries = self.entries.clone();
|
||||
DistributedLock::spawn_release_cleanup(entries, "distributed_lock_guard_release");
|
||||
|
||||
// Disarm to prevent double-release on drop
|
||||
self.disarmed = true;
|
||||
record_lock_held_release(self.lock_type);
|
||||
success
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
@@ -349,22 +287,64 @@ impl DistributedLock {
|
||||
pending
|
||||
}
|
||||
|
||||
async fn release_entries(entries: &[(LockId, Arc<dyn LockClient>)], context: &'static str) {
|
||||
let release_results = join_all(
|
||||
entries
|
||||
.iter()
|
||||
.map(|(lock_id, client)| async move { (lock_id, client.release(lock_id).await) }),
|
||||
)
|
||||
.await;
|
||||
async fn release_entries(entries: Vec<(LockId, Arc<dyn LockClient>)>, context: &'static str) {
|
||||
let mut pending = entries;
|
||||
|
||||
for (lock_id, result) in release_results {
|
||||
match result {
|
||||
Ok(true) | Ok(false) => {}
|
||||
Err(err) => {
|
||||
tracing::warn!("{context}: failed to release lock {} on client: {}", lock_id, err);
|
||||
for attempt in 1..=UNLOCK_RETRY_ATTEMPTS {
|
||||
let release_results = join_all(pending.into_iter().map(|(lock_id, client)| async move {
|
||||
match client.release(&lock_id).await {
|
||||
Ok(true) => None,
|
||||
Ok(false) => {
|
||||
warn!(%lock_id, attempt, context, "distributed unlock did not find lock on client");
|
||||
Some((lock_id, client))
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(%lock_id, attempt, context, "distributed unlock failed on client: {}", err);
|
||||
Some((lock_id, client))
|
||||
}
|
||||
}
|
||||
}))
|
||||
.await;
|
||||
|
||||
pending = release_results.into_iter().flatten().collect();
|
||||
if pending.is_empty() {
|
||||
debug!(attempt, context, "distributed unlock completed");
|
||||
return;
|
||||
}
|
||||
|
||||
if attempt < UNLOCK_RETRY_ATTEMPTS {
|
||||
tokio::time::sleep(UNLOCK_RETRY_BACKOFF * attempt as u32).await;
|
||||
}
|
||||
}
|
||||
|
||||
warn!(
|
||||
remaining = pending.len(),
|
||||
attempts = UNLOCK_RETRY_ATTEMPTS,
|
||||
context,
|
||||
"distributed unlock left unreleased entries after retry"
|
||||
);
|
||||
}
|
||||
|
||||
fn spawn_release_cleanup(entries: Vec<(LockId, Arc<dyn LockClient>)>, context: &'static str) {
|
||||
if entries.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
if let Ok(handle) = tokio::runtime::Handle::try_current() {
|
||||
let join_handle = handle.spawn(async move {
|
||||
Self::release_entries(entries, context).await;
|
||||
});
|
||||
drop(join_handle);
|
||||
return;
|
||||
}
|
||||
|
||||
let join_handle = std::thread::spawn(move || match tokio::runtime::Builder::new_current_thread().enable_all().build() {
|
||||
Ok(runtime) => runtime.block_on(async move {
|
||||
Self::release_entries(entries, context).await;
|
||||
}),
|
||||
Err(err) => warn!(context, "failed to create fallback unlock runtime: {}", err),
|
||||
});
|
||||
drop(join_handle);
|
||||
}
|
||||
|
||||
fn spawn_pending_cleanup(
|
||||
@@ -387,9 +367,7 @@ impl DistributedLock {
|
||||
continue;
|
||||
};
|
||||
|
||||
if let Err(err) = client.release(&lock_id).await {
|
||||
tracing::warn!("{context}: failed to cleanup late lock {} on client {}: {}", lock_id, idx, err);
|
||||
}
|
||||
Self::release_entries(vec![(lock_id, client.clone())], context).await;
|
||||
}
|
||||
Ok((idx, Ok(resp))) => {
|
||||
tracing::debug!(
|
||||
@@ -506,7 +484,7 @@ impl DistributedLock {
|
||||
|
||||
if individual_locks.len() + pending.len() < required_quorum {
|
||||
let rollback_count = individual_locks.len();
|
||||
Self::release_entries(&individual_locks, "distributed_lock_quorum_rollback").await;
|
||||
Self::spawn_release_cleanup(individual_locks.clone(), "distributed_lock_quorum_rollback");
|
||||
if !pending.is_empty() {
|
||||
Self::spawn_pending_cleanup(
|
||||
pending,
|
||||
@@ -525,7 +503,7 @@ impl DistributedLock {
|
||||
}
|
||||
|
||||
let rollback_count = individual_locks.len();
|
||||
Self::release_entries(&individual_locks, "distributed_lock_quorum_rollback").await;
|
||||
Self::spawn_release_cleanup(individual_locks.clone(), "distributed_lock_quorum_rollback");
|
||||
let resp = LockResponse::failure(
|
||||
format!("Failed to acquire quorum: {rollback_count}/{required_quorum} required"),
|
||||
Duration::ZERO,
|
||||
|
||||
@@ -16,7 +16,10 @@ use super::*;
|
||||
use crate::client::{ClientFactory, local::LocalClient};
|
||||
use crate::types::LockType;
|
||||
use crate::{GlobalLockManager, LockError, LockInfo, LockResponse, LockStats};
|
||||
use std::sync::Arc;
|
||||
use std::sync::{
|
||||
Arc,
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
};
|
||||
use std::time::Duration;
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
@@ -107,6 +110,75 @@ impl crate::client::LockClient for DelayedClient {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct FlakyReleaseClient {
|
||||
inner: LocalClient,
|
||||
failed_releases_remaining: AtomicUsize,
|
||||
release_attempts: AtomicUsize,
|
||||
}
|
||||
|
||||
impl FlakyReleaseClient {
|
||||
fn new(manager: Arc<GlobalLockManager>, failed_releases: usize) -> Self {
|
||||
Self {
|
||||
inner: LocalClient::with_manager(manager),
|
||||
failed_releases_remaining: AtomicUsize::new(failed_releases),
|
||||
release_attempts: AtomicUsize::new(0),
|
||||
}
|
||||
}
|
||||
|
||||
fn release_attempts(&self) -> usize {
|
||||
self.release_attempts.load(Ordering::SeqCst)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl crate::client::LockClient for FlakyReleaseClient {
|
||||
async fn acquire_lock(&self, request: &LockRequest) -> crate::Result<LockResponse> {
|
||||
self.inner.acquire_lock(request).await
|
||||
}
|
||||
|
||||
async fn release(&self, lock_id: &LockId) -> crate::Result<bool> {
|
||||
self.release_attempts.fetch_add(1, Ordering::SeqCst);
|
||||
if self
|
||||
.failed_releases_remaining
|
||||
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |remaining| remaining.checked_sub(1))
|
||||
.is_ok()
|
||||
{
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
self.inner.release(lock_id).await
|
||||
}
|
||||
|
||||
async fn refresh(&self, lock_id: &LockId) -> crate::Result<bool> {
|
||||
self.inner.refresh(lock_id).await
|
||||
}
|
||||
|
||||
async fn force_release(&self, lock_id: &LockId) -> crate::Result<bool> {
|
||||
self.inner.force_release(lock_id).await
|
||||
}
|
||||
|
||||
async fn check_status(&self, lock_id: &LockId) -> crate::Result<Option<LockInfo>> {
|
||||
self.inner.check_status(lock_id).await
|
||||
}
|
||||
|
||||
async fn get_stats(&self) -> crate::Result<LockStats> {
|
||||
self.inner.get_stats().await
|
||||
}
|
||||
|
||||
async fn close(&self) -> crate::Result<()> {
|
||||
self.inner.close().await
|
||||
}
|
||||
|
||||
async fn is_online(&self) -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
async fn is_local(&self) -> bool {
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
fn create_test_object_key(bucket: &str, object: &str) -> ObjectKey {
|
||||
ObjectKey {
|
||||
bucket: Arc::from(bucket),
|
||||
@@ -115,6 +187,41 @@ fn create_test_object_key(bucket: &str, object: &str) -> ObjectKey {
|
||||
}
|
||||
}
|
||||
|
||||
async fn wait_until_all_managers_can_write(managers: &[Arc<GlobalLockManager>], resource: ObjectKey) {
|
||||
let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
|
||||
|
||||
loop {
|
||||
let mut guards = Vec::with_capacity(managers.len());
|
||||
let mut all_available = true;
|
||||
|
||||
for (idx, manager) in managers.iter().enumerate() {
|
||||
let local_lock = NamespaceLock::with_local_manager(format!("probe-node-{idx}"), manager.clone());
|
||||
match local_lock
|
||||
.get_write_lock(resource.clone(), "probe-owner", Duration::from_millis(20))
|
||||
.await
|
||||
{
|
||||
Ok(guard) => guards.push(guard),
|
||||
Err(_) => {
|
||||
all_available = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
drop(guards);
|
||||
|
||||
if all_available {
|
||||
return;
|
||||
}
|
||||
|
||||
assert!(
|
||||
tokio::time::Instant::now() < deadline,
|
||||
"distributed lock was not released on all simulated nodes"
|
||||
);
|
||||
tokio::time::sleep(Duration::from_millis(20)).await;
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_namespace_lock_new() {
|
||||
let client = ClientFactory::create_local();
|
||||
@@ -174,6 +281,24 @@ async fn test_lock_client_default_batch_acquire_and_release() {
|
||||
assert_eq!(released, vec![true, true]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_local_client_uses_request_lock_id_for_release() {
|
||||
let manager = Arc::new(GlobalLockManager::new());
|
||||
let client = LocalClient::with_manager(manager);
|
||||
let resource = create_test_object_key("bucket", "object");
|
||||
let request = LockRequest::new(resource.clone(), LockType::Exclusive, "owner-a").with_acquire_timeout(Duration::from_secs(1));
|
||||
|
||||
let response = client.acquire_lock(&request).await.unwrap();
|
||||
let lock_info = response.lock_info.expect("successful acquire should return lock info");
|
||||
assert_eq!(lock_info.id, request.lock_id);
|
||||
|
||||
assert!(client.release(&request.lock_id).await.unwrap());
|
||||
|
||||
let second_request = LockRequest::new(resource, LockType::Exclusive, "owner-b").with_acquire_timeout(Duration::from_secs(1));
|
||||
let second_response = client.acquire_lock(&second_request).await.unwrap();
|
||||
assert!(second_response.success);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_namespace_lock_get_resource_key() {
|
||||
let client = ClientFactory::create_local();
|
||||
@@ -488,6 +613,97 @@ async fn test_namespace_lock_distributed_with_clients_and_quorum() {
|
||||
drop(guard_b);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_namespace_lock_distributed_eight_node_write_releases_all_nodes() {
|
||||
let managers = (0..8).map(|_| Arc::new(GlobalLockManager::new())).collect::<Vec<_>>();
|
||||
let clients = managers
|
||||
.iter()
|
||||
.map(|manager| Arc::new(LocalClient::with_manager(manager.clone())) as Arc<dyn LockClient>)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let lock = NamespaceLock::with_clients_and_quorum("eight-node".to_string(), clients, 5);
|
||||
let resource = create_test_object_key("bucket", "object-eight-node");
|
||||
|
||||
let mut guard = lock
|
||||
.get_write_lock(resource.clone(), "owner-a", Duration::from_secs(1))
|
||||
.await
|
||||
.expect("owner-a should acquire write lock across eight simulated nodes");
|
||||
|
||||
let err = lock
|
||||
.get_write_lock(resource.clone(), "owner-b", Duration::from_millis(100))
|
||||
.await
|
||||
.expect_err("owner-b should not acquire while owner-a holds all node locks");
|
||||
let err_str = err.to_string();
|
||||
assert!(
|
||||
err_str.contains("required 5") && err_str.contains("achieved"),
|
||||
"expected 8-node quorum failure below required write quorum, got: {err}"
|
||||
);
|
||||
|
||||
assert!(guard.release(), "distributed guard should enqueue release");
|
||||
wait_until_all_managers_can_write(&managers, resource).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_namespace_lock_distributed_unlock_retries_release_false() {
|
||||
let managers = (0..3).map(|_| Arc::new(GlobalLockManager::new())).collect::<Vec<_>>();
|
||||
let flaky_clients = managers
|
||||
.iter()
|
||||
.map(|manager| Arc::new(FlakyReleaseClient::new(manager.clone(), 1)))
|
||||
.collect::<Vec<_>>();
|
||||
let clients = flaky_clients
|
||||
.iter()
|
||||
.map(|client| client.clone() as Arc<dyn LockClient>)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let lock = NamespaceLock::with_clients("flaky-release".to_string(), clients);
|
||||
let resource = create_test_object_key("bucket", "object-flaky-release");
|
||||
|
||||
let mut guard = lock
|
||||
.get_write_lock(resource.clone(), "owner-a", Duration::from_secs(1))
|
||||
.await
|
||||
.expect("owner-a should acquire write lock before flaky release");
|
||||
|
||||
assert!(guard.release(), "distributed guard should enqueue release");
|
||||
wait_until_all_managers_can_write(&managers, resource).await;
|
||||
|
||||
assert!(
|
||||
flaky_clients.iter().all(|client| client.release_attempts() >= 2),
|
||||
"each simulated node should be retried after an initial false release"
|
||||
);
|
||||
}
|
||||
|
||||
/// Acquires a distributed write lock inside a short-lived runtime, drops the guard after that
/// runtime is gone, and checks that the drop neither panics nor leaves the lock held.
#[test]
fn test_namespace_lock_distributed_drop_without_runtime_does_not_panic() {
    // Both phases use an identically configured current-thread runtime.
    let new_runtime = || {
        tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .expect("test runtime should be created")
    };

    // Phase 1: take the lock, then let the runtime go out of scope while the guard survives.
    let (manager, resource, guard) = {
        let runtime = new_runtime();
        runtime.block_on(async {
            let manager = Arc::new(GlobalLockManager::new());
            let resource = create_test_object_key("bucket", "object-drop-no-runtime");
            let lock = NamespaceLock::with_clients(
                "drop-no-runtime".to_string(),
                vec![Arc::new(LocalClient::with_manager(manager.clone()))],
            );
            let guard = lock
                .get_write_lock(resource.clone(), "owner-a", Duration::from_secs(1))
                .await
                .expect("lock should be acquired");

            (manager, resource, guard)
        })
    };

    // Phase 2: drop the guard with no runtime on this thread and capture any panic.
    let drop_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| drop(guard)));
    assert!(drop_result.is_ok(), "dropping distributed guard without runtime should not panic");

    // Phase 3: a fresh runtime confirms the lock actually became available again.
    let runtime = new_runtime();
    runtime.block_on(wait_until_all_managers_can_write(&[manager], resource));
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_namespace_lock_distributed_read_lock_succeeds_with_two_nodes_one_offline() {
|
||||
let manager = Arc::new(GlobalLockManager::new());
|
||||
@@ -568,6 +784,39 @@ async fn test_namespace_lock_distributed_quorum_failure_rolls_back_successful_no
|
||||
drop(guard2);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_namespace_lock_distributed_quorum_rollback_retries_release_false() {
|
||||
let managers = (0..2).map(|_| Arc::new(GlobalLockManager::new())).collect::<Vec<_>>();
|
||||
let flaky_clients = managers
|
||||
.iter()
|
||||
.map(|manager| Arc::new(FlakyReleaseClient::new(manager.clone(), 1)))
|
||||
.collect::<Vec<_>>();
|
||||
let clients = vec![
|
||||
flaky_clients[0].clone() as Arc<dyn LockClient>,
|
||||
flaky_clients[1].clone() as Arc<dyn LockClient>,
|
||||
Arc::new(FailingClient) as Arc<dyn LockClient>,
|
||||
];
|
||||
let resource = create_test_object_key("bucket", "object-rollback-retry");
|
||||
let lock = NamespaceLock::with_clients_and_quorum("rollback-retry".to_string(), clients, 3);
|
||||
|
||||
let err = lock
|
||||
.get_write_lock(resource.clone(), "owner-a", Duration::from_millis(100))
|
||||
.await
|
||||
.expect_err("write lock should fail when quorum requires the offline node");
|
||||
|
||||
let err_str = err.to_string().to_lowercase();
|
||||
assert!(
|
||||
err_str.contains("quorum") || err_str.contains("not reached"),
|
||||
"expected quorum error, got: {err}"
|
||||
);
|
||||
wait_until_all_managers_can_write(&managers, resource).await;
|
||||
|
||||
assert!(
|
||||
flaky_clients.iter().all(|client| client.release_attempts() >= 2),
|
||||
"rollback should retry node releases that initially returned false"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_namespace_lock_distributed_even_node_read_write_quorum_split() {
|
||||
let manager1 = Arc::new(GlobalLockManager::new());
|
||||
|
||||
@@ -30,6 +30,18 @@ fn lock_result_from_error(error: impl Into<String>) -> GenerallyLockResult {
|
||||
}
|
||||
}
|
||||
|
||||
fn lock_result_from_release(lock_id: &rustfs_lock::LockId, success: bool) -> GenerallyLockResult {
|
||||
if success {
|
||||
GenerallyLockResult {
|
||||
success: true,
|
||||
error_info: None,
|
||||
lock_info: None,
|
||||
}
|
||||
} else {
|
||||
lock_result_from_error(format!("lock not found for release: {lock_id}"))
|
||||
}
|
||||
}
|
||||
|
||||
impl NodeService {
|
||||
pub(super) async fn handle_refresh(
|
||||
&self,
|
||||
@@ -71,12 +83,15 @@ impl NodeService {
|
||||
};
|
||||
|
||||
let lock_client = self.get_lock_client()?;
|
||||
match lock_client.release(&args.lock_id).await {
|
||||
Ok(_) => Ok(Response::new(GenerallyLockResponse {
|
||||
success: true,
|
||||
error_info: None,
|
||||
lock_info: None,
|
||||
})),
|
||||
match lock_client.force_release(&args.lock_id).await {
|
||||
Ok(success) => {
|
||||
let result = lock_result_from_release(&args.lock_id, success);
|
||||
Ok(Response::new(GenerallyLockResponse {
|
||||
success: result.success,
|
||||
error_info: result.error_info,
|
||||
lock_info: None,
|
||||
}))
|
||||
}
|
||||
Err(err) => Ok(Response::new(GenerallyLockResponse {
|
||||
success: false,
|
||||
error_info: Some(format!(
|
||||
@@ -106,11 +121,14 @@ impl NodeService {
|
||||
|
||||
let lock_client = self.get_lock_client()?;
|
||||
match lock_client.release(&args.lock_id).await {
|
||||
Ok(_) => Ok(Response::new(GenerallyLockResponse {
|
||||
success: true,
|
||||
error_info: None,
|
||||
lock_info: None,
|
||||
})),
|
||||
Ok(success) => {
|
||||
let result = lock_result_from_release(&args.lock_id, success);
|
||||
Ok(Response::new(GenerallyLockResponse {
|
||||
success: result.success,
|
||||
error_info: result.error_info,
|
||||
lock_info: None,
|
||||
}))
|
||||
}
|
||||
Err(err) => Ok(Response::new(GenerallyLockResponse {
|
||||
success: false,
|
||||
error_info: Some(format!(
|
||||
@@ -146,7 +164,7 @@ impl NodeService {
|
||||
let lock_info_json = result.lock_info.as_ref().and_then(|info| serde_json::to_string(info).ok());
|
||||
Ok(Response::new(GenerallyLockResponse {
|
||||
success: result.success,
|
||||
error_info: None,
|
||||
error_info: result.error,
|
||||
lock_info: lock_info_json,
|
||||
}))
|
||||
}
|
||||
@@ -230,10 +248,9 @@ impl NodeService {
|
||||
Ok(batch_results) => {
|
||||
for (result_idx, success) in batch_results.into_iter().enumerate() {
|
||||
if let Some(request_idx) = valid_indices.get(result_idx) {
|
||||
results[*request_idx] = GenerallyLockResult {
|
||||
success,
|
||||
error_info: None,
|
||||
lock_info: None,
|
||||
results[*request_idx] = match lock_ids.get(result_idx) {
|
||||
Some(lock_id) => lock_result_from_release(lock_id, success),
|
||||
None => lock_result_from_error(format!("unlock response index out of range: {result_idx}")),
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -249,3 +266,38 @@ impl NodeService {
|
||||
Ok(Response::new(BatchGenerallyLockResponse { results }))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    /// Produces a lock id by building an exclusive request for a fixed bucket/object/owner.
    fn test_lock_id() -> rustfs_lock::LockId {
        let resource = rustfs_lock::ObjectKey::new("bucket", "object");
        let request = rustfs_lock::LockRequest::new(resource, rustfs_lock::LockType::Exclusive, "owner");
        request.lock_id
    }

    /// A `false` release outcome must become a failed result carrying the "not found" message.
    #[test]
    fn lock_result_from_release_reports_missing_lock() {
        let result = lock_result_from_release(&test_lock_id(), false);

        assert!(!result.success);
        assert!(result.lock_info.is_none());

        let message = result.error_info.expect("missing release should include error");
        assert!(message.contains("lock not found for release"));
    }

    /// A failed `LockResponse` must keep its error text when converted.
    #[test]
    fn lock_result_from_response_preserves_lock_failure_error() {
        let failed = rustfs_lock::LockResponse::failure("lock conflict", std::time::Duration::ZERO);
        let result = lock_result_from_response(failed);

        assert!(!result.success);
        assert_eq!(result.error_info.as_deref(), Some("lock conflict"));
        assert!(result.lock_info.is_none());
    }
}
|
||||
|
||||
Reference in New Issue
Block a user