diff --git a/crates/io-metrics/README_zh.md b/crates/io-metrics/README_zh.md index 7ae458303..b2baa0921 100644 --- a/crates/io-metrics/README_zh.md +++ b/crates/io-metrics/README_zh.md @@ -237,15 +237,6 @@ println!("最大并发读: {}", config.scheduler.max_concurrent_reads); ## 🔧 配置 -### 环境变量 - -| 变量名 | 描述 | 默认值 | -|--------|------|--------| -| `RUSTFS_CACHE_MAX_CAPACITY` | 缓存最大容量 | 10000 | -| `RUSTFS_CACHE_TTL_SECS` | 缓存 TTL 秒数 | 300 | -| `RUSTFS_CACHE_MAX_MEMORY` | 缓存最大内存 | 104857600 | -| `RUSTFS_ADAPTIVE_TTL_ENABLED` | 启用自适应 TTL | true | - ### 代码配置 ```rust diff --git a/crates/trusted-proxies/Cargo.toml b/crates/trusted-proxies/Cargo.toml index a342bcfd7..b196a3367 100644 --- a/crates/trusted-proxies/Cargo.toml +++ b/crates/trusted-proxies/Cargo.toml @@ -30,7 +30,7 @@ axum = { workspace = true } http = { workspace = true } ipnetwork = { workspace = true } metrics = { workspace = true } -moka = { workspace = true, features = ["future"] } +moka = { workspace = true, features = ["future", "sync"] } reqwest = { workspace = true } rustfs-config = { workspace = true } rustfs-utils = { workspace = true, features = ["net"] } diff --git a/crates/trusted-proxies/src/global.rs b/crates/trusted-proxies/src/global.rs index f9eeadf7f..691e55bd0 100644 --- a/crates/trusted-proxies/src/global.rs +++ b/crates/trusted-proxies/src/global.rs @@ -64,7 +64,7 @@ pub fn init() { .expect("Trusted proxy metrics already initialized"); // Initialize the trusted proxy layer. - let layer = TrustedProxyLayer::new(config.proxy.clone(), metrics, enabled); + let layer = TrustedProxyLayer::with_cache_config(config.proxy.clone(), config.cache.clone(), metrics, enabled); PROXY_LAYER.set(layer).expect("Trusted proxy layer already initialized"); tracing::info!("Trusted Proxies module initialized"); diff --git a/crates/trusted-proxies/src/middleware/layer.rs b/crates/trusted-proxies/src/middleware/layer.rs index eb651d189..452a20ed0 100644 --- a/crates/trusted-proxies/src/middleware/layer.rs +++ b/crates/trusted-proxies/src/middleware/layer.rs @@ -17,10 +17,10 @@ use std::sync::Arc; use tower::Layer; -use crate::ProxyMetrics; use crate::ProxyValidator; use crate::TrustedProxyConfig; use crate::TrustedProxyMiddleware; +use crate::{CacheConfig, ProxyMetrics}; /// Tower Layer for the trusted proxy middleware. #[derive(Clone, Debug)] @@ -34,12 +34,22 @@ pub struct TrustedProxyLayer { impl TrustedProxyLayer { /// Creates a new `TrustedProxyLayer`. pub fn new(config: TrustedProxyConfig, metrics: Option, enabled: bool) -> Self { - let validator = ProxyValidator::new(config, metrics); + Self::with_cache_config(config, CacheConfig::default(), metrics, enabled) + } - Self { - validator: Arc::new(validator), - enabled, + /// Creates a new `TrustedProxyLayer` with explicit cache configuration. + pub fn with_cache_config( + config: TrustedProxyConfig, + cache_config: CacheConfig, + metrics: Option, + enabled: bool, + ) -> Self { + let validator = Arc::new(ProxyValidator::with_cache_config(config, cache_config.clone(), metrics)); + if enabled { + validator.spawn_cache_maintenance_task(cache_config.cleanup_interval()); } + + Self { validator, enabled } } /// Creates a new `TrustedProxyLayer` that is enabled by default. diff --git a/crates/trusted-proxies/src/proxy/cache.rs b/crates/trusted-proxies/src/proxy/cache.rs index f80a534d0..b4d8fae0c 100644 --- a/crates/trusted-proxies/src/proxy/cache.rs +++ b/crates/trusted-proxies/src/proxy/cache.rs @@ -14,62 +14,114 @@ //! High-performance cache implementation for proxy validation results using Moka. -use moka::future::Cache; +use moka::sync::Cache; use std::net::IpAddr; use std::time::Duration; +use crate::ProxyMetrics; + /// Cache for storing IP validation results. #[derive(Debug, Clone)] pub struct IpValidationCache { /// The underlying Moka cache. cache: Cache, + /// Configured capacity. + capacity: usize, /// Whether the cache is enabled. enabled: bool, + /// Optional metrics collector for cache activity. + metrics: Option, } impl IpValidationCache { /// Creates a new `IpValidationCache` using Moka. - pub fn new(capacity: usize, ttl: Duration, enabled: bool) -> Self { + pub fn new(capacity: usize, ttl: Duration, enabled: bool, metrics: Option) -> Self { let cache = Cache::builder().max_capacity(capacity as u64).time_to_live(ttl).build(); - - Self { cache, enabled } + let this = Self { + cache, + capacity, + enabled, + metrics, + }; + this.update_cache_size_metric(); + this } /// Checks if an IP is trusted, using the cache if available. - pub async fn is_trusted(&self, ip: &IpAddr, validator: impl FnOnce(&IpAddr) -> bool) -> bool { + pub fn is_trusted(&self, ip: &IpAddr, validator: impl FnOnce(&IpAddr) -> bool) -> bool { if !self.enabled { return validator(ip); } // Attempt to get the result from cache. - if let Some(is_trusted) = self.cache.get(ip).await { - metrics::counter!("rustfs_trusted_proxy_cache_hits").increment(1); + if let Some(is_trusted) = self.cache.get(ip) { + self.record_cache_hit(); return is_trusted; } - // Cache miss: perform validation and update cache. - metrics::counter!("rustfs_trusted_proxy_cache_misses").increment(1); + // Cache miss: perform validation. Only positive trust decisions are cached + // to avoid polluting the cache with one-off untrusted client IPs. + self.record_cache_miss(); let is_trusted = validator(ip); - self.cache.insert(*ip, is_trusted).await; + if is_trusted { + self.cache.insert(*ip, is_trusted); + self.update_cache_size_metric(); + } is_trusted } /// Clears all entries from the cache. - pub async fn clear(&self) { + pub fn clear(&self) { self.cache.invalidate_all(); - metrics::gauge!("rustfs_trusted_proxy_cache_size").set(0.0); + self.cache.run_pending_tasks(); + self.update_cache_size_metric(); + } + + /// Runs pending cache maintenance tasks and refreshes size metrics. + pub fn run_maintenance(&self) { + if !self.enabled { + return; + } + + self.cache.run_pending_tasks(); + self.update_cache_size_metric(); } /// Returns statistics about the current state of the cache. pub fn stats(&self) -> CacheStats { + if self.enabled { + self.cache.run_pending_tasks(); + } + let entry_count = self.cache.entry_count(); CacheStats { size: entry_count as usize, - // Moka doesn't expose max_capacity directly in a simple way after build, - // but we can track it if needed. - capacity: 0, + capacity: self.capacity, + } + } + + /// Returns whether the cache is enabled. + pub fn is_enabled(&self) -> bool { + self.enabled + } + + fn record_cache_hit(&self) { + if let Some(metrics) = &self.metrics { + metrics.record_cache_hit(); + } + } + + fn record_cache_miss(&self) { + if let Some(metrics) = &self.metrics { + metrics.record_cache_miss(); + } + } + + fn update_cache_size_metric(&self) { + if let Some(metrics) = &self.metrics { + metrics.set_cache_size(self.cache.entry_count() as usize); } } } diff --git a/crates/trusted-proxies/src/proxy/chain.rs b/crates/trusted-proxies/src/proxy/chain.rs index 00984dea3..629147cf7 100644 --- a/crates/trusted-proxies/src/proxy/chain.rs +++ b/crates/trusted-proxies/src/proxy/chain.rs @@ -228,7 +228,7 @@ impl ProxyChainAnalyzer { } /// Checks if an IP address is trusted based on the configuration. - fn is_ip_trusted(&self, ip: &IpAddr) -> bool { + pub(crate) fn is_ip_trusted(&self, ip: &IpAddr) -> bool { if self.trusted_ip_cache.contains(ip) { return true; } diff --git a/crates/trusted-proxies/src/proxy/metrics.rs b/crates/trusted-proxies/src/proxy/metrics.rs index 1fd03b0f4..7ae8afc3a 100644 --- a/crates/trusted-proxies/src/proxy/metrics.rs +++ b/crates/trusted-proxies/src/proxy/metrics.rs @@ -178,6 +178,33 @@ impl ProxyMetrics { }); } + /// Records a cache hit. + pub fn record_cache_hit(&self) { + if !self.enabled { + return; + } + + counter!("rustfs_trusted_proxy_cache_hits_total", "app" => self.app_name.clone()).increment(1); + } + + /// Records a cache miss. + pub fn record_cache_miss(&self) { + if !self.enabled { + return; + } + + counter!("rustfs_trusted_proxy_cache_misses_total", "app" => self.app_name.clone()).increment(1); + } + + /// Updates only the cache size gauge. + pub fn set_cache_size(&self, size: usize) { + if !self.enabled { + return; + } + + gauge!("rustfs_trusted_proxy_cache_size", "app" => self.app_name.clone()).set(size as f64); + } + /// Records cache performance metrics. pub fn record_cache_metrics(&self, hits: u64, misses: u64, size: usize) { if !self.enabled { diff --git a/crates/trusted-proxies/src/proxy/validator.rs b/crates/trusted-proxies/src/proxy/validator.rs index 05ddcbc55..993f1890f 100644 --- a/crates/trusted-proxies/src/proxy/validator.rs +++ b/crates/trusted-proxies/src/proxy/validator.rs @@ -16,10 +16,13 @@ use axum::http::HeaderMap; use std::net::{IpAddr, SocketAddr}; -use std::time::Instant; +use std::sync::Arc; +use std::time::{Duration, Instant}; use tracing::{debug, warn}; -use crate::{ProxyChainAnalyzer, ProxyError, ProxyMetrics, TrustedProxyConfig, ValidationMode}; +use crate::{ + CacheConfig, CacheStats, IpValidationCache, ProxyChainAnalyzer, ProxyError, ProxyMetrics, TrustedProxyConfig, ValidationMode, +}; /// Information about the client extracted from the request and proxy headers. #[derive(Debug, Clone)] @@ -95,6 +98,8 @@ pub struct ProxyValidator { config: TrustedProxyConfig, /// Analyzer for verifying the integrity of the proxy chain. chain_analyzer: ProxyChainAnalyzer, + /// Cache for repeated direct-peer trusted proxy decisions. + validation_cache: Arc, /// Metrics collector for observability. metrics: Option, } @@ -102,11 +107,24 @@ pub struct ProxyValidator { impl ProxyValidator { /// Creates a new `ProxyValidator` with the given configuration and metrics. pub fn new(config: TrustedProxyConfig, metrics: Option) -> Self { + Self::with_cache_config(config, CacheConfig::default(), metrics) + } + + /// Creates a new `ProxyValidator` with explicit cache configuration. + pub fn with_cache_config(config: TrustedProxyConfig, cache_config: CacheConfig, metrics: Option) -> Self { let chain_analyzer = ProxyChainAnalyzer::new(config.clone()); + let cache_enabled = cache_config.capacity > 0 && cache_config.ttl_seconds > 0; + let validation_cache = Arc::new(IpValidationCache::new( + cache_config.capacity, + cache_config.ttl_duration(), + cache_enabled, + metrics.clone(), + )); Self { config, chain_analyzer, + validation_cache, metrics, } } @@ -130,21 +148,33 @@ impl ProxyValidator { /// Internal logic for request validation. fn validate_request_internal(&self, peer_addr: Option, headers: &HeaderMap) -> Result { - // Fallback to unspecified address if peer address is missing. - let peer_addr = peer_addr.unwrap_or_else(|| SocketAddr::new(IpAddr::from([0, 0, 0, 0]), 0)); + let Some(peer_addr) = peer_addr else { + debug!("SocketAddr extension is missing; skipping trusted proxy evaluation"); + return Ok(ClientInfo::direct(SocketAddr::new(IpAddr::from([0, 0, 0, 0]), 0))); + }; + + let peer_ip = peer_addr.ip(); + if peer_ip.is_unspecified() { + debug!("Peer address is unspecified; skipping trusted proxy evaluation"); + return Ok(ClientInfo::direct(peer_addr)); + } + + let is_trusted_proxy = self + .validation_cache + .is_trusted(&peer_ip, |ip| self.chain_analyzer.is_ip_trusted(ip)); // Check if the direct peer is a trusted proxy. - if self.config.is_trusted(&peer_addr) { - debug!("Request received from trusted proxy: {}", peer_addr.ip()); + if is_trusted_proxy { + debug!("Request received from trusted proxy: {}", peer_ip); // Parse and validate headers from the trusted proxy. self.validate_trusted_proxy_request(&peer_addr, headers) } else { // Log a warning if the request is from a private network but not trusted. - if self.config.is_private_network(&peer_addr.ip()) { + if self.config.is_private_network(&peer_ip) { warn!( "Request from private network but not trusted: {}. This might indicate a configuration issue.", - peer_addr.ip() + peer_ip ); } @@ -153,6 +183,31 @@ impl ProxyValidator { } } + /// Returns cache statistics for direct-peer validation decisions. + pub fn cache_stats(&self) -> CacheStats { + self.validation_cache.stats() + } + + pub(crate) fn spawn_cache_maintenance_task(self: &Arc, cleanup_interval: Duration) { + if cleanup_interval.is_zero() || !self.validation_cache.is_enabled() { + return; + } + + let Ok(handle) = tokio::runtime::Handle::try_current() else { + tracing::debug!("No Tokio runtime available; trusted proxy cache maintenance is disabled"); + return; + }; + + let cache = self.validation_cache.clone(); + handle.spawn(async move { + let mut interval = tokio::time::interval(cleanup_interval); + loop { + interval.tick().await; + cache.run_maintenance(); + } + }); + } + /// Validates a request that originated from a trusted proxy. fn validate_trusted_proxy_request(&self, proxy_addr: &SocketAddr, headers: &HeaderMap) -> Result { let proxy_ip = proxy_addr.ip(); diff --git a/crates/trusted-proxies/tests/unit/validator_tests.rs b/crates/trusted-proxies/tests/unit/validator_tests.rs index 210bad5e0..c9c4c8693 100644 --- a/crates/trusted-proxies/tests/unit/validator_tests.rs +++ b/crates/trusted-proxies/tests/unit/validator_tests.rs @@ -13,7 +13,9 @@ // limitations under the License. use axum::http::HeaderMap; -use rustfs_trusted_proxies::{ClientInfo, ProxyChainAnalyzer, ProxyValidator, TrustedProxy, TrustedProxyConfig, ValidationMode}; +use rustfs_trusted_proxies::{ + CacheConfig, ClientInfo, ProxyChainAnalyzer, ProxyValidator, TrustedProxy, TrustedProxyConfig, ValidationMode, +}; use std::net::{IpAddr, SocketAddr}; use std::str::FromStr; @@ -77,3 +79,56 @@ fn test_proxy_chain_too_long() { _ => panic!("Expected ChainTooLong error"), } } + +#[test] +fn test_validator_caches_trusted_direct_peer_decision() { + let validator = ProxyValidator::with_cache_config(create_test_config(), CacheConfig::default(), None); + let peer_addr = Some(SocketAddr::new(IpAddr::from_str("192.168.1.100").unwrap(), 8080)); + let headers = HeaderMap::new(); + + assert_eq!(validator.cache_stats().size, 0); + + let first = validator.validate_request(peer_addr, &headers).unwrap(); + assert!(first.is_from_trusted_proxy); + assert_eq!(validator.cache_stats().size, 1); + + let second = validator.validate_request(peer_addr, &headers).unwrap(); + assert!(second.is_from_trusted_proxy); + assert_eq!(validator.cache_stats().size, 1); +} + +#[test] +fn test_validator_caches_untrusted_direct_peer_decision() { + let validator = ProxyValidator::with_cache_config(create_test_config(), CacheConfig::default(), None); + let peer_addr = Some(SocketAddr::new(IpAddr::from_str("203.0.113.8").unwrap(), 8080)); + let headers = HeaderMap::new(); + + let first = validator.validate_request(peer_addr, &headers).unwrap(); + assert!(!first.is_from_trusted_proxy); + assert_eq!(validator.cache_stats().size, 0); + + let second = validator.validate_request(peer_addr, &headers).unwrap(); + assert!(!second.is_from_trusted_proxy); + assert_eq!(validator.cache_stats().size, 0); +} + +#[test] +fn test_validator_skips_cache_when_peer_addr_is_missing() { + let validator = ProxyValidator::with_cache_config(create_test_config(), CacheConfig::default(), None); + let headers = HeaderMap::new(); + + let client_info = validator.validate_request(None, &headers).unwrap(); + assert!(!client_info.is_from_trusted_proxy); + assert_eq!(validator.cache_stats().size, 0); +} + +#[test] +fn test_validator_skips_cache_for_unspecified_peer_addr() { + let validator = ProxyValidator::with_cache_config(create_test_config(), CacheConfig::default(), None); + let peer_addr = Some(SocketAddr::new(IpAddr::from([0, 0, 0, 0]), 0)); + let headers = HeaderMap::new(); + + let client_info = validator.validate_request(peer_addr, &headers).unwrap(); + assert!(!client_info.is_from_trusted_proxy); + assert_eq!(validator.cache_stats().size, 0); +} diff --git a/docker-compose-simple.yml b/docker-compose-simple.yml index dc107d034..d6af57124 100644 --- a/docker-compose-simple.yml +++ b/docker-compose-simple.yml @@ -35,9 +35,6 @@ services: - RUSTFS_SECRET_KEY=rustfsadmin # CHANGEME - RUSTFS_OBS_LOGGER_LEVEL=info - RUSTFS_TLS_PATH=/opt/tls - # Object Cache - - RUSTFS_OBJECT_CACHE_ENABLE=true - - RUSTFS_OBJECT_CACHE_TTL_SECS=300 volumes: - rustfs_data_0:/data/rustfs0 diff --git a/rustfs/src/app/object_usecase.rs b/rustfs/src/app/object_usecase.rs index bf8f09ba8..631b07319 100644 --- a/rustfs/src/app/object_usecase.rs +++ b/rustfs/src/app/object_usecase.rs @@ -2077,7 +2077,6 @@ impl DefaultObjectUsecase { opts, } = request_context; - // Try to get from cache for small, frequently accessed objects let manager = get_concurrency_manager(); let prepared_read = Self::prepare_get_object_read_execution( diff --git a/scripts/run.ps1 b/scripts/run.ps1 index c3f2a29bb..e1953c3d6 100644 --- a/scripts/run.ps1 +++ b/scripts/run.ps1 @@ -159,9 +159,6 @@ $env:RUSTFS_NS_SCANNER_INTERVAL = "60" $env:RUSTFS_SCANNER_ENABLED = "false" $env:RUSTFS_HEAL_ENABLED = "false" -# Object cache configuration -$env:RUSTFS_OBJECT_CACHE_ENABLE = "true" - # Profiling configuration $env:RUSTFS_ENABLE_PROFILING = "false" diff --git a/scripts/run.sh b/scripts/run.sh index 36c4f788c..e392d2e98 100755 --- a/scripts/run.sh +++ b/scripts/run.sh @@ -241,9 +241,6 @@ export RUSTFS_SCANNER_ENABLED=true export RUSTFS_HEAL_ENABLED=true -# Object cache configuration -export RUSTFS_OBJECT_CACHE_ENABLE=true - # Profiling configuration export RUSTFS_ENABLE_PROFILING=false # Memory profiling periodic dump