fix(disk): Fix Usage Report Capacity Calculation (#2274)

Co-authored-by: cxymds <Cxymds@qq.com>
Co-authored-by: loverustfs <hello@rustfs.com>
Co-authored-by: heihutu <heihutu@gmail.com>
This commit is contained in:
houseme
2026-03-24 23:47:30 +08:00
committed by GitHub
parent 8c8d157418
commit 19b8389dc4
19 changed files with 2990 additions and 14 deletions

1
Cargo.lock generated
View File

@@ -7370,6 +7370,7 @@ dependencies = [
"url",
"urlencoding",
"uuid",
"walkdir",
"zip",
]

View File

@@ -0,0 +1,155 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Capacity calculation configuration constants
// ============================================================================
// Environment Variable Names
// ============================================================================
/// Environment variable for scheduled update interval
pub const ENV_CAPACITY_SCHEDULED_INTERVAL: &str = "RUSTFS_CAPACITY_SCHEDULED_INTERVAL";
/// Environment variable for write trigger delay
pub const ENV_CAPACITY_WRITE_TRIGGER_DELAY: &str = "RUSTFS_CAPACITY_WRITE_TRIGGER_DELAY";
/// Environment variable for write frequency threshold
pub const ENV_CAPACITY_WRITE_FREQUENCY_THRESHOLD: &str = "RUSTFS_CAPACITY_WRITE_FREQUENCY_THRESHOLD";
/// Environment variable for fast update threshold
pub const ENV_CAPACITY_FAST_UPDATE_THRESHOLD: &str = "RUSTFS_CAPACITY_FAST_UPDATE_THRESHOLD";
/// Environment variable for max files threshold
pub const ENV_CAPACITY_MAX_FILES_THRESHOLD: &str = "RUSTFS_CAPACITY_MAX_FILES_THRESHOLD";
/// Environment variable for statistics timeout
pub const ENV_CAPACITY_STAT_TIMEOUT: &str = "RUSTFS_CAPACITY_STAT_TIMEOUT";
/// Environment variable for sample rate
pub const ENV_CAPACITY_SAMPLE_RATE: &str = "RUSTFS_CAPACITY_SAMPLE_RATE";
/// Environment variable for following symbolic links during capacity calculation
pub const ENV_CAPACITY_FOLLOW_SYMLINKS: &str = "RUSTFS_CAPACITY_FOLLOW_SYMLINKS";
/// Environment variable for maximum symlink follow depth
pub const ENV_CAPACITY_MAX_SYMLINK_DEPTH: &str = "RUSTFS_CAPACITY_MAX_SYMLINK_DEPTH";
/// Environment variable for enabling dynamic timeout calculation
pub const ENV_CAPACITY_ENABLE_DYNAMIC_TIMEOUT: &str = "RUSTFS_CAPACITY_ENABLE_DYNAMIC_TIMEOUT";
/// Environment variable for minimum capacity calculation timeout
pub const ENV_CAPACITY_MIN_TIMEOUT: &str = "RUSTFS_CAPACITY_MIN_TIMEOUT";
/// Environment variable for maximum capacity calculation timeout
pub const ENV_CAPACITY_MAX_TIMEOUT: &str = "RUSTFS_CAPACITY_MAX_TIMEOUT";
/// Environment variable for progress stall detection timeout
pub const ENV_CAPACITY_STALL_TIMEOUT: &str = "RUSTFS_CAPACITY_STALL_TIMEOUT";
// ============================================================================
// Default Values
// ============================================================================
/// Scheduled update interval in seconds
/// Default: 300 seconds (5 minutes)
pub const DEFAULT_SCHEDULED_UPDATE_INTERVAL_SECS: u64 = 300;
/// Write trigger delay in seconds
/// Default: 10 seconds
pub const DEFAULT_WRITE_TRIGGER_DELAY_SECS: u64 = 10;
/// Write frequency threshold (writes per minute)
/// Default: 10 writes/minute
pub const DEFAULT_WRITE_FREQUENCY_THRESHOLD: usize = 10;
/// Fast update threshold in seconds
/// Default: 60 seconds
pub const DEFAULT_FAST_UPDATE_THRESHOLD_SECS: u64 = 60;
/// Maximum files threshold for sampling
/// Default: 1,000,000 files
pub const DEFAULT_MAX_FILES_THRESHOLD: usize = 1_000_000;
/// Statistics timeout in seconds
/// Default: 5 seconds
pub const DEFAULT_STAT_TIMEOUT_SECS: u64 = 5;
/// Sampling rate (1 in every N files)
/// Default: 100
pub const DEFAULT_SAMPLE_RATE: usize = 100;
/// Follow symbolic links during capacity calculation
/// Default: false (disabled for safety)
pub const DEFAULT_CAPACITY_FOLLOW_SYMLINKS: bool = false;
/// Maximum symlink follow depth
/// Default: 3 levels
pub const DEFAULT_CAPACITY_MAX_SYMLINK_DEPTH: u8 = 3;
/// Enable dynamic timeout calculation based on directory characteristics
/// Default: true (enabled)
pub const DEFAULT_CAPACITY_ENABLE_DYNAMIC_TIMEOUT: bool = true;
/// Minimum capacity calculation timeout in seconds
/// Default: 5 seconds
pub const DEFAULT_CAPACITY_MIN_TIMEOUT_SECS: u64 = 5;
/// Maximum capacity calculation timeout in seconds
/// Default: 60 seconds
pub const DEFAULT_CAPACITY_MAX_TIMEOUT_SECS: u64 = 60;
/// Progress stall detection timeout in seconds
/// Default: 1 second (no progress for 1 second = stall)
pub const DEFAULT_CAPACITY_STALL_TIMEOUT_SECS: u64 = 1;
// ============================================================================
// Tests
// ============================================================================
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_env_var_names() {
assert_eq!(ENV_CAPACITY_SCHEDULED_INTERVAL, "RUSTFS_CAPACITY_SCHEDULED_INTERVAL");
assert_eq!(ENV_CAPACITY_WRITE_TRIGGER_DELAY, "RUSTFS_CAPACITY_WRITE_TRIGGER_DELAY");
assert_eq!(ENV_CAPACITY_WRITE_FREQUENCY_THRESHOLD, "RUSTFS_CAPACITY_WRITE_FREQUENCY_THRESHOLD");
assert_eq!(ENV_CAPACITY_FAST_UPDATE_THRESHOLD, "RUSTFS_CAPACITY_FAST_UPDATE_THRESHOLD");
assert_eq!(ENV_CAPACITY_MAX_FILES_THRESHOLD, "RUSTFS_CAPACITY_MAX_FILES_THRESHOLD");
assert_eq!(ENV_CAPACITY_STAT_TIMEOUT, "RUSTFS_CAPACITY_STAT_TIMEOUT");
assert_eq!(ENV_CAPACITY_SAMPLE_RATE, "RUSTFS_CAPACITY_SAMPLE_RATE");
assert_eq!(ENV_CAPACITY_FOLLOW_SYMLINKS, "RUSTFS_CAPACITY_FOLLOW_SYMLINKS");
assert_eq!(ENV_CAPACITY_MAX_SYMLINK_DEPTH, "RUSTFS_CAPACITY_MAX_SYMLINK_DEPTH");
assert_eq!(ENV_CAPACITY_ENABLE_DYNAMIC_TIMEOUT, "RUSTFS_CAPACITY_ENABLE_DYNAMIC_TIMEOUT");
assert_eq!(ENV_CAPACITY_MIN_TIMEOUT, "RUSTFS_CAPACITY_MIN_TIMEOUT");
assert_eq!(ENV_CAPACITY_MAX_TIMEOUT, "RUSTFS_CAPACITY_MAX_TIMEOUT");
assert_eq!(ENV_CAPACITY_STALL_TIMEOUT, "RUSTFS_CAPACITY_STALL_TIMEOUT");
}
#[test]
fn test_default_values() {
assert_eq!(DEFAULT_SCHEDULED_UPDATE_INTERVAL_SECS, 300);
assert_eq!(DEFAULT_WRITE_TRIGGER_DELAY_SECS, 10);
assert_eq!(DEFAULT_WRITE_FREQUENCY_THRESHOLD, 10);
assert_eq!(DEFAULT_FAST_UPDATE_THRESHOLD_SECS, 60);
assert_eq!(DEFAULT_MAX_FILES_THRESHOLD, 1_000_000);
assert_eq!(DEFAULT_STAT_TIMEOUT_SECS, 5);
assert_eq!(DEFAULT_SAMPLE_RATE, 100);
assert_eq!(DEFAULT_CAPACITY_MAX_SYMLINK_DEPTH, 3);
assert_eq!(DEFAULT_CAPACITY_MIN_TIMEOUT_SECS, 5);
assert_eq!(DEFAULT_CAPACITY_MAX_TIMEOUT_SECS, 60);
assert_eq!(DEFAULT_CAPACITY_STALL_TIMEOUT_SECS, 1);
}
}

View File

@@ -14,6 +14,7 @@
pub(crate) mod app;
pub(crate) mod body_limits;
pub(crate) mod capacity;
pub(crate) mod compress;
pub(crate) mod console;
pub(crate) mod env;

View File

@@ -214,6 +214,54 @@ pub const ENV_OBJECT_DISK_READ_TIMEOUT: &str = "RUSTFS_OBJECT_DISK_READ_TIMEOUT"
/// Default disk read timeout in seconds.
pub const DEFAULT_OBJECT_DISK_READ_TIMEOUT: u64 = 10;
/// Environment variable for minimum GetObject timeout in seconds.
///
/// When dynamic timeout calculation is enabled, this is the minimum timeout
/// that will be used regardless of object size. This prevents excessively
/// short timeouts for very small objects.
///
/// Default: 5 seconds (can be overridden by `RUSTFS_OBJECT_MIN_TIMEOUT`).
pub const ENV_OBJECT_MIN_TIMEOUT: &str = "RUSTFS_OBJECT_MIN_TIMEOUT";
/// Default minimum GetObject timeout: 5 seconds.
pub const DEFAULT_OBJECT_MIN_TIMEOUT: u64 = 5;
/// Environment variable for maximum GetObject timeout in seconds.
///
/// When dynamic timeout calculation is enabled, this is the maximum timeout
/// that will be used regardless of object size. This prevents excessively
/// long timeouts for very large objects.
///
/// Default: 300 seconds (5 minutes, can be overridden by `RUSTFS_OBJECT_MAX_TIMEOUT`).
pub const ENV_OBJECT_MAX_TIMEOUT: &str = "RUSTFS_OBJECT_MAX_TIMEOUT";
/// Default maximum GetObject timeout: 300 seconds (5 minutes).
pub const DEFAULT_OBJECT_MAX_TIMEOUT: u64 = 300;
/// Environment variable for default bytes per second for timeout estimation.
///
/// This value is used to estimate timeout duration based on object size when
/// dynamic timeout calculation is enabled. The timeout is calculated as:
/// (object_size / bytes_per_second) * buffer_factor
///
/// Default: 1048576 (1 MB/s, can be overridden by `RUSTFS_OBJECT_BYTES_PER_SECOND`).
pub const ENV_OBJECT_BYTES_PER_SECOND: &str = "RUSTFS_OBJECT_BYTES_PER_SECOND";
/// Default bytes per second for timeout estimation: 1 MB/s.
pub const DEFAULT_OBJECT_BYTES_PER_SECOND: u64 = 1024 * 1024;
/// Environment variable to enable dynamic timeout calculation.
///
/// When enabled, timeout is calculated based on object size and transfer speed
/// rather than using a fixed timeout value. This provides better timeout
/// handling for objects of varying sizes.
///
/// Default: true (enabled, can be overridden by `RUSTFS_OBJECT_DYNAMIC_TIMEOUT_ENABLE`).
pub const ENV_OBJECT_DYNAMIC_TIMEOUT_ENABLE: &str = "RUSTFS_OBJECT_DYNAMIC_TIMEOUT_ENABLE";
/// Default: dynamic timeout calculation is enabled.
pub const DEFAULT_OBJECT_DYNAMIC_TIMEOUT_ENABLE: bool = true;
/// Environment variable for duplex pipe buffer size in bytes.
///
/// The duplex pipe connects the disk read task to the HTTP response stream.

View File

@@ -19,6 +19,8 @@ pub use constants::app::*;
#[cfg(feature = "constants")]
pub use constants::body_limits::*;
#[cfg(feature = "constants")]
pub use constants::capacity::*;
#[cfg(feature = "constants")]
pub use constants::compress::*;
#[cfg(feature = "constants")]
pub use constants::console::*;

View File

@@ -1013,7 +1013,7 @@ async fn handle_authenticated_request(
type SymlinkResolutionFuture<'a> =
Pin<Box<dyn std::future::Future<Output = Result<(String, String, String, Option<String>), SwiftError>> + Send + 'a>>;
/// Resolve symlink chain recursively
/// Resolve symlink chain recursively with circular reference detection
///
/// Returns (final_account, final_container, final_object, symlink_target_header)
/// where symlink_target_header is Some(target) if the original object was a symlink
@@ -1023,12 +1023,17 @@ fn resolve_symlink_chain<'a>(
object: &'a str,
credentials: &'a Option<Credentials>,
depth: u8,
visited: std::collections::HashSet<crate::swift::symlink::SymlinkPath>,
) -> SymlinkResolutionFuture<'a> {
Box::pin(async move {
use super::symlink;
// Validate depth to prevent infinite loops
symlink::validate_symlink_depth(depth)?;
// Validate both depth and circular references
symlink::validate_symlink_access(&visited, depth, account, container, object)?;
// Add current path to visited set
let mut new_visited = visited;
new_visited.insert(symlink::SymlinkPath::new(account, container, object));
// Get object metadata
let info = if let Some(creds) = credentials {
@@ -1041,14 +1046,14 @@ fn resolve_symlink_chain<'a>(
// Check if this object is a symlink
if let Some(target) = symlink::get_symlink_target(&info.user_defined)? {
let target_container = target.resolve_container(container);
let target_object = &target.object;
let target_object = target.object.clone();
// Store the original target for the response header
let target_header = target.to_header_value(container);
// Recursively resolve the target (it might also be a symlink)
let (final_account, final_container, final_object, _) =
resolve_symlink_chain(account, target_container, target_object, credentials, depth + 1).await?;
resolve_symlink_chain(account, target_container, &target_object, credentials, depth + 1, new_visited).await?;
// Return the final target, but keep the first-level symlink target for the header
Ok((final_account, final_container, final_object, Some(target_header)))
@@ -1059,6 +1064,18 @@ fn resolve_symlink_chain<'a>(
})
}
/// Helper function to start symlink resolution with an empty visited set
fn resolve_symlink_chain_wrapper<'a>(
account: &'a str,
container: &'a str,
object: &'a str,
credentials: &'a Option<Credentials>,
) -> SymlinkResolutionFuture<'a> {
Box::pin(
async move { resolve_symlink_chain(account, container, object, credentials, 0, std::collections::HashSet::new()).await },
)
}
/// Helper function for object GET operations (used by both authenticated and TempURL requests)
async fn handle_object_get(
account: &str,
@@ -1072,7 +1089,7 @@ async fn handle_object_get(
// Resolve symlinks first (with loop detection)
let (final_account, final_container, final_object, symlink_target) =
resolve_symlink_chain(account, container, object, credentials, 0).await?;
resolve_symlink_chain_wrapper(account, container, object, credentials).await?;
// Check if object is SLO (via metadata)
if slo::is_slo_object(&final_account, &final_container, &final_object, credentials).await? {
@@ -1213,7 +1230,7 @@ async fn handle_object_head(
) -> Result<Response<Body>, SwiftError> {
// Resolve symlinks first (with loop detection)
let (final_account, final_container, final_object, symlink_target) =
resolve_symlink_chain(account, container, object, credentials, 0).await?;
resolve_symlink_chain_wrapper(account, container, object, credentials).await?;
let info = if let Some(creds) = credentials {
object::head_object(&final_account, &final_container, &final_object, creds).await?
@@ -1437,8 +1454,7 @@ fn swift_error_to_response(error: SwiftError) -> Response<Body> {
#[cfg(test)]
mod tests {
use super::*;
use super::parse_range_header;
#[test]
fn test_parse_range_header_start_end() {
// bytes=100-199

View File

@@ -59,11 +59,34 @@
//! ```
use super::{SwiftError, SwiftResult};
use tracing::debug;
use std::collections::HashSet;
use tracing::{debug, warn};
/// Maximum symlink follow depth to prevent infinite loops
const MAX_SYMLINK_DEPTH: u8 = 5;
/// Symlink path used for loop detection
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SymlinkPath {
pub account: String,
pub container: String,
pub object: String,
}
impl SymlinkPath {
pub fn new(account: &str, container: &str, object: &str) -> Self {
Self {
account: account.to_string(),
container: container.to_string(),
object: object.to_string(),
}
}
pub fn from_strs(account: &str, container: &str, object: &str) -> Self {
Self::new(account, container, object)
}
}
/// Parsed symlink target
#[derive(Debug, Clone, PartialEq)]
pub struct SymlinkTarget {
@@ -167,6 +190,43 @@ pub fn validate_symlink_depth(depth: u8) -> SwiftResult<()> {
Ok(())
}
/// Check if a symlink path has been visited before (circular reference detection)
pub fn check_circular_reference(visited: &HashSet<SymlinkPath>, account: &str, container: &str, object: &str) -> SwiftResult<()> {
let path = SymlinkPath::new(account, container, object);
if visited.contains(&path) {
warn!(
account = %account,
container = %container,
object = %object,
"Circular symlink reference detected"
);
return Err(SwiftError::Conflict(format!(
"Circular symlink reference detected: {}/{}/{}",
account, container, object
)));
}
Ok(())
}
/// Validate symlink depth and check for circular references
pub fn validate_symlink_access(
visited: &HashSet<SymlinkPath>,
depth: u8,
account: &str,
container: &str,
object: &str,
) -> SwiftResult<()> {
// Check depth limit first
validate_symlink_depth(depth)?;
// Check for circular references
check_circular_reference(visited, account, container, object)?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
@@ -310,4 +370,94 @@ mod tests {
assert!(validate_symlink_depth(5).is_err());
assert!(validate_symlink_depth(10).is_err());
}
#[test]
fn test_symlink_path_creation() {
let path = SymlinkPath::new("account1", "container1", "object1");
assert_eq!(path.account, "account1");
assert_eq!(path.container, "container1");
assert_eq!(path.object, "object1");
}
#[test]
fn test_symlink_path_equality() {
let path1 = SymlinkPath::new("account1", "container1", "object1");
let path2 = SymlinkPath::new("account1", "container1", "object1");
let path3 = SymlinkPath::new("account2", "container1", "object1");
assert_eq!(path1, path2);
assert_ne!(path1, path3);
}
#[test]
fn test_check_circular_reference_not_visited() {
let visited = HashSet::new();
assert!(check_circular_reference(&visited, "acc", "cont", "obj").is_ok());
}
#[test]
fn test_check_circular_reference_visited() {
let mut visited = HashSet::new();
visited.insert(SymlinkPath::new("acc", "cont", "obj"));
let result = check_circular_reference(&visited, "acc", "cont", "obj");
assert!(result.is_err());
if let Err(SwiftError::Conflict(msg)) = result {
assert!(msg.contains("Circular symlink reference detected"));
assert!(msg.contains("acc/cont/obj"));
} else {
panic!("Expected Conflict error");
}
}
#[test]
fn test_check_circular_reference_different_path() {
let mut visited = HashSet::new();
visited.insert(SymlinkPath::new("acc1", "cont1", "obj1"));
// Different path should not trigger circular reference error
assert!(check_circular_reference(&visited, "acc2", "cont2", "obj2").is_ok());
}
#[test]
fn test_validate_symlink_access_success() {
let visited = HashSet::new();
assert!(validate_symlink_access(&visited, 0, "acc", "cont", "obj").is_ok());
assert!(validate_symlink_access(&visited, 4, "acc", "cont", "obj").is_ok());
}
#[test]
fn test_validate_symlink_access_depth_exceeded() {
let visited = HashSet::new();
assert!(validate_symlink_access(&visited, 5, "acc", "cont", "obj").is_err());
assert!(validate_symlink_access(&visited, 10, "acc", "cont", "obj").is_err());
}
#[test]
fn test_validate_symlink_access_circular_reference() {
let mut visited = HashSet::new();
visited.insert(SymlinkPath::new("acc", "cont", "obj"));
let result = validate_symlink_access(&visited, 0, "acc", "cont", "obj");
assert!(result.is_err());
if let Err(SwiftError::Conflict(msg)) = result {
assert!(msg.contains("Circular symlink reference detected"));
} else {
panic!("Expected Conflict error");
}
}
#[test]
fn test_validate_symlink_access_both_checks() {
let mut visited = HashSet::new();
visited.insert(SymlinkPath::new("acc", "cont", "obj"));
// Should fail due to circular reference even though depth is OK
assert!(validate_symlink_access(&visited, 0, "acc", "cont", "obj").is_err());
// Should fail due to depth even though no circular reference
assert!(validate_symlink_access(&visited, 6, "acc2", "cont2", "obj2").is_err());
}
}

View File

@@ -99,6 +99,7 @@ tower-http = { workspace = true, features = ["trace", "compression-full", "cors"
# Serialization and Data Formats
bytes = { workspace = true }
flatbuffers.workspace = true
walkdir = { workspace = true }
rmp-serde.workspace = true
rustfs-signer.workspace = true
serde.workspace = true

View File

@@ -15,6 +15,11 @@
//! Admin application use-case contracts.
use crate::app::context::{AppContext, get_global_app_context};
use crate::capacity::capacity_manager::{
DataSource, get_capacity_manager, get_enable_dynamic_timeout, get_follow_symlinks, get_max_files_threshold,
get_max_symlink_depth, get_max_timeout, get_min_timeout, get_sample_rate, get_stall_timeout, get_stat_timeout,
};
use crate::capacity::capacity_metrics::get_capacity_metrics;
use crate::error::ApiError;
use rustfs_common::data_usage::DataUsageInfo;
use rustfs_ecstore::admin_server_info::get_server_info;
@@ -25,8 +30,12 @@ use rustfs_ecstore::pools::{PoolStatus, get_total_usable_capacity, get_total_usa
use rustfs_ecstore::store_api::StorageAPI;
use rustfs_madmin::{InfoMessage, StorageInfo};
use s3s::S3ErrorCode;
use std::collections::HashSet;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tracing::{debug, error, info, warn};
use walkdir::WalkDir;
pub type AdminUsecaseResult<T> = Result<T, ApiError>;
@@ -57,6 +66,419 @@ pub struct QueryPoolStatusRequest {
pub by_id: bool,
}
/// Calculate actual used capacity of all data directories
pub(crate) async fn calculate_data_dir_used_capacity(
disks: &[rustfs_madmin::Disk],
) -> Result<u64, Box<dyn std::error::Error + Send + Sync>> {
let mut total_used = 0u64;
let mut has_failure = false;
let mut has_success = false;
for disk in disks {
let path = Path::new(&disk.drive_path);
// Check if path exists
if !path.exists() {
warn!("Data directory does not exist: {}", disk.drive_path);
has_failure = true;
continue;
}
// Asynchronously calculate directory size
match get_dir_size_async(path).await {
Ok(size) => {
debug!("Data directory {} size: {} bytes", disk.drive_path, size);
total_used += size;
has_success = true;
}
Err(e) => {
warn!("Failed to get size for directory {}: {:?}", disk.drive_path, e);
has_failure = true;
// Continue with other directories
}
}
}
// If all directories failed, return error to trigger fallback
if !has_success {
return Err("All directories failed to calculate size".into());
}
// Log warning if there were some failures
if has_failure {
warn!("Some directories failed to calculate size, result may be incomplete");
}
Ok(total_used)
}
// ============================================================================
// Symlink Tracker for Circular Reference Detection
// ============================================================================
/// Tracker for symlink resolution with circular reference detection
struct SymlinkTracker {
/// Set of visited symlink paths to detect circular references
visited: HashSet<PathBuf>,
/// Count of symlinks encountered
symlink_count: usize,
/// Total size of symlink targets
symlink_size: u64,
/// Maximum symlink depth to follow
max_depth: u8,
}
impl SymlinkTracker {
/// Create a new symlink tracker
fn new(max_depth: u8) -> Self {
Self {
visited: HashSet::new(),
symlink_count: 0,
symlink_size: 0,
max_depth,
}
}
/// Check if we should follow a symlink at the given depth
fn should_follow(&self, path: &Path, depth: u8) -> bool {
if depth >= self.max_depth {
debug!("Symlink depth limit reached: {} >= {}, not following {:?}", depth, self.max_depth, path);
return false;
}
if self.visited.contains(path) {
warn!("Circular symlink reference detected: {:?}, skipping", path);
return false;
}
true
}
/// Record a visited symlink path and update metrics
fn record_symlink(&mut self, path: PathBuf, size: u64) {
self.visited.insert(path);
self.symlink_count += 1;
self.symlink_size += size;
// Record to metrics
if let Ok(metrics) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(get_capacity_metrics)) {
metrics.record_symlink(size);
}
}
/// Get symlink statistics
fn get_stats(&self) -> (usize, u64) {
(self.symlink_count, self.symlink_size)
}
}
// ============================================================================
// Progress Monitor for Timeout and Stall Detection
// ============================================================================
/// Monitor for directory traversal progress with timeout and stall detection
struct ProgressMonitor {
/// Start time of the operation
start_time: Instant,
/// Last check time for stall detection
last_check: Instant,
/// Number of files processed at last checkpoint
last_checkpoint_files: usize,
/// Base timeout for this operation
timeout: Duration,
/// Minimum allowed timeout
min_timeout: Duration,
/// Maximum allowed timeout
max_timeout: Duration,
/// Stall detection timeout
stall_timeout: Duration,
/// Enable dynamic timeout calculation
enable_dynamic_timeout: bool,
/// Track if dynamic timeout was used
used_dynamic_timeout: bool,
}
impl ProgressMonitor {
/// Create a new progress monitor
fn new(
base_timeout: Duration,
min_timeout: Duration,
max_timeout: Duration,
stall_timeout: Duration,
enable_dynamic: bool,
) -> Self {
Self {
start_time: Instant::now(),
last_check: Instant::now(),
last_checkpoint_files: 0,
timeout: base_timeout,
min_timeout,
max_timeout,
stall_timeout,
enable_dynamic_timeout: enable_dynamic,
used_dynamic_timeout: false,
}
}
/// Calculate dynamic timeout based on directory characteristics
fn calculate_dynamic_timeout(&mut self, file_count: usize, avg_file_size: u64) -> Duration {
if !self.enable_dynamic_timeout {
return self.timeout;
}
// Mark that we're using dynamic timeout
self.used_dynamic_timeout = true;
// Calculate multipliers based on directory characteristics
let file_factor = (file_count as f64).sqrt() * 0.01; // File count influence
let size_factor = if avg_file_size > 0 {
(avg_file_size as f64).log(10.0) * 0.05 // File size influence
} else {
0.0
};
let multiplier = 1.0 + file_factor + size_factor;
let adjusted_timeout = self.timeout.mul_f64(multiplier.min(5.0)); // Max 5x multiplier
// Clamp to min/max bounds
let clamped_timeout = adjusted_timeout.max(self.min_timeout).min(self.max_timeout);
debug!(
"Dynamic timeout calculation: files={}, avg_size={}, multiplier={:.2}, base_timeout={:?}, adjusted_timeout={:?}, clamped_timeout={:?}",
file_count, avg_file_size, multiplier, self.timeout, adjusted_timeout, clamped_timeout
);
clamped_timeout
}
/// Update and check for timeout or stall
fn update_and_check_timeout(&mut self, files_processed: usize, avg_file_size: u64) -> Result<(), std::io::Error> {
let elapsed = self.start_time.elapsed();
// Calculate dynamic timeout based on current state
let dynamic_timeout = if self.enable_dynamic_timeout {
self.calculate_dynamic_timeout(files_processed, avg_file_size)
} else {
self.timeout
};
// Check for hard timeout
if elapsed >= dynamic_timeout {
warn!(
"Directory size calculation timeout after {} files, elapsed: {:?}, timeout: {:?}",
files_processed, elapsed, dynamic_timeout
);
// Record timeout to metrics
if let Ok(metrics) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(get_capacity_metrics))
&& self.used_dynamic_timeout
{
metrics.record_dynamic_timeout();
}
return Err(std::io::Error::new(
std::io::ErrorKind::TimedOut,
format!("Timeout after {} files", files_processed),
));
}
// Check for stall (no progress)
let now = Instant::now();
if now.duration_since(self.last_check) >= self.stall_timeout {
let files_per_checkpoint = files_processed.saturating_sub(self.last_checkpoint_files);
if files_per_checkpoint == 0 && files_processed > 0 {
// No progress for stall_timeout duration
warn!(
"No progress detected for {:?}, possible stall at {} files",
self.stall_timeout, files_processed
);
// Record stall to metrics
if let Ok(metrics) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(get_capacity_metrics)) {
metrics.record_stall_detected();
}
return Err(std::io::Error::new(
std::io::ErrorKind::TimedOut,
format!("Stall detected at {} files", files_processed),
));
}
self.last_check = now;
self.last_checkpoint_files = files_processed;
}
Ok(())
}
/// Record timeout fallback to sampling
fn record_timeout_fallback(&self) {
if let Ok(metrics) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(get_capacity_metrics)) {
metrics.record_timeout_fallback();
}
}
}
/// Asynchronously get directory size with enhanced symlink handling and dynamic timeout
async fn get_dir_size_async(path: &Path) -> Result<u64, std::io::Error> {
let path = path.to_path_buf();
// Get configuration values
let max_files_threshold = get_max_files_threshold();
let base_timeout = get_stat_timeout();
let min_timeout = get_min_timeout();
let max_timeout = get_max_timeout();
let stall_timeout = get_stall_timeout();
let sample_rate = get_sample_rate();
let enable_dynamic_timeout = get_enable_dynamic_timeout();
let follow_symlinks = get_follow_symlinks();
let max_symlink_depth = get_max_symlink_depth();
// Ensure sample_rate is never zero to avoid panics in is_multiple_of
let effective_sample_rate = if sample_rate == 0 {
warn!("Invalid sampling configuration: sample_rate=0. Clamping to 1 to avoid panic.");
1
} else {
sample_rate
};
// Check if path exists before traversing
if !path.exists() {
return Err(std::io::Error::new(
std::io::ErrorKind::NotFound,
format!("Directory not found: {:?}", path),
));
}
// Use tokio::task::spawn_blocking to avoid blocking the async runtime
tokio::task::spawn_blocking(move || {
let start_time = Instant::now();
let mut total_size = 0u64;
let mut file_count = 0usize;
let mut sampled_size = 0u64;
let mut sampled_count = 0usize;
// Initialize symlink tracker and progress monitor
let mut symlink_tracker = if follow_symlinks {
Some(SymlinkTracker::new(max_symlink_depth))
} else {
None
};
let mut progress_monitor =
ProgressMonitor::new(base_timeout, min_timeout, max_timeout, stall_timeout, enable_dynamic_timeout);
// Build WalkDir with appropriate settings
let mut walker_builder = WalkDir::new(&path);
if !follow_symlinks {
walker_builder = walker_builder.follow_links(false);
}
let walker = walker_builder.into_iter();
for entry_result in walker {
// Propagate traversal errors instead of silently dropping them
let entry = match entry_result {
Ok(entry) => entry,
Err(err) => {
warn!("Failed to traverse directory entry under {:?}: {}", path, err);
return Err(std::io::Error::other(err.to_string()));
}
};
// Get file metadata
let metadata = match entry.metadata() {
Ok(meta) => meta,
Err(err) => {
warn!("Failed to get metadata for {:?}: {}", entry.path(), err);
continue;
}
};
// Handle symlinks if enabled
if metadata.is_symlink() {
if let Some(ref mut tracker) = symlink_tracker
&& let Ok(target) = std::fs::read_link(entry.path())
&& tracker.should_follow(&target, 0)
{
tracker.record_symlink(target, metadata.len());
// Don't count symlink size itself, only target
continue;
}
// If not following symlinks, skip
continue;
}
// Only count file sizes, ignore directories
if !metadata.is_file() {
continue;
}
file_count += 1;
// Update progress and check for timeout/stall
let avg_size = if file_count > 0 { total_size / file_count as u64 } else { 0 };
if let Err(e) = progress_monitor.update_and_check_timeout(file_count, avg_size) {
// Timeout or stall detected
if sampled_count > 0 {
info!("Timeout/stall at {} files, using sampled estimate", file_count);
progress_monitor.record_timeout_fallback();
return Ok(sampled_size * file_count as u64 / sampled_count as u64);
}
return Err(e);
}
// When file count exceeds threshold, enable sampling
if file_count > max_files_threshold {
// Sampling: count 1 in every effective_sample_rate files
if file_count.is_multiple_of(effective_sample_rate) {
sampled_size += metadata.len();
sampled_count += 1;
}
// Log progress every 100k files
if file_count.is_multiple_of(100_000) {
debug!(
"Processed {} files, sampled {} files, size: {} bytes",
file_count, sampled_count, sampled_size
);
}
} else {
// Below threshold, full statistics
total_size += metadata.len();
}
}
// Report symlink statistics if tracking was enabled
if let Some(tracker) = symlink_tracker {
let (count, size) = tracker.get_stats();
if count > 0 {
info!("Symlink tracking: {} symlinks processed, total target size: {} bytes", count, size);
}
}
// If sampling was enabled, return estimated value
if file_count > max_files_threshold && sampled_count > 0 {
let estimated_size = sampled_size * file_count as u64 / sampled_count as u64;
info!(
"Large directory detected: {} files, estimated size: {} bytes (sampled {}/{} files)",
file_count, estimated_size, sampled_count, file_count
);
Ok(estimated_size)
} else {
debug!(
"Directory size calculation completed: {} files, {} bytes, took {:?}",
file_count,
total_size,
start_time.elapsed()
);
Ok(total_size)
}
})
.await
.map_err(std::io::Error::other)?
}
#[derive(Clone, Default)]
pub struct DefaultAdminUsecase {
context: Option<Arc<AppContext>>,
@@ -182,8 +604,85 @@ impl DefaultAdminUsecase {
info.total_free_capacity = free_u64;
}
info.total_used_capacity = info.total_capacity.saturating_sub(info.total_free_capacity);
// Use hybrid strategy for capacity calculation
let capacity_manager = get_capacity_manager();
// Check if we have a valid cache
if let Some(cached) = capacity_manager.get_capacity().await {
let cache_age = cached.last_update.elapsed();
let fast_update_threshold = capacity_manager.get_config().fast_update_threshold;
// If cache is fresh (< fast_update_threshold), use it directly
if cache_age < fast_update_threshold {
info.total_used_capacity = cached.total_used;
debug!(
"Using cached capacity: {} bytes (age: {:?}, source: {:?})",
cached.total_used, cache_age, cached.source
);
} else {
// Cache is stale, check if we need fast update
let needs_update = capacity_manager.needs_fast_update().await;
if needs_update {
// Fast update needed (recent writes or high frequency)
let start = Instant::now();
match calculate_data_dir_used_capacity(&storage_info.disks).await {
Ok(used_capacity) => {
info.total_used_capacity = used_capacity;
capacity_manager
.update_capacity(used_capacity, DataSource::WriteTriggered)
.await;
let elapsed = start.elapsed();
debug!("Fast capacity update completed in {:?}", elapsed);
}
Err(e) => {
warn!("Fast capacity update failed: {:?}, using cached value", e);
info.total_used_capacity = cached.total_used;
}
}
} else {
// Use stale cache and trigger background update (if not already in progress)
info.total_used_capacity = cached.total_used;
debug!("Using stale cache, background update will be triggered if not already in progress");
// Trigger background update only if not already in progress (prevent thundering herd)
if capacity_manager.try_start_background_update() {
let disks = storage_info.disks.clone();
let manager = capacity_manager.clone();
tokio::spawn(async move {
if let Ok(new_capacity) = calculate_data_dir_used_capacity(&disks).await {
manager.update_capacity(new_capacity, DataSource::Scheduled).await;
debug!("Background capacity update completed: {} bytes", new_capacity);
}
manager.complete_background_update();
});
} else {
debug!("Background update already in progress, skipping spawn");
}
}
}
} else {
// No cache, perform initial calculation
let start = Instant::now();
match calculate_data_dir_used_capacity(&storage_info.disks).await {
Ok(used_capacity) => {
info.total_used_capacity = used_capacity;
capacity_manager.update_capacity(used_capacity, DataSource::RealTime).await;
let elapsed = start.elapsed();
info!("Initial capacity calculation completed: {} bytes in {:?}", used_capacity, elapsed);
}
Err(e) => {
warn!(
"Failed to calculate data directory used capacity: {:?}, falling back to disk used capacity",
e
);
// Fallback: use disk used capacity
info.total_used_capacity = info.total_capacity.saturating_sub(info.total_free_capacity);
}
}
}
debug!(
"Capacity statistics: total={:.2} TiB, free={:.2} TiB, used={:.2} TiB",
info.total_capacity as f64 / (1024.0_f64.powi(4)),
@@ -272,6 +771,7 @@ impl DefaultAdminUsecase {
#[cfg(test)]
mod tests {
use super::*;
use serial_test::serial;
#[tokio::test]
async fn execute_query_storage_info_returns_internal_error_when_store_uninitialized() {
@@ -297,4 +797,79 @@ mod tests {
let _ = readiness.storage_ready;
let _ = readiness.iam_ready;
}
// Tests for directory size calculation functions
#[tokio::test]
async fn test_get_dir_size_async_empty_directory() {
use tempfile::TempDir;
let temp_dir = TempDir::new().unwrap();
let size = get_dir_size_async(temp_dir.path()).await.unwrap();
assert_eq!(size, 0);
}
#[tokio::test]
async fn test_get_dir_size_async_single_file() {
use std::fs::File;
use std::io::Write;
use tempfile::TempDir;
let temp_dir = TempDir::new().unwrap();
let file_path = temp_dir.path().join("test.txt");
let mut file = File::create(&file_path).unwrap();
file.write_all(b"Hello, World!").unwrap();
let size = get_dir_size_async(temp_dir.path()).await.unwrap();
assert_eq!(size, 13);
}
#[tokio::test]
async fn test_get_dir_size_async_multiple_files() {
use std::fs::File;
use std::io::Write;
use tempfile::TempDir;
let temp_dir = TempDir::new().unwrap();
// Create multiple files
for i in 0..10 {
let file_path = temp_dir.path().join(format!("file_{}.txt", i));
let mut file = File::create(&file_path).unwrap();
file.write_all(b"test").unwrap();
}
let size = get_dir_size_async(temp_dir.path()).await.unwrap();
assert_eq!(size, 40); // 10 files * 4 bytes
}
#[tokio::test]
async fn test_get_dir_size_async_nested_directories() {
use std::fs::File;
use std::io::Write;
use tempfile::TempDir;
let temp_dir = TempDir::new().unwrap();
// Create nested directories and files
let subdir = temp_dir.path().join("subdir");
std::fs::create_dir(&subdir).unwrap();
let file1 = temp_dir.path().join("file1.txt");
let mut f1 = File::create(&file1).unwrap();
f1.write_all(b"content1").unwrap();
let file2 = subdir.join("file2.txt");
let mut f2 = File::create(&file2).unwrap();
f2.write_all(b"content2").unwrap();
let size = get_dir_size_async(temp_dir.path()).await.unwrap();
assert_eq!(size, 16); // "content1" (8) + "content2" (8)
}
#[tokio::test]
#[serial]
async fn test_get_dir_size_async_nonexistent_directory() {
let result = get_dir_size_async(Path::new("/nonexistent/path")).await;
assert!(result.is_err());
}
}

View File

@@ -15,6 +15,7 @@
//! Object application use-case contracts.
use crate::app::context::{AppContext, default_notify_interface, get_global_app_context};
use crate::capacity::capacity_manager::get_capacity_manager;
use crate::config::RustFSBufferConfig;
use crate::error::ApiError;
use crate::storage::access::{PostObjectRequestMarker, authorize_request, has_bypass_governance_header, req_info_mut};
@@ -938,6 +939,9 @@ impl DefaultObjectUsecase {
let result = Ok(S3Response::new(output));
let _ = helper.complete(&result);
// Record write operation for capacity management (inline to avoid per-request tokio::spawn overhead)
let manager = get_capacity_manager();
manager.record_write_operation().await;
result
}
@@ -3037,6 +3041,9 @@ impl DefaultObjectUsecase {
let result = Ok(S3Response::new(output));
let _ = helper.complete(&result);
// Record write operation for capacity management (inline to avoid per-request tokio::spawn overhead)
let manager = get_capacity_manager();
manager.record_write_operation().await;
result
}
@@ -3214,6 +3221,9 @@ impl DefaultObjectUsecase {
.version_id(version_id.map(|v| v.to_string()).unwrap_or_default());
let result = Ok(S3Response::new(output));
// Record write operation for capacity management (inline to avoid per-request tokio::spawn overhead)
let manager = get_capacity_manager();
manager.record_write_operation().await;
let _ = helper.complete(&result);
result
}

View File

@@ -0,0 +1,102 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Capacity management integration for application startup
use crate::capacity::capacity_manager::{DataSource, get_capacity_manager, start_background_task};
use crate::capacity::capacity_metrics::{get_capacity_metrics, start_metrics_logging};
use rustfs_ecstore::disk::DiskAPI;
use std::time::Duration;
use tracing::{info, warn};
/// Initialize capacity management system
/// This should be called during application startup after local disks are initialized
pub async fn init_capacity_management() {
info!("Initializing capacity management system...");
// Get all local disks
let disks = rustfs_ecstore::store::all_local_disk().await;
if disks.is_empty() {
warn!("No local disks found, capacity management will not run");
return;
}
info!("Found {} local disk(s)", disks.len());
// Convert DiskStore to Disk (for compatibility with capacity_manager)
let disk_refs: Vec<rustfs_madmin::Disk> = disks
.iter()
.map(|ds| rustfs_madmin::Disk {
endpoint: ds.endpoint().to_string(),
drive_path: ds.to_string(),
root_disk: true,
..Default::default()
})
.collect();
// Start background update task
info!("Starting background capacity update task...");
start_background_task(disk_refs).await;
// Start metrics logging (log every 10 minutes)
let metrics_interval = Duration::from_secs(600);
info!("Starting metrics logging task (interval: {:?})...", metrics_interval);
start_metrics_logging(metrics_interval).await;
info!("Capacity management system initialized successfully");
}
/// Get capacity statistics with metrics
#[allow(dead_code)]
pub async fn get_capacity_with_metrics() -> Option<(u64, String)> {
let manager = get_capacity_manager();
let metrics = get_capacity_metrics();
// Check cache
if let Some(cached) = manager.get_capacity().await {
metrics.record_cache_hit();
let source = match cached.source {
DataSource::RealTime => "real-time",
DataSource::Scheduled => "scheduled",
DataSource::WriteTriggered => "write-triggered",
DataSource::Fallback => "fallback",
};
return Some((cached.total_used, source.to_string()));
}
metrics.record_cache_miss();
None
}
#[cfg(test)]
mod tests {
use super::*;
use crate::capacity::capacity_manager::{DataSource, get_capacity_manager};
#[tokio::test]
async fn test_get_capacity_with_metrics() {
let manager = get_capacity_manager();
manager.update_capacity(1000, DataSource::RealTime).await;
let result = get_capacity_with_metrics().await;
assert!(result.is_some());
let (capacity, source) = result.unwrap();
assert_eq!(capacity, 1000);
assert_eq!(source, "real-time");
}
}

View File

@@ -0,0 +1,583 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Hybrid Capacity Manager for efficient capacity statistics
use crate::app::admin_usecase::calculate_data_dir_used_capacity;
use metrics::{counter, gauge};
use rustfs_config::{
DEFAULT_CAPACITY_ENABLE_DYNAMIC_TIMEOUT, DEFAULT_CAPACITY_FOLLOW_SYMLINKS, DEFAULT_CAPACITY_MAX_SYMLINK_DEPTH,
DEFAULT_CAPACITY_MAX_TIMEOUT_SECS, DEFAULT_CAPACITY_MIN_TIMEOUT_SECS, DEFAULT_CAPACITY_STALL_TIMEOUT_SECS,
DEFAULT_FAST_UPDATE_THRESHOLD_SECS, DEFAULT_MAX_FILES_THRESHOLD, DEFAULT_SAMPLE_RATE, DEFAULT_SCHEDULED_UPDATE_INTERVAL_SECS,
DEFAULT_STAT_TIMEOUT_SECS, DEFAULT_WRITE_FREQUENCY_THRESHOLD, DEFAULT_WRITE_TRIGGER_DELAY_SECS,
ENV_CAPACITY_ENABLE_DYNAMIC_TIMEOUT, ENV_CAPACITY_FAST_UPDATE_THRESHOLD, ENV_CAPACITY_FOLLOW_SYMLINKS,
ENV_CAPACITY_MAX_FILES_THRESHOLD, ENV_CAPACITY_MAX_SYMLINK_DEPTH, ENV_CAPACITY_MAX_TIMEOUT, ENV_CAPACITY_MIN_TIMEOUT,
ENV_CAPACITY_SAMPLE_RATE, ENV_CAPACITY_SCHEDULED_INTERVAL, ENV_CAPACITY_STALL_TIMEOUT, ENV_CAPACITY_STAT_TIMEOUT,
ENV_CAPACITY_WRITE_FREQUENCY_THRESHOLD, ENV_CAPACITY_WRITE_TRIGGER_DELAY,
};
use rustfs_utils::{get_env_bool, get_env_u64, get_env_usize};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tracing::{debug, error, info, warn};
// ============================================================================
// Configuration Functions
// ============================================================================
/// Get scheduled update interval from environment or default
pub fn get_scheduled_update_interval() -> Duration {
Duration::from_secs(get_env_u64(ENV_CAPACITY_SCHEDULED_INTERVAL, DEFAULT_SCHEDULED_UPDATE_INTERVAL_SECS))
}
/// Get write trigger delay from environment or default
pub fn get_write_trigger_delay() -> Duration {
Duration::from_secs(get_env_u64(ENV_CAPACITY_WRITE_TRIGGER_DELAY, DEFAULT_WRITE_TRIGGER_DELAY_SECS))
}
/// Get write frequency threshold from environment or default
pub fn get_write_frequency_threshold() -> usize {
get_env_usize(ENV_CAPACITY_WRITE_FREQUENCY_THRESHOLD, DEFAULT_WRITE_FREQUENCY_THRESHOLD)
}
/// Get fast update threshold from environment or default
pub fn get_fast_update_threshold() -> Duration {
Duration::from_secs(get_env_u64(ENV_CAPACITY_FAST_UPDATE_THRESHOLD, DEFAULT_FAST_UPDATE_THRESHOLD_SECS))
}
/// Get max files threshold from environment or default
pub fn get_max_files_threshold() -> usize {
get_env_usize(ENV_CAPACITY_MAX_FILES_THRESHOLD, DEFAULT_MAX_FILES_THRESHOLD)
}
/// Get stat timeout from environment or default
pub fn get_stat_timeout() -> Duration {
Duration::from_secs(get_env_u64(ENV_CAPACITY_STAT_TIMEOUT, DEFAULT_STAT_TIMEOUT_SECS))
}
/// Get sample rate from environment or default
pub fn get_sample_rate() -> usize {
get_env_usize(ENV_CAPACITY_SAMPLE_RATE, DEFAULT_SAMPLE_RATE)
}
/// Get follow symlinks flag from environment or default
pub fn get_follow_symlinks() -> bool {
get_env_bool(ENV_CAPACITY_FOLLOW_SYMLINKS, DEFAULT_CAPACITY_FOLLOW_SYMLINKS)
}
/// Get max symlink depth from environment or default
pub fn get_max_symlink_depth() -> u8 {
get_env_u64(ENV_CAPACITY_MAX_SYMLINK_DEPTH, DEFAULT_CAPACITY_MAX_SYMLINK_DEPTH as u64) as u8
}
/// Get enable dynamic timeout flag from environment or default
pub fn get_enable_dynamic_timeout() -> bool {
get_env_bool(ENV_CAPACITY_ENABLE_DYNAMIC_TIMEOUT, DEFAULT_CAPACITY_ENABLE_DYNAMIC_TIMEOUT)
}
/// Get min timeout from environment or default
pub fn get_min_timeout() -> Duration {
Duration::from_secs(get_env_u64(ENV_CAPACITY_MIN_TIMEOUT, DEFAULT_CAPACITY_MIN_TIMEOUT_SECS))
}
/// Get max timeout from environment or default
pub fn get_max_timeout() -> Duration {
Duration::from_secs(get_env_u64(ENV_CAPACITY_MAX_TIMEOUT, DEFAULT_CAPACITY_MAX_TIMEOUT_SECS))
}
/// Get stall timeout from environment or default
pub fn get_stall_timeout() -> Duration {
Duration::from_secs(get_env_u64(ENV_CAPACITY_STALL_TIMEOUT, DEFAULT_CAPACITY_STALL_TIMEOUT_SECS))
}
// ============================================================================
// Data Structures
// ============================================================================
/// Cached capacity data
#[derive(Clone, Debug)]
#[allow(dead_code)]
pub struct CachedCapacity {
/// Total used capacity in bytes
pub total_used: u64,
/// Last update time
pub last_update: Instant,
/// File count (optional)
pub file_count: usize,
/// Whether it's an estimated value
pub is_estimated: bool,
/// Data source
pub source: DataSource,
}
#[derive(Clone, Debug, PartialEq, Copy, Eq)]
#[allow(dead_code)]
pub enum DataSource {
/// Real-time statistics
RealTime,
/// Scheduled update
Scheduled,
/// Write triggered
WriteTriggered,
/// Fallback value
Fallback,
}
/// Write record for tracking write operations
#[derive(Debug)]
pub struct WriteRecord {
/// Last write time
pub last_write_time: Instant,
/// Write count
pub write_count: usize,
/// Write time window (for frequency calculation)
pub write_window: Vec<Instant>,
}
/// Hybrid strategy configuration
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct HybridStrategyConfig {
/// Scheduled update interval
pub scheduled_update_interval: Duration,
/// Write trigger delay
pub write_trigger_delay: Duration,
/// Write frequency threshold (writes/minute)
pub write_frequency_threshold: usize,
/// Fast update threshold
pub fast_update_threshold: Duration,
/// Enable smart update
pub enable_smart_update: bool,
/// Enable write trigger
pub enable_write_trigger: bool,
}
impl Default for HybridStrategyConfig {
fn default() -> Self {
Self {
scheduled_update_interval: get_scheduled_update_interval(),
write_trigger_delay: get_write_trigger_delay(),
write_frequency_threshold: get_write_frequency_threshold(),
fast_update_threshold: get_fast_update_threshold(),
enable_smart_update: true,
enable_write_trigger: true,
}
}
}
impl HybridStrategyConfig {
/// Create config from environment variables
pub fn from_env() -> Self {
Self::default()
}
}
// ============================================================================
// Hybrid Capacity Manager
// ============================================================================
/// Hybrid capacity manager
pub struct HybridCapacityManager {
/// Capacity cache
cache: Arc<RwLock<Option<CachedCapacity>>>,
/// Write record
write_record: Arc<RwLock<WriteRecord>>,
/// Configuration
config: HybridStrategyConfig,
/// Background update in progress flag
update_in_progress: Arc<AtomicBool>,
}
impl HybridCapacityManager {
/// Create a new hybrid capacity manager
pub fn new(config: HybridStrategyConfig) -> Self {
Self {
cache: Arc::new(RwLock::new(None)),
write_record: Arc::new(RwLock::new(WriteRecord {
last_write_time: Instant::now(),
write_count: 0,
write_window: Vec::new(),
})),
config,
update_in_progress: Arc::new(AtomicBool::new(false)),
}
}
/// Create with default config from environment
pub fn from_env() -> Self {
Self::new(HybridStrategyConfig::from_env())
}
/// Get capacity (core method)
pub async fn get_capacity(&self) -> Option<CachedCapacity> {
let cache = self.cache.read().await;
cache.clone()
}
/// Update capacity
pub async fn update_capacity(&self, capacity: u64, source: DataSource) {
let mut cache = self.cache.write().await;
*cache = Some(CachedCapacity {
total_used: capacity,
last_update: Instant::now(),
file_count: 0,
is_estimated: false,
source,
});
debug!("Capacity updated: {} bytes, source: {:?}", capacity, source);
// Update metrics
gauge!("rustfs.capacity.current").set(capacity as f64);
match source {
DataSource::RealTime => counter!("rustfs.capacity.update.realtime").increment(1),
DataSource::Scheduled => counter!("rustfs.capacity.update.scheduled").increment(1),
DataSource::WriteTriggered => counter!("rustfs.capacity.update.write_triggered").increment(1),
DataSource::Fallback => counter!("rustfs.capacity.update.fallback").increment(1),
}
}
/// Record write operation
pub async fn record_write_operation(&self) {
let mut record = self.write_record.write().await;
record.last_write_time = Instant::now();
record.write_count += 1;
// Maintain write time window (keep last 1 minute)
// Cap the window size to prevent unbounded memory growth at high write rates
const MAX_WRITE_WINDOW_SIZE: usize = 10000;
let now = Instant::now();
record
.write_window
.retain(|&t| now.duration_since(t) < Duration::from_secs(60));
// Only push if under the cap to prevent unbounded growth
if record.write_window.len() < MAX_WRITE_WINDOW_SIZE {
record.write_window.push(now);
}
counter!("rustfs.capacity.write.operations").increment(1);
gauge!("rustfs.capacity.write.frequency").set(record.write_window.len() as f64);
debug!(
"Write operation recorded: total writes = {}, recent writes = {}",
record.write_count,
record.write_window.len()
);
}
/// Check if fast update is needed
pub async fn needs_fast_update(&self) -> bool {
if !self.config.enable_smart_update {
return false;
}
let cache = self.cache.read().await;
if let Some(cached) = cache.as_ref() {
let cache_age = cached.last_update.elapsed();
// Cache is fresh, no need to update
if cache_age < self.config.fast_update_threshold {
return false;
}
let write_record = self.write_record.read().await;
let time_since_write = write_record.last_write_time.elapsed();
// Recent write, trigger fast update
if time_since_write < self.config.fast_update_threshold {
debug!("Recent write detected ({:?} ago), needs fast update", time_since_write);
return true;
}
// High write frequency, trigger update
let write_frequency = write_record.write_window.len();
if write_frequency > self.config.write_frequency_threshold {
debug!("High write frequency detected ({} writes/min), needs fast update", write_frequency);
return true;
}
}
false
}
/// Get cache age
#[allow(dead_code)]
pub async fn get_cache_age(&self) -> Option<Duration> {
let cache = self.cache.read().await;
cache.as_ref().map(|c| c.last_update.elapsed())
}
/// Get write frequency (writes/minute)
#[allow(dead_code)]
pub async fn get_write_frequency(&self) -> usize {
let record = self.write_record.read().await;
record.write_window.len()
}
/// Get config
pub fn get_config(&self) -> &HybridStrategyConfig {
&self.config
}
/// Try to start a background update, returns true if update was started (false if already in progress)
pub fn try_start_background_update(&self) -> bool {
self.update_in_progress
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
}
/// Mark background update as complete
pub fn complete_background_update(&self) {
self.update_in_progress.store(false, Ordering::Release);
}
}
/// Global capacity manager instance
static CAPACITY_MANAGER: std::sync::OnceLock<Arc<HybridCapacityManager>> = std::sync::OnceLock::new();
/// Get or initialize the global capacity manager
pub fn get_capacity_manager() -> Arc<HybridCapacityManager> {
CAPACITY_MANAGER
.get_or_init(|| Arc::new(HybridCapacityManager::from_env()))
.clone()
}
/// Start background update task
pub async fn start_background_task(disks: Vec<rustfs_madmin::Disk>) {
let manager = get_capacity_manager();
let mut interval = manager.get_config().scheduled_update_interval;
// Prevent panic in tokio::time::interval when misconfigured to 0
if interval.is_zero() {
warn!("RUSTFS_CAPACITY_SCHEDULED_INTERVAL is configured as 0; clamping to 1s to avoid panic");
interval = Duration::from_secs(1);
}
tokio::spawn(async move {
let mut timer = tokio::time::interval(interval);
loop {
timer.tick().await;
info!("Starting scheduled capacity update");
let start = Instant::now();
// Import the calculate function
match calculate_data_dir_used_capacity(&disks).await {
Ok(new_capacity) => {
let elapsed = start.elapsed();
info!("Scheduled update completed: {} bytes in {:?}", new_capacity, elapsed);
manager.update_capacity(new_capacity, DataSource::Scheduled).await;
}
Err(e) => {
error!("Scheduled update failed: {:?}", e);
}
}
}
});
}
// ============================================================================
// Tests
// ============================================================================
#[cfg(test)]
mod tests {
use super::*;
use rustfs_config::{
ENV_CAPACITY_FAST_UPDATE_THRESHOLD, ENV_CAPACITY_MAX_FILES_THRESHOLD, ENV_CAPACITY_SAMPLE_RATE,
ENV_CAPACITY_STAT_TIMEOUT, ENV_CAPACITY_WRITE_FREQUENCY_THRESHOLD, ENV_CAPACITY_WRITE_TRIGGER_DELAY,
};
use serial_test::serial;
#[test]
#[serial]
fn test_get_scheduled_update_interval() {
let interval = get_scheduled_update_interval();
assert_eq!(interval, Duration::from_secs(300));
}
#[test]
#[serial]
fn test_get_write_trigger_delay() {
let delay = get_write_trigger_delay();
assert_eq!(delay, Duration::from_secs(10));
}
#[test]
#[serial]
fn test_get_write_frequency_threshold() {
let threshold = get_write_frequency_threshold();
assert_eq!(threshold, 10);
}
#[test]
#[serial]
fn test_get_fast_update_threshold() {
let threshold = get_fast_update_threshold();
assert_eq!(threshold, Duration::from_secs(60));
}
#[test]
#[serial]
fn test_get_max_files_threshold() {
let threshold = get_max_files_threshold();
assert_eq!(threshold, 1_000_000);
}
#[test]
#[serial]
fn test_get_stat_timeout() {
let timeout = get_stat_timeout();
assert_eq!(timeout, Duration::from_secs(5));
}
#[test]
#[serial]
fn test_get_sample_rate() {
let rate = get_sample_rate();
assert_eq!(rate, 100);
}
#[test]
#[serial]
fn test_env_var_override_scheduled_interval() {
temp_env::with_var(ENV_CAPACITY_SCHEDULED_INTERVAL, Some("600"), || {
let interval = get_scheduled_update_interval();
assert_eq!(interval, Duration::from_secs(600));
});
}
#[test]
#[serial]
fn test_env_var_override_write_trigger_delay() {
temp_env::with_var(ENV_CAPACITY_WRITE_TRIGGER_DELAY, Some("20"), || {
let delay = get_write_trigger_delay();
assert_eq!(delay, Duration::from_secs(20));
});
}
#[test]
#[serial]
fn test_env_var_override_write_frequency_threshold() {
temp_env::with_var(ENV_CAPACITY_WRITE_FREQUENCY_THRESHOLD, Some("20"), || {
let threshold = get_write_frequency_threshold();
assert_eq!(threshold, 20);
});
}
#[test]
#[serial]
fn test_env_var_override_fast_update_threshold() {
temp_env::with_var(ENV_CAPACITY_FAST_UPDATE_THRESHOLD, Some("120"), || {
let threshold = get_fast_update_threshold();
assert_eq!(threshold, Duration::from_secs(120));
});
}
#[test]
#[serial]
fn test_env_var_override_max_files_threshold() {
temp_env::with_var(ENV_CAPACITY_MAX_FILES_THRESHOLD, Some("2000000"), || {
let threshold = get_max_files_threshold();
assert_eq!(threshold, 2_000_000);
});
}
#[test]
#[serial]
fn test_env_var_override_stat_timeout() {
temp_env::with_var(ENV_CAPACITY_STAT_TIMEOUT, Some("10"), || {
let timeout = get_stat_timeout();
assert_eq!(timeout, Duration::from_secs(10));
});
}
#[test]
#[serial]
fn test_env_var_override_sample_rate() {
temp_env::with_var(ENV_CAPACITY_SAMPLE_RATE, Some("200"), || {
let rate = get_sample_rate();
assert_eq!(rate, 200);
});
}
#[tokio::test]
#[serial]
async fn test_capacity_manager_creation() {
let config = HybridStrategyConfig::default();
let manager = HybridCapacityManager::new(config);
assert!(manager.get_capacity().await.is_none());
}
#[tokio::test]
#[serial]
async fn test_update_capacity() {
let manager = HybridCapacityManager::from_env();
manager.update_capacity(1000, DataSource::RealTime).await;
let cached = manager.get_capacity().await;
assert!(cached.is_some());
assert_eq!(cached.unwrap().total_used, 1000);
}
#[tokio::test]
#[serial]
async fn test_record_write_operation() {
let manager = HybridCapacityManager::from_env();
manager.record_write_operation().await;
let frequency = manager.get_write_frequency().await;
assert_eq!(frequency, 1);
}
#[tokio::test]
#[serial]
async fn test_needs_fast_update() {
let manager = HybridCapacityManager::from_env();
// No cache, should not need update
assert!(!manager.needs_fast_update().await);
// Update cache
manager.update_capacity(1000, DataSource::RealTime).await;
// Fresh cache, should not need update
assert!(!manager.needs_fast_update().await);
}
#[tokio::test]
#[serial]
async fn test_config_from_env() {
let config = HybridStrategyConfig::from_env();
// Check default values
assert_eq!(config.scheduled_update_interval, Duration::from_secs(300));
assert_eq!(config.write_trigger_delay, Duration::from_secs(10));
assert_eq!(config.write_frequency_threshold, 10);
assert_eq!(config.fast_update_threshold, Duration::from_secs(60));
assert!(config.enable_smart_update);
assert!(config.enable_write_trigger);
}
#[tokio::test]
#[serial]
async fn test_config_from_env_with_override() {
temp_env::with_var(ENV_CAPACITY_SCHEDULED_INTERVAL, Some("600"), || {
let config = HybridStrategyConfig::from_env();
assert_eq!(config.scheduled_update_interval, Duration::from_secs(600));
});
}
}

View File

@@ -0,0 +1,212 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Comprehensive tests for Hybrid Capacity Manager
#[cfg(test)]
mod tests {
use crate::capacity::capacity_manager::{DataSource, HybridCapacityManager, HybridStrategyConfig};
use serial_test::serial;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
#[tokio::test]
#[serial]
async fn test_capacity_manager_initialization() {
let manager = HybridCapacityManager::from_env();
assert!(manager.get_capacity().await.is_none());
}
#[tokio::test]
async fn test_capacity_update_and_retrieval() {
let manager = HybridCapacityManager::from_env();
// Initially no cache
assert!(manager.get_capacity().await.is_none());
// Update capacity
manager.update_capacity(1000, DataSource::RealTime).await;
// Retrieve cached value
let cached = manager.get_capacity().await;
assert!(cached.is_some());
let cached = cached.unwrap();
assert_eq!(cached.total_used, 1000);
assert_eq!(cached.source, DataSource::RealTime);
assert!(!cached.is_estimated);
}
#[tokio::test]
async fn test_write_operation_recording() {
let manager = HybridCapacityManager::from_env();
// Record multiple write operations
manager.record_write_operation().await;
manager.record_write_operation().await;
manager.record_write_operation().await;
let frequency = manager.get_write_frequency().await;
assert_eq!(frequency, 3);
}
#[tokio::test]
async fn test_fast_update_detection() {
let manager = HybridCapacityManager::from_env();
// No cache, should not need fast update
assert!(!manager.needs_fast_update().await);
// Update cache
manager.update_capacity(1000, DataSource::RealTime).await;
// Fresh cache, should not need fast update
assert!(!manager.needs_fast_update().await);
// Record write operation
manager.record_write_operation().await;
// Wait for cache to become stale
sleep(Duration::from_millis(100)).await;
// Now cache is stale and there's recent write
// Note: This might not trigger due to timing, so we just check it doesn't panic
let _needs_update = manager.needs_fast_update().await;
}
#[tokio::test]
async fn test_cache_age_tracking() {
let manager = HybridCapacityManager::from_env();
// No cache, age should be None
assert!(manager.get_cache_age().await.is_none());
// Update cache
manager.update_capacity(1000, DataSource::RealTime).await;
// Check cache age
let age = manager.get_cache_age().await;
assert!(age.is_some());
let age = age.unwrap();
assert!(age < Duration::from_secs(1));
// Wait a bit
sleep(Duration::from_millis(100)).await;
// Check age again
let age = manager.get_cache_age().await.unwrap();
assert!(age >= Duration::from_millis(100));
}
#[tokio::test]
async fn test_data_source_tracking() {
let manager = HybridCapacityManager::from_env();
// Test different data sources
let sources = vec![
DataSource::RealTime,
DataSource::Scheduled,
DataSource::WriteTriggered,
DataSource::Fallback,
];
for source in sources {
manager.update_capacity(1000, source).await;
let cached = manager.get_capacity().await.unwrap();
assert_eq!(cached.source, source);
}
}
#[tokio::test]
async fn test_config_from_env() {
let config = HybridStrategyConfig::from_env();
// Check default values
assert_eq!(config.scheduled_update_interval, Duration::from_secs(300));
assert_eq!(config.write_trigger_delay, Duration::from_secs(10));
assert_eq!(config.write_frequency_threshold, 10);
assert_eq!(config.fast_update_threshold, Duration::from_secs(60));
assert!(config.enable_smart_update);
assert!(config.enable_write_trigger);
}
#[tokio::test]
async fn test_write_frequency_window() {
let manager = HybridCapacityManager::from_env();
// Record many write operations
for _ in 0..20 {
manager.record_write_operation().await;
}
// Check frequency (should be 20 since all are within 1 minute)
let frequency = manager.get_write_frequency().await;
assert_eq!(frequency, 20);
// Note: In a real test, we would wait for the window to expire
// and verify that old writes are removed
}
#[tokio::test]
#[serial]
async fn test_concurrent_access() {
let manager = Arc::new(HybridCapacityManager::from_env());
// Simulate concurrent updates
let mut handles = vec![];
for i in 0..10 {
let mgr = manager.clone();
let handle = tokio::spawn(async move {
mgr.update_capacity(i as u64 * 100, DataSource::RealTime).await;
mgr.record_write_operation().await;
});
handles.push(handle);
}
// Wait for all tasks to complete
for handle in handles {
handle.await.unwrap();
}
// Verify final state
let cached = manager.get_capacity().await;
assert!(cached.is_some());
let frequency = manager.get_write_frequency().await;
assert_eq!(frequency, 10);
}
#[tokio::test]
#[serial]
async fn test_performance_overhead() {
let manager = Arc::new(HybridCapacityManager::from_env());
// Measure time for 1000 operations
let start = std::time::Instant::now();
for i in 0..1000 {
manager.update_capacity(i as u64, DataSource::RealTime).await;
manager.record_write_operation().await;
let _ = manager.get_capacity().await;
}
let elapsed = start.elapsed();
// Should complete in less than 1 second
assert!(elapsed < Duration::from_secs(1));
println!("1000 operations completed in {:?}", elapsed);
}
}

View File

@@ -0,0 +1,379 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Capacity Metrics for monitoring
use metrics::{counter, gauge, histogram};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use tracing::info;
/// Capacity metrics for monitoring
#[derive(Debug, Default)]
pub struct CapacityMetrics {
/// Cache hit count
pub cache_hits: AtomicU64,
/// Cache miss count
pub cache_misses: AtomicU64,
/// Scheduled update count
pub scheduled_updates: AtomicU64,
/// Write triggered update count
pub write_triggered_updates: AtomicU64,
/// Update failure count
pub update_failures: AtomicU64,
/// Total update duration in microseconds
pub total_update_duration_us: AtomicU64,
/// Update count for average calculation
pub update_count: AtomicU64,
/// Symlink count encountered during capacity calculation
pub symlink_count: AtomicU64,
/// Total size of symlink targets
pub symlink_size: AtomicU64,
/// Dynamic timeout usage count
pub dynamic_timeout_count: AtomicU64,
/// Timeout fallback to sampling count
pub timeout_fallback_count: AtomicU64,
/// Stall detection count
pub stall_detected_count: AtomicU64,
}
impl CapacityMetrics {
/// Create new metrics
pub fn new() -> Self {
Self::default()
}
/// Record cache hit
pub fn record_cache_hit(&self) {
self.cache_hits.fetch_add(1, Ordering::Relaxed);
counter!("rustfs.capacity.cache.hits").increment(1);
}
/// Record cache miss
pub fn record_cache_miss(&self) {
self.cache_misses.fetch_add(1, Ordering::Relaxed);
counter!("rustfs.capacity.cache.misses").increment(1);
}
/// Record scheduled update
#[allow(dead_code)]
pub fn record_scheduled_update(&self) {
self.scheduled_updates.fetch_add(1, Ordering::Relaxed);
counter!("rustfs.capacity.update.scheduled").increment(1);
}
/// Record write triggered update
#[allow(dead_code)]
pub fn record_write_triggered_update(&self) {
self.write_triggered_updates.fetch_add(1, Ordering::Relaxed);
counter!("rustfs.capacity.update.write_triggered").increment(1);
}
/// Record update failure
#[allow(dead_code)]
pub fn record_update_failure(&self) {
self.update_failures.fetch_add(1, Ordering::Relaxed);
counter!("rustfs.capacity.update.failures").increment(1);
}
/// Record write operation
#[allow(dead_code)]
pub fn record_write_operation(&self) {
counter!("rustfs.capacity.write.operations").increment(1);
}
/// Record symlink encountered
pub fn record_symlink(&self, size: u64) {
self.symlink_count.fetch_add(1, Ordering::Relaxed);
self.symlink_size.fetch_add(size, Ordering::Relaxed);
counter!("rustfs.capacity.symlinks.encountered").increment(1);
gauge!("rustfs.capacity.symlinks.total_size").set(size as f64);
}
/// Record dynamic timeout usage
pub fn record_dynamic_timeout(&self) {
self.dynamic_timeout_count.fetch_add(1, Ordering::Relaxed);
counter!("rustfs.capacity.timeout.dynamic").increment(1);
}
/// Record timeout fallback to sampling
pub fn record_timeout_fallback(&self) {
self.timeout_fallback_count.fetch_add(1, Ordering::Relaxed);
counter!("rustfs.capacity.timeout.fallback").increment(1);
}
/// Record stall detection
pub fn record_stall_detected(&self) {
self.stall_detected_count.fetch_add(1, Ordering::Relaxed);
counter!("rustfs.capacity.timeout.stall").increment(1);
}
/// Get symlink statistics
#[allow(dead_code)]
pub fn get_symlink_stats(&self) -> (u64, u64) {
(self.symlink_count.load(Ordering::Relaxed), self.symlink_size.load(Ordering::Relaxed))
}
/// Get timeout statistics
#[allow(dead_code)]
pub fn get_timeout_stats(&self) -> (u64, u64, u64) {
(
self.dynamic_timeout_count.load(Ordering::Relaxed),
self.timeout_fallback_count.load(Ordering::Relaxed),
self.stall_detected_count.load(Ordering::Relaxed),
)
}
/// Record update duration
#[allow(dead_code)]
pub fn record_update_duration(&self, duration: Duration) {
let duration_us = duration.as_micros() as u64;
self.total_update_duration_us.fetch_add(duration_us, Ordering::Relaxed);
self.update_count.fetch_add(1, Ordering::Relaxed);
histogram!("rustfs.capacity.update.duration_us").record(duration_us as f64);
}
/// Get cache hit rate
pub fn get_cache_hit_rate(&self) -> f64 {
let hits = self.cache_hits.load(Ordering::Relaxed);
let misses = self.cache_misses.load(Ordering::Relaxed);
let total = hits + misses;
if total == 0 { 0.0 } else { hits as f64 / total as f64 }
}
/// Get average update duration
pub fn get_avg_update_duration(&self) -> Duration {
let total_us = self.total_update_duration_us.load(Ordering::Relaxed);
let count = self.update_count.load(Ordering::Relaxed);
if count == 0 {
Duration::from_secs(0)
} else {
Duration::from_micros(total_us / count)
}
}
/// Get metrics summary
pub fn get_summary(&self) -> MetricsSummary {
MetricsSummary {
cache_hits: self.cache_hits.load(Ordering::Relaxed),
cache_misses: self.cache_misses.load(Ordering::Relaxed),
cache_hit_rate: self.get_cache_hit_rate(),
scheduled_updates: self.scheduled_updates.load(Ordering::Relaxed),
write_triggered_updates: self.write_triggered_updates.load(Ordering::Relaxed),
update_failures: self.update_failures.load(Ordering::Relaxed),
avg_update_duration: self.get_avg_update_duration(),
symlink_count: self.symlink_count.load(Ordering::Relaxed),
symlink_size: self.symlink_size.load(Ordering::Relaxed),
dynamic_timeout_count: self.dynamic_timeout_count.load(Ordering::Relaxed),
timeout_fallback_count: self.timeout_fallback_count.load(Ordering::Relaxed),
stall_detected_count: self.stall_detected_count.load(Ordering::Relaxed),
}
}
/// Log metrics summary
pub fn log_summary(&self) {
let summary = self.get_summary();
// Update gauges for current values
gauge!("rustfs.capacity.cache.hit_rate").set(summary.cache_hit_rate);
gauge!("rustfs.capacity.cache.hits_total").set(summary.cache_hits as f64);
gauge!("rustfs.capacity.cache.misses_total").set(summary.cache_misses as f64);
gauge!("rustfs.capacity.update.scheduled_total").set(summary.scheduled_updates as f64);
gauge!("rustfs.capacity.update.write_triggered_total").set(summary.write_triggered_updates as f64);
gauge!("rustfs.capacity.update.failures_total").set(summary.update_failures as f64);
gauge!("rustfs.capacity.symlinks.count").set(summary.symlink_count as f64);
gauge!("rustfs.capacity.symlinks.size").set(summary.symlink_size as f64);
gauge!("rustfs.capacity.timeout.dynamic_total").set(summary.dynamic_timeout_count as f64);
gauge!("rustfs.capacity.timeout.fallback_total").set(summary.timeout_fallback_count as f64);
gauge!("rustfs.capacity.timeout.stall_total").set(summary.stall_detected_count as f64);
info!(
"Capacity Metrics: cache_hit_rate={:.2}%, cache_hits={}, cache_misses={}, scheduled_updates={}, write_triggered_updates={}, update_failures={}, avg_update_duration={:?}, symlinks={}, symlink_size={}, dynamic_timeouts={}, timeout_fallbacks={}, stalls={}",
summary.cache_hit_rate * 100.0,
summary.cache_hits,
summary.cache_misses,
summary.scheduled_updates,
summary.write_triggered_updates,
summary.update_failures,
summary.avg_update_duration,
summary.symlink_count,
summary.symlink_size,
summary.dynamic_timeout_count,
summary.timeout_fallback_count,
summary.stall_detected_count
);
}
}
/// Metrics summary
#[derive(Debug, Clone)]
pub struct MetricsSummary {
pub cache_hits: u64,
pub cache_misses: u64,
pub cache_hit_rate: f64,
pub scheduled_updates: u64,
pub write_triggered_updates: u64,
pub update_failures: u64,
pub avg_update_duration: Duration,
pub symlink_count: u64,
pub symlink_size: u64,
pub dynamic_timeout_count: u64,
pub timeout_fallback_count: u64,
pub stall_detected_count: u64,
}
/// Global metrics instance
static CAPACITY_METRICS: std::sync::OnceLock<Arc<CapacityMetrics>> = std::sync::OnceLock::new();
/// Get global metrics
pub fn get_capacity_metrics() -> Arc<CapacityMetrics> {
CAPACITY_METRICS.get_or_init(|| Arc::new(CapacityMetrics::new())).clone()
}
/// Start metrics logging task
pub async fn start_metrics_logging(interval: Duration) {
let metrics = get_capacity_metrics();
tokio::spawn(async move {
let mut timer = tokio::time::interval(interval);
loop {
timer.tick().await;
metrics.log_summary();
}
});
}
/// Record a write operation globally
#[allow(dead_code)]
pub fn record_global_write_operation() {
let metrics = get_capacity_metrics();
metrics.record_write_operation();
}
/// Record cache hit globally
#[allow(dead_code)]
pub fn record_global_cache_hit() {
let metrics = get_capacity_metrics();
metrics.record_cache_hit();
}
/// Record cache miss globally
#[allow(dead_code)]
pub fn record_global_cache_miss() {
let metrics = get_capacity_metrics();
metrics.record_cache_miss();
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_metrics_creation() {
let metrics = CapacityMetrics::new();
assert_eq!(metrics.cache_hits.load(Ordering::Relaxed), 0);
assert_eq!(metrics.cache_misses.load(Ordering::Relaxed), 0);
}
#[test]
fn test_record_cache_hit() {
let metrics = CapacityMetrics::new();
metrics.record_cache_hit();
metrics.record_cache_hit();
assert_eq!(metrics.cache_hits.load(Ordering::Relaxed), 2);
}
#[test]
fn test_cache_hit_rate() {
let metrics = CapacityMetrics::new();
metrics.record_cache_hit();
metrics.record_cache_hit();
metrics.record_cache_miss();
let rate = metrics.get_cache_hit_rate();
assert!((rate - 0.6666666666666666).abs() < 0.0001);
}
#[test]
fn test_avg_update_duration() {
let metrics = CapacityMetrics::new();
metrics.record_update_duration(Duration::from_millis(100));
metrics.record_update_duration(Duration::from_millis(200));
let avg = metrics.get_avg_update_duration();
assert_eq!(avg, Duration::from_millis(150));
}
#[test]
fn test_get_summary() {
let metrics = CapacityMetrics::new();
metrics.record_cache_hit();
metrics.record_scheduled_update();
metrics.record_update_duration(Duration::from_millis(100));
let summary = metrics.get_summary();
assert_eq!(summary.cache_hits, 1);
assert_eq!(summary.scheduled_updates, 1);
assert_eq!(summary.avg_update_duration, Duration::from_millis(100));
assert_eq!(summary.symlink_count, 0);
assert_eq!(summary.dynamic_timeout_count, 0);
}
#[test]
fn test_record_write_operation() {
let metrics = CapacityMetrics::new();
metrics.record_write_operation();
metrics.record_write_operation();
// This test just ensures the method doesn't panic
assert_eq!(metrics.write_triggered_updates.load(Ordering::Relaxed), 0);
}
#[test]
fn test_record_symlink() {
let metrics = CapacityMetrics::new();
metrics.record_symlink(1024);
metrics.record_symlink(2048);
let (count, size) = metrics.get_symlink_stats();
assert_eq!(count, 2);
assert_eq!(size, 3072);
}
#[test]
fn test_record_dynamic_timeout() {
let metrics = CapacityMetrics::new();
metrics.record_dynamic_timeout();
metrics.record_dynamic_timeout();
let (dynamic, fallback, stalls) = metrics.get_timeout_stats();
assert_eq!(dynamic, 2);
assert_eq!(fallback, 0);
assert_eq!(stalls, 0);
}
#[test]
fn test_record_timeout_fallback() {
let metrics = CapacityMetrics::new();
metrics.record_timeout_fallback();
metrics.record_stall_detected();
let (dynamic, fallback, stalls) = metrics.get_timeout_stats();
assert_eq!(dynamic, 0);
assert_eq!(fallback, 1);
assert_eq!(stalls, 1);
}
}

View File

@@ -0,0 +1,21 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod capacity_integration;
pub mod capacity_manager;
#[cfg(test)]
mod capacity_manager_test;
pub mod capacity_metrics;
#[cfg(test)]
mod write_trigger_test;

View File

@@ -0,0 +1,157 @@
// Copyright 2024 RustFS Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Write trigger integration tests
#[cfg(test)]
mod tests {
use crate::capacity::capacity_manager::{DataSource, HybridCapacityManager};
use crate::capacity::capacity_metrics::{
CapacityMetrics, get_capacity_metrics, record_global_cache_hit, record_global_cache_miss, record_global_write_operation,
};
use serial_test::serial;
use std::time::Duration;
#[tokio::test]
#[serial]
async fn test_write_trigger_integration() {
let manager = HybridCapacityManager::from_env();
let metrics = CapacityMetrics::new();
// Record write operations
manager.record_write_operation().await;
manager.record_write_operation().await;
manager.record_write_operation().await;
// Check write frequency
let frequency = manager.get_write_frequency().await;
assert_eq!(frequency, 3);
// Check metrics
let summary = metrics.get_summary();
assert_eq!(summary.write_triggered_updates, 0); // Not triggered yet
}
#[tokio::test]
#[serial]
async fn test_write_trigger_with_capacity_update() {
let manager = HybridCapacityManager::from_env();
let metrics = CapacityMetrics::new();
// Simulate write-triggered update by calling metrics directly
metrics.record_write_triggered_update();
// Check metrics
let summary = metrics.get_summary();
assert_eq!(summary.write_triggered_updates, 1);
// Also test manager update
manager.update_capacity(1000, DataSource::WriteTriggered).await;
// Check capacity
let cached = manager.get_capacity().await;
assert!(cached.is_some());
assert_eq!(cached.unwrap().total_used, 1000);
}
#[tokio::test]
#[serial]
async fn test_metrics_recording() {
let metrics = CapacityMetrics::new();
// Record various operations
metrics.record_cache_hit();
metrics.record_cache_hit();
metrics.record_cache_miss();
metrics.record_scheduled_update();
metrics.record_write_triggered_update();
metrics.record_update_duration(Duration::from_millis(100));
metrics.record_update_duration(Duration::from_millis(200));
// Check summary
let summary = metrics.get_summary();
assert_eq!(summary.cache_hits, 2);
assert_eq!(summary.cache_misses, 1);
assert_eq!(summary.scheduled_updates, 1);
assert_eq!(summary.write_triggered_updates, 1);
assert_eq!(summary.avg_update_duration, Duration::from_millis(150));
// Check hit rate
let hit_rate = metrics.get_cache_hit_rate();
assert!((hit_rate - 0.6666666666666666).abs() < 0.0001);
}
#[tokio::test]
async fn test_write_frequency_tracking() {
let manager = HybridCapacityManager::from_env();
// Initial state
assert_eq!(manager.get_write_frequency().await, 0);
// Record writes
for _ in 0..5 {
manager.record_write_operation().await;
}
// Check frequency
assert_eq!(manager.get_write_frequency().await, 5);
// Wait for window to expire (60 seconds)
// In real tests, we'd use a shorter window
tokio::time::sleep(Duration::from_millis(10)).await;
// Frequency should still be 5 (window not expired)
assert_eq!(manager.get_write_frequency().await, 5);
}
#[tokio::test]
async fn test_needs_fast_update() {
let manager = HybridCapacityManager::from_env();
// No cache, should not need update
assert!(!manager.needs_fast_update().await);
// Update cache
manager.update_capacity(1000, DataSource::Scheduled).await;
// Fresh cache, should not need update
assert!(!manager.needs_fast_update().await);
// Record write operation
manager.record_write_operation().await;
// With recent write, should need fast update
// (depending on configuration, this may or may not trigger)
let needs_update = manager.needs_fast_update().await;
// Just ensure it doesn't panic
#[allow(clippy::overly_complex_bool_expr)]
let _ = needs_update || !needs_update;
}
#[test]
#[serial]
fn test_global_metrics_functions() {
// Test global functions don't panic
let before = get_capacity_metrics().cache_hits.load(std::sync::atomic::Ordering::Relaxed);
record_global_write_operation();
record_global_cache_hit();
record_global_cache_miss();
let metrics = get_capacity_metrics();
assert!(metrics.cache_hits.load(std::sync::atomic::Ordering::Relaxed) > before);
}
}

View File

@@ -16,6 +16,7 @@ mod admin;
mod app;
mod auth;
mod auth_keystone;
mod capacity;
mod config;
mod error;
mod init;
@@ -40,6 +41,7 @@ use crate::init::{init_ftp_system, init_ftps_system};
#[cfg(feature = "webdav")]
use crate::init::init_webdav_system;
use crate::capacity::capacity_integration::init_capacity_management;
use crate::server::{
SHUTDOWN_TIMEOUT, ServiceState, ServiceStateManager, ShutdownSignal, init_cert, init_event_notifier, shutdown_event_notifier,
start_audit_system, start_http_server, stop_audit_system, wait_for_shutdown,
@@ -296,6 +298,7 @@ async fn run(config: config::Config) -> Result<()> {
// Initialize the local disk
init_local_disks(endpoint_pools.clone()).await.map_err(Error::other)?;
// Initialize the lock clients
init_lock_clients(endpoint_pools.clone());
for (i, eps) in endpoint_pools.as_ref().iter().enumerate() {
@@ -330,7 +333,8 @@ async fn run(config: config::Config) -> Result<()> {
);
}
}
// Initialize capacity management system
init_capacity_management().await;
let state_manager = ServiceStateManager::new();
// Update service status to Starting
state_manager.update(ServiceState::Starting);

View File

@@ -64,6 +64,18 @@ pub struct TimeoutConfig {
/// Disk read operation timeout (default 10s).
/// Individual disk read operations that exceed this are cancelled.
pub disk_read_timeout: Duration,
/// Enable dynamic timeout calculation based on object size
pub enable_dynamic_timeout: bool,
/// Expected transfer speed in bytes per second for timeout estimation
pub bytes_per_second: u64,
/// Minimum timeout for dynamic calculation
pub min_timeout: Duration,
/// Maximum timeout for dynamic calculation
pub max_timeout: Duration,
}
impl Default for TimeoutConfig {
@@ -72,6 +84,10 @@ impl Default for TimeoutConfig {
get_object_timeout: Duration::from_secs(rustfs_config::DEFAULT_OBJECT_GET_TIMEOUT),
lock_acquire_timeout: Duration::from_secs(rustfs_config::DEFAULT_OBJECT_LOCK_ACQUIRE_TIMEOUT),
disk_read_timeout: Duration::from_secs(rustfs_config::DEFAULT_OBJECT_DISK_READ_TIMEOUT),
enable_dynamic_timeout: rustfs_config::DEFAULT_OBJECT_DYNAMIC_TIMEOUT_ENABLE,
bytes_per_second: rustfs_config::DEFAULT_OBJECT_BYTES_PER_SECOND,
min_timeout: Duration::from_secs(rustfs_config::DEFAULT_OBJECT_MIN_TIMEOUT),
max_timeout: Duration::from_secs(rustfs_config::DEFAULT_OBJECT_MAX_TIMEOUT),
}
}
}
@@ -90,10 +106,26 @@ impl TimeoutConfig {
rustfs_config::DEFAULT_OBJECT_DISK_READ_TIMEOUT,
);
// Dynamic timeout settings
let enable_dynamic_timeout = rustfs_utils::get_env_bool(
rustfs_config::ENV_OBJECT_DYNAMIC_TIMEOUT_ENABLE,
rustfs_config::DEFAULT_OBJECT_DYNAMIC_TIMEOUT_ENABLE,
);
let bytes_per_second =
rustfs_utils::get_env_u64(rustfs_config::ENV_OBJECT_BYTES_PER_SECOND, rustfs_config::DEFAULT_OBJECT_BYTES_PER_SECOND);
let min_timeout_secs =
rustfs_utils::get_env_u64(rustfs_config::ENV_OBJECT_MIN_TIMEOUT, rustfs_config::DEFAULT_OBJECT_MIN_TIMEOUT);
let max_timeout_secs =
rustfs_utils::get_env_u64(rustfs_config::ENV_OBJECT_MAX_TIMEOUT, rustfs_config::DEFAULT_OBJECT_MAX_TIMEOUT);
Self {
get_object_timeout: Duration::from_secs(get_object_timeout),
lock_acquire_timeout: Duration::from_secs(lock_acquire_timeout),
disk_read_timeout: Duration::from_secs(disk_read_timeout),
enable_dynamic_timeout,
bytes_per_second,
min_timeout: Duration::from_secs(min_timeout_secs),
max_timeout: Duration::from_secs(max_timeout_secs),
}
}
@@ -101,6 +133,34 @@ impl TimeoutConfig {
pub fn is_timeout_enabled(&self) -> bool {
self.get_object_timeout > Duration::ZERO
}
/// Calculate dynamic timeout based on object size
pub fn calculate_timeout_for_size(&self, object_size: u64) -> Duration {
if !self.enable_dynamic_timeout {
return self.get_object_timeout;
}
// Calculate timeout based on expected transfer speed
// Add 50% buffer for network overhead and system load
let estimated_seconds = (object_size / self.bytes_per_second) * 3 / 2;
// Ensure at least 1 second
let estimated_duration = Duration::from_secs(estimated_seconds.max(1));
// Clamp to min/max bounds
estimated_duration
.max(self.min_timeout)
.min(self.max_timeout)
.min(self.get_object_timeout) // Never exceed configured timeout
}
/// Get appropriate timeout for a given operation
pub fn get_timeout_for_operation(&self, operation_size: Option<u64>) -> Duration {
match operation_size {
Some(size) if self.enable_dynamic_timeout && size > 0 => self.calculate_timeout_for_size(size),
_ => self.get_object_timeout,
}
}
}
/// Information about a timeout event.
@@ -124,6 +184,70 @@ pub struct TimeoutInfo {
pub disk_reads_completed: u32,
/// Number of disk reads pending.
pub disk_reads_pending: u32,
/// Object size (if known)
pub object_size: Option<u64>,
/// Progress percentage (0-100)
pub progress_percent: Option<f32>,
}
/// Progress tracking for long-running operations
#[derive(Debug, Clone)]
pub struct OperationProgress {
/// Start time
start_time: Instant,
/// Last progress update time
last_update: Instant,
/// Bytes transferred so far
bytes_transferred: u64,
/// Total object size (if known)
total_size: Option<u64>,
/// Stale timeout - if no progress for this duration, consider stuck
stale_timeout: Duration,
}
impl OperationProgress {
/// Create a new progress tracker
pub fn new(total_size: Option<u64>, stale_timeout: Duration) -> Self {
Self {
start_time: Instant::now(),
last_update: Instant::now(),
bytes_transferred: 0,
total_size,
stale_timeout,
}
}
/// Update progress with new bytes transferred
pub fn update(&mut self, bytes: u64) {
self.bytes_transferred = bytes;
self.last_update = Instant::now();
}
/// Check if progress is stale (no updates for stale_timeout)
pub fn is_stale(&self) -> bool {
self.last_update.elapsed() > self.stale_timeout
}
/// Get progress percentage (0-100)
pub fn progress_percent(&self) -> Option<f32> {
self.total_size.map(|total| {
if total == 0 {
100.0
} else {
(self.bytes_transferred as f32 / total as f32 * 100.0).min(100.0)
}
})
}
/// Get transfer rate in bytes per second
pub fn transfer_rate(&self) -> u64 {
let elapsed = self.start_time.elapsed().as_secs_f64();
if elapsed > 0.0 {
(self.bytes_transferred as f64 / elapsed) as u64
} else {
0
}
}
}
/// Result of a timed GetObject operation.
@@ -171,6 +295,25 @@ impl RequestTimeoutWrapper {
}
}
/// Create a new timeout wrapper with operation size for dynamic timeout calculation
pub fn with_operation_size(config: TimeoutConfig, operation_size: Option<u64>) -> Self {
// Store operation size in config for later use
// Note: Currently we don't store the size in the wrapper itself,
// but the config can be used to calculate appropriate timeout
let _ = operation_size; // Suppress unused warning for now
Self {
config,
start_time: Instant::now(),
cancel_token: CancellationToken::new(),
request_id: format!("req-{}", &uuid::Uuid::new_v4().to_string()[..8]),
}
}
/// Get the configured timeout for this operation
pub fn get_timeout(&self, operation_size: Option<u64>) -> Duration {
self.config.get_timeout_for_operation(operation_size)
}
/// Get the request ID.
pub fn request_id(&self) -> &str {
&self.request_id
@@ -200,13 +343,28 @@ impl RequestTimeoutWrapper {
/// Get remaining time before timeout.
/// Returns None if timeout is disabled or already exceeded.
pub fn remaining_time(&self) -> Option<Duration> {
self.remaining_time_for_size(None)
}
/// Get remaining time before timeout for a specific operation size.
pub fn remaining_time_for_size(&self, operation_size: Option<u64>) -> Option<Duration> {
if !self.config.is_timeout_enabled() {
return None;
}
let remaining = self.config.get_object_timeout.saturating_sub(self.elapsed());
let timeout = self.config.get_timeout_for_operation(operation_size);
let remaining = timeout.saturating_sub(self.elapsed());
if remaining == Duration::ZERO { None } else { Some(remaining) }
}
/// Check if the wrapper should timeout based on elapsed time and optional operation size
pub fn should_timeout(&self, operation_size: Option<u64>) -> bool {
if !self.config.is_timeout_enabled() {
return false;
}
let timeout = self.config.get_timeout_for_operation(operation_size);
self.elapsed() >= timeout
}
/// Execute an async operation with timeout protection.
///
/// The operation receives a `CancellationToken` that it can use to:
@@ -320,6 +478,8 @@ impl RequestTimeoutWrapper {
lock_hold_time: None,
disk_reads_completed: 0,
disk_reads_pending: 0,
object_size: None,
progress_percent: None,
})
}
}
@@ -441,6 +601,8 @@ impl RequestTimeoutWrapper {
lock_hold_time: None,
disk_reads_completed: 0,
disk_reads_pending: 0,
object_size: None,
progress_percent: None,
})
}
}
@@ -460,6 +622,130 @@ pub fn get_io_buffer_size() -> usize {
rustfs_utils::get_env_usize(rustfs_config::ENV_OBJECT_IO_BUFFER_SIZE, rustfs_config::DEFAULT_OBJECT_IO_BUFFER_SIZE)
}
/// Calculate adaptive timeout based on historical performance
///
/// This function adjusts timeout based on:
/// - Historical transfer rates
/// - Recent timeout occurrences
/// - System load indicators
pub fn calculate_adaptive_timeout(
base_timeout: Duration,
historical_rate_bps: Option<u64>,
recent_timeout_count: u32,
object_size: u64,
) -> Duration {
// If we have recent timeouts, increase timeout
let timeout_multiplier = if recent_timeout_count > 3 {
2.0 // Double timeout if many recent timeouts
} else if recent_timeout_count > 1 {
1.5 // 50% increase if some timeouts
} else {
1.0 // No adjustment
};
// If we have historical rate data, use it for estimation
let estimated_duration = if let Some(rate) = historical_rate_bps {
if rate > 0 {
let estimated_secs = (object_size as f64 / rate as f64) * 1.2; // 20% buffer
Duration::from_secs_f64(estimated_secs)
} else {
base_timeout
}
} else {
base_timeout
};
// Apply timeout multiplier but clamp to reasonable bounds
let adaptive_duration = Duration::from_secs_f64(estimated_duration.as_secs_f64() * timeout_multiplier);
// Clamp to 5 seconds minimum and 10 minutes maximum
adaptive_duration.max(Duration::from_secs(5)).min(Duration::from_secs(600))
}
/// Estimate bytes per second for timeout calculation
///
/// Uses a conservative estimate to avoid premature timeouts
pub fn estimate_bytes_per_second(object_size: u64, expected_duration: Duration) -> u64 {
let secs = expected_duration.as_secs_f64();
if secs > 0.0 {
(object_size as f64 / secs) as u64
} else {
rustfs_config::DEFAULT_OBJECT_BYTES_PER_SECOND
}
}
#[cfg(test)]
mod adaptive_timeout_tests {
use super::*;
#[test]
fn test_calculate_adaptive_timeout_basic() {
let base_timeout = Duration::from_secs(30);
let adaptive = calculate_adaptive_timeout(base_timeout, None, 0, 1024 * 1024);
// Should return base timeout when no historical data
assert_eq!(adaptive, base_timeout);
}
#[test]
fn test_calculate_adaptive_timeout_with_history() {
let base_timeout = Duration::from_secs(30);
let historical_rate = 2 * 1024 * 1024; // 2 MB/s
let object_size = 10 * 1024 * 1024; // 10 MB
let adaptive = calculate_adaptive_timeout(base_timeout, Some(historical_rate), 0, object_size);
// With 2 MB/s, 10 MB should take ~5 seconds + 20% buffer = 6 seconds
assert!(adaptive >= Duration::from_secs(5));
assert!(adaptive <= Duration::from_secs(10));
}
#[test]
fn test_calculate_adaptive_timeout_with_recent_timeouts() {
let base_timeout = Duration::from_secs(30);
// No timeouts
let adaptive1 = calculate_adaptive_timeout(base_timeout, None, 0, 1024 * 1024);
assert_eq!(adaptive1, base_timeout);
// Some timeouts (2 timeouts -> 1.5x multiplier -> 30 * 1.5 = 45 seconds)
let adaptive2 = calculate_adaptive_timeout(base_timeout, None, 2, 1024 * 1024);
assert!(adaptive2 > base_timeout);
assert!(adaptive2 <= Duration::from_secs(45)); // Changed from < to <=
// Many timeouts
let adaptive3 = calculate_adaptive_timeout(base_timeout, None, 5, 1024 * 1024);
assert!(adaptive3 >= base_timeout * 2);
}
#[test]
fn test_calculate_adaptive_timeout_clamping() {
let base_timeout = Duration::from_secs(1);
let adaptive = calculate_adaptive_timeout(base_timeout, None, 10, 1024 * 1024);
// Should clamp to minimum of 5 seconds
assert!(adaptive >= Duration::from_secs(5));
}
#[test]
fn test_estimate_bytes_per_second() {
let object_size = 10 * 1024 * 1024; // 10 MB
let duration = Duration::from_secs(10);
let bps = estimate_bytes_per_second(object_size, duration);
assert_eq!(bps, 1024 * 1024); // 1 MB/s
}
#[test]
fn test_estimate_bytes_per_second_zero_duration() {
let object_size = 1024;
let duration = Duration::from_secs(0);
let bps = estimate_bytes_per_second(object_size, duration);
assert_eq!(bps, rustfs_config::DEFAULT_OBJECT_BYTES_PER_SECOND);
}
}
#[cfg(test)]
mod tests {
use super::*;
@@ -579,4 +865,147 @@ mod tests {
let size = get_io_buffer_size();
assert_eq!(size, 128 * 1024);
}
#[test]
fn test_timeout_config_default_with_dynamic() {
let config = TimeoutConfig::default();
assert!(config.enable_dynamic_timeout);
assert_eq!(config.bytes_per_second, rustfs_config::DEFAULT_OBJECT_BYTES_PER_SECOND);
assert_eq!(config.min_timeout, Duration::from_secs(rustfs_config::DEFAULT_OBJECT_MIN_TIMEOUT));
assert_eq!(config.max_timeout, Duration::from_secs(rustfs_config::DEFAULT_OBJECT_MAX_TIMEOUT));
}
#[test]
fn test_calculate_timeout_for_size() {
let config = TimeoutConfig::default();
// Test with small object (should use min timeout)
let small_timeout = config.calculate_timeout_for_size(1024); // 1KB
assert_eq!(small_timeout, Duration::from_secs(rustfs_config::DEFAULT_OBJECT_MIN_TIMEOUT));
// Test with large object
let large_timeout = config.calculate_timeout_for_size(10 * 1024 * 1024); // 10MB
// At 1MB/s with 50% buffer: 10MB / 1MB/s * 1.5 = 15 seconds
assert!(large_timeout >= Duration::from_secs(14));
assert!(large_timeout <= Duration::from_secs(16));
// Test with very large object (should cap at max_timeout)
let huge_timeout = config.calculate_timeout_for_size(1000 * 1024 * 1024); // 1GB
assert!(huge_timeout <= Duration::from_secs(rustfs_config::DEFAULT_OBJECT_MAX_TIMEOUT));
}
#[test]
fn test_timeout_with_dynamic_disabled() {
let config = TimeoutConfig {
enable_dynamic_timeout: false,
..Default::default()
};
// Should use base timeout regardless of size
let timeout1 = config.get_timeout_for_operation(Some(1024));
let timeout2 = config.get_timeout_for_operation(Some(100 * 1024 * 1024));
assert_eq!(timeout1, config.get_object_timeout);
assert_eq!(timeout2, config.get_object_timeout);
}
#[test]
fn test_operation_progress_new() {
let progress = OperationProgress::new(Some(1000), Duration::from_secs(5));
assert_eq!(progress.bytes_transferred, 0);
assert_eq!(progress.total_size, Some(1000));
assert!(!progress.is_stale());
}
#[test]
fn test_operation_progress_update() {
let mut progress = OperationProgress::new(Some(1000), Duration::from_secs(5));
progress.update(500);
assert_eq!(progress.bytes_transferred, 500);
assert!(!progress.is_stale());
// Simulate time passing
std::thread::sleep(Duration::from_millis(100));
progress.update(1000);
assert_eq!(progress.bytes_transferred, 1000);
}
#[test]
fn test_operation_progress_stale() {
let mut progress = OperationProgress::new(Some(1000), Duration::from_millis(100));
progress.update(500);
assert!(!progress.is_stale());
// Wait for stale timeout
std::thread::sleep(Duration::from_millis(150));
assert!(progress.is_stale());
// Update should clear stale status
progress.update(600);
assert!(!progress.is_stale());
}
#[test]
fn test_operation_progress_percent() {
let progress = OperationProgress::new(Some(1000), Duration::from_secs(5));
assert_eq!(progress.progress_percent(), Some(0.0));
let mut progress = progress;
progress.update(500);
assert_eq!(progress.progress_percent(), Some(50.0));
progress.update(1000);
assert_eq!(progress.progress_percent(), Some(100.0));
}
#[test]
fn test_operation_progress_no_total_size() {
let progress = OperationProgress::new(None, Duration::from_secs(5));
assert_eq!(progress.progress_percent(), None);
}
#[test]
fn test_operation_progress_zero_size() {
let progress = OperationProgress::new(Some(0), Duration::from_secs(5));
assert_eq!(progress.progress_percent(), Some(100.0));
}
#[test]
fn test_should_timeout() {
let config = TimeoutConfig {
get_object_timeout: Duration::from_millis(100),
..Default::default()
};
let wrapper = RequestTimeoutWrapper::new(config);
// Should not timeout immediately
assert!(!wrapper.should_timeout(None));
// Wait for timeout
std::thread::sleep(Duration::from_millis(150));
assert!(wrapper.should_timeout(None));
}
#[test]
fn test_should_timeout_with_size() {
let config = TimeoutConfig {
enable_dynamic_timeout: true,
bytes_per_second: 1024, // 1KB/s
min_timeout: Duration::from_secs(rustfs_config::DEFAULT_OBJECT_MIN_TIMEOUT),
max_timeout: Duration::from_secs(rustfs_config::DEFAULT_OBJECT_MAX_TIMEOUT),
..Default::default()
};
let wrapper = RequestTimeoutWrapper::new(config);
// Small size should use min timeout
assert!(!wrapper.should_timeout(Some(1024)));
// Large size should calculate longer timeout
assert!(!wrapper.should_timeout(Some(10 * 1024 * 1024)));
}
}

132
scripts/run.sh Normal file → Executable file
View File

@@ -15,6 +15,8 @@ set -e
# See the License for the specific language governing permissions and
# limitations under the License.
# RustFS Startup Script
# This script sets up environment variables and starts the RustFS service
# check ./rustfs/static/index.html not exists
if [ ! -f ./rustfs/static/index.html ]; then
@@ -215,6 +217,118 @@ export RUSTFS_TRUST_SYSTEM_CA=true
# export RUSTFS_FTPS_ADDRESS="0.0.0.0:8022"
# export RUSTFS_FTPS_CERTS_DIR="${current_dir}/deploy/certs/ftps"
# ============================================================================
# Capacity Statistics Configuration
# ============================================================================
# --- Capacity Management System ---
# The capacity management system provides accurate capacity statistics with
# high performance through hybrid caching strategy.
#
# Features:
# - Hybrid caching: scheduled updates + write triggers + smart detection
# - Performance protection: sampling, timeout, fallback
# - Comprehensive metrics: 17 metrics for monitoring
# - Low overhead: < 0.1% CPU, < 1MB memory
#
# For more details, see: .codeartsdoer/specs/fix-capacity-calculation/
# --- Basic Configuration ---
# Scheduled update interval (seconds)
# How often to perform full capacity recalculation
# Default: 300 (5 minutes)
# Recommended: 300-600 for production, 60-120 for testing
export RUSTFS_CAPACITY_SCHEDULED_INTERVAL=300
# Write trigger delay (seconds)
# Delay after write operation before triggering capacity update
# Default: 10
# Recommended: 5-15
export RUSTFS_CAPACITY_WRITE_TRIGGER_DELAY=10
# Write frequency threshold (writes per minute)
# Threshold for triggering fast updates during high write frequency
# Default: 10
# Recommended: 5-20
export RUSTFS_CAPACITY_WRITE_FREQUENCY_THRESHOLD=10
# Fast update threshold (seconds)
# Cache age threshold for considering data as fresh
# Default: 60
# Recommended: 30-120
export RUSTFS_CAPACITY_FAST_UPDATE_THRESHOLD=60
# --- Performance Protection ---
# Maximum files threshold
# When file count exceeds this, sampling is used for performance
# Default: 1000000 (1 million)
# Recommended: 500000-2000000
export RUSTFS_CAPACITY_MAX_FILES_THRESHOLD=1000000
# Statistics timeout (seconds)
# Maximum time to wait for capacity calculation
# Default: 5
# Recommended: 3-10
export RUSTFS_CAPACITY_STAT_TIMEOUT=5
# Sample rate
# When sampling is enabled, check every N files
# Default: 100
# Recommended: 50-200
export RUSTFS_CAPACITY_SAMPLE_RATE=100
# --- Monitoring Configuration ---
# Metrics logging interval (seconds)
# How often to log capacity metrics summary
# Default: 600 (10 minutes)
# Recommended: 300-900
export RUSTFS_CAPACITY_METRICS_INTERVAL=600
# --- Scenario 1: High Performance Production ---
# For high-throughput production environments with millions of files
# export RUSTFS_CAPACITY_SCHEDULED_INTERVAL=600
# export RUSTFS_CAPACITY_WRITE_TRIGGER_DELAY=15
# export RUSTFS_CAPACITY_WRITE_FREQUENCY_THRESHOLD=20
# export RUSTFS_CAPACITY_FAST_UPDATE_THRESHOLD=120
# export RUSTFS_CAPACITY_MAX_FILES_THRESHOLD=2000000
# export RUSTFS_CAPACITY_STAT_TIMEOUT=10
# export RUSTFS_CAPACITY_SAMPLE_RATE=200
# export RUSTFS_CAPACITY_METRICS_INTERVAL=900
# --- Scenario 2: Low Latency Testing ---
# For testing environments requiring frequent updates
# export RUSTFS_CAPACITY_SCHEDULED_INTERVAL=60
# export RUSTFS_CAPACITY_WRITE_TRIGGER_DELAY=5
# export RUSTFS_CAPACITY_WRITE_FREQUENCY_THRESHOLD=5
# export RUSTFS_CAPACITY_FAST_UPDATE_THRESHOLD=30
# export RUSTFS_CAPACITY_MAX_FILES_THRESHOLD=500000
# export RUSTFS_CAPACITY_STAT_TIMEOUT=3
# export RUSTFS_CAPACITY_SAMPLE_RATE=50
# export RUSTFS_CAPACITY_METRICS_INTERVAL=300
# --- Scenario 3: Small Scale Deployment ---
# For small deployments with < 100K files
# export RUSTFS_CAPACITY_SCHEDULED_INTERVAL=300
# export RUSTFS_CAPACITY_WRITE_TRIGGER_DELAY=10
# export RUSTFS_CAPACITY_WRITE_FREQUENCY_THRESHOLD=10
# export RUSTFS_CAPACITY_FAST_UPDATE_THRESHOLD=60
# export RUSTFS_CAPACITY_MAX_FILES_THRESHOLD=100000
# export RUSTFS_CAPACITY_STAT_TIMEOUT=5
# export RUSTFS_CAPACITY_SAMPLE_RATE=100
# export RUSTFS_CAPACITY_METRICS_INTERVAL=600
# --- Scenario 4: Debugging / Troubleshooting ---
# Enable more frequent updates and shorter timeouts for debugging
# export RUSTFS_CAPACITY_SCHEDULED_INTERVAL=30
# export RUSTFS_CAPACITY_WRITE_TRIGGER_DELAY=2
# export RUSTFS_CAPACITY_WRITE_FREQUENCY_THRESHOLD=3
# export RUSTFS_CAPACITY_FAST_UPDATE_THRESHOLD=10
# export RUSTFS_CAPACITY_MAX_FILES_THRESHOLD=10000
# export RUSTFS_CAPACITY_STAT_TIMEOUT=2
# export RUSTFS_CAPACITY_SAMPLE_RATE=10
# export RUSTFS_CAPACITY_METRICS_INTERVAL=60
# ============================================
# Concurrent Request Optimization Configuration
# ============================================
@@ -302,7 +416,11 @@ export RUSTFS_OBJECT_PRIORITY_SCHEDULING_ENABLE=true
# export RUSTFS_OBJECT_DEADLOCK_CHECK_INTERVAL=3
# export RUSTFS_OBJECT_DEADLOCK_HANG_THRESHOLD=5
# --- Backpressure Configuration ---
# ============================================================================
# Backpressure Configuration
# ============================================================================
# High watermark: trigger backpressure when buffer usage exceeds this percentage
export RUSTFS_BACKPRESSURE_HIGH_WATERMARK=80
# Low watermark: release backpressure when buffer usage drops below this percentage
@@ -312,6 +430,10 @@ if [ -n "$1" ]; then
export RUSTFS_VOLUMES="$1"
fi
# ============================================================================
# Memory Profiling Configuration
# ============================================================================
# Enable jemalloc for memory profiling
# MALLOC_CONF parameters:
# prof:true - Enable heap profiling
@@ -328,14 +450,22 @@ if [ -z "$MALLOC_CONF" ]; then
export MALLOC_CONF="prof:true,prof_active:true,lg_prof_sample:16,log:true,narenas:2,lg_chunk:21,background_thread:true,dirty_decay_ms:1000,muzzy_decay_ms:1000"
fi
# ============================================================================
# Service Startup
# ============================================================================
# Start webhook server
#cargo run --example webhook -p rustfs-notify &
# Start main service
# To run with profiling enabled, uncomment the following line and comment the next line
#cargo run --profile profiling --bin rustfs
# To run with FTP/FTPS support, use:
# cargo run --bin rustfs --features ftps
# To run in release mode, use the following line
#cargo run --profile release --bin rustfs
# To run in debug mode, use the following line
cargo run --bin rustfs