fix(cache): wire trusted-proxy cache and remove stale cache traces (#2581)

This commit is contained in:
houseme
2026-04-18 00:57:21 +08:00
committed by GitHub
parent 38eb0781cf
commit ffcf18f5f3
13 changed files with 231 additions and 51 deletions

View File

@@ -237,15 +237,6 @@ println!("最大并发读: {}", config.scheduler.max_concurrent_reads);
## 🔧 配置
### 环境变量
| 变量名 | 描述 | 默认值 |
|--------|------|--------|
| `RUSTFS_CACHE_MAX_CAPACITY` | 缓存最大容量 | 10000 |
| `RUSTFS_CACHE_TTL_SECS` | 缓存 TTL 秒数 | 300 |
| `RUSTFS_CACHE_MAX_MEMORY` | 缓存最大内存 | 104857600 |
| `RUSTFS_ADAPTIVE_TTL_ENABLED` | 启用自适应 TTL | true |
### 代码配置
```rust

View File

@@ -30,7 +30,7 @@ axum = { workspace = true }
http = { workspace = true }
ipnetwork = { workspace = true }
metrics = { workspace = true }
moka = { workspace = true, features = ["future"] }
moka = { workspace = true, features = ["future", "sync"] }
reqwest = { workspace = true }
rustfs-config = { workspace = true }
rustfs-utils = { workspace = true, features = ["net"] }

View File

@@ -64,7 +64,7 @@ pub fn init() {
.expect("Trusted proxy metrics already initialized");
// Initialize the trusted proxy layer.
let layer = TrustedProxyLayer::new(config.proxy.clone(), metrics, enabled);
let layer = TrustedProxyLayer::with_cache_config(config.proxy.clone(), config.cache.clone(), metrics, enabled);
PROXY_LAYER.set(layer).expect("Trusted proxy layer already initialized");
tracing::info!("Trusted Proxies module initialized");

View File

@@ -17,10 +17,10 @@
use std::sync::Arc;
use tower::Layer;
use crate::ProxyMetrics;
use crate::ProxyValidator;
use crate::TrustedProxyConfig;
use crate::TrustedProxyMiddleware;
use crate::{CacheConfig, ProxyMetrics};
/// Tower Layer for the trusted proxy middleware.
#[derive(Clone, Debug)]
@@ -34,12 +34,22 @@ pub struct TrustedProxyLayer {
impl TrustedProxyLayer {
/// Creates a new `TrustedProxyLayer`.
pub fn new(config: TrustedProxyConfig, metrics: Option<ProxyMetrics>, enabled: bool) -> Self {
let validator = ProxyValidator::new(config, metrics);
Self::with_cache_config(config, CacheConfig::default(), metrics, enabled)
}
Self {
validator: Arc::new(validator),
enabled,
/// Creates a new `TrustedProxyLayer` with explicit cache configuration.
pub fn with_cache_config(
config: TrustedProxyConfig,
cache_config: CacheConfig,
metrics: Option<ProxyMetrics>,
enabled: bool,
) -> Self {
let validator = Arc::new(ProxyValidator::with_cache_config(config, cache_config.clone(), metrics));
if enabled {
validator.spawn_cache_maintenance_task(cache_config.cleanup_interval());
}
Self { validator, enabled }
}
/// Creates a new `TrustedProxyLayer` that is enabled by default.

View File

@@ -14,62 +14,114 @@
//! High-performance cache implementation for proxy validation results using Moka.
use moka::future::Cache;
use moka::sync::Cache;
use std::net::IpAddr;
use std::time::Duration;
use crate::ProxyMetrics;
/// Cache for storing IP validation results.
#[derive(Debug, Clone)]
pub struct IpValidationCache {
/// The underlying Moka cache.
cache: Cache<IpAddr, bool>,
/// Configured capacity.
capacity: usize,
/// Whether the cache is enabled.
enabled: bool,
/// Optional metrics collector for cache activity.
metrics: Option<ProxyMetrics>,
}
impl IpValidationCache {
/// Creates a new `IpValidationCache` using Moka.
pub fn new(capacity: usize, ttl: Duration, enabled: bool) -> Self {
pub fn new(capacity: usize, ttl: Duration, enabled: bool, metrics: Option<ProxyMetrics>) -> Self {
let cache = Cache::builder().max_capacity(capacity as u64).time_to_live(ttl).build();
Self { cache, enabled }
let this = Self {
cache,
capacity,
enabled,
metrics,
};
this.update_cache_size_metric();
this
}
/// Checks if an IP is trusted, using the cache if available.
pub async fn is_trusted(&self, ip: &IpAddr, validator: impl FnOnce(&IpAddr) -> bool) -> bool {
pub fn is_trusted(&self, ip: &IpAddr, validator: impl FnOnce(&IpAddr) -> bool) -> bool {
if !self.enabled {
return validator(ip);
}
// Attempt to get the result from cache.
if let Some(is_trusted) = self.cache.get(ip).await {
metrics::counter!("rustfs_trusted_proxy_cache_hits").increment(1);
if let Some(is_trusted) = self.cache.get(ip) {
self.record_cache_hit();
return is_trusted;
}
// Cache miss: perform validation and update cache.
metrics::counter!("rustfs_trusted_proxy_cache_misses").increment(1);
// Cache miss: perform validation. Only positive trust decisions are cached
// to avoid polluting the cache with one-off untrusted client IPs.
self.record_cache_miss();
let is_trusted = validator(ip);
self.cache.insert(*ip, is_trusted).await;
if is_trusted {
self.cache.insert(*ip, is_trusted);
self.update_cache_size_metric();
}
is_trusted
}
/// Clears all entries from the cache.
pub async fn clear(&self) {
pub fn clear(&self) {
self.cache.invalidate_all();
metrics::gauge!("rustfs_trusted_proxy_cache_size").set(0.0);
self.cache.run_pending_tasks();
self.update_cache_size_metric();
}
/// Runs pending cache maintenance tasks and refreshes size metrics.
pub fn run_maintenance(&self) {
if !self.enabled {
return;
}
self.cache.run_pending_tasks();
self.update_cache_size_metric();
}
/// Returns statistics about the current state of the cache.
pub fn stats(&self) -> CacheStats {
if self.enabled {
self.cache.run_pending_tasks();
}
let entry_count = self.cache.entry_count();
CacheStats {
size: entry_count as usize,
// Moka doesn't expose max_capacity directly in a simple way after build,
// but we can track it if needed.
capacity: 0,
capacity: self.capacity,
}
}
/// Returns whether the cache is enabled.
pub fn is_enabled(&self) -> bool {
self.enabled
}
fn record_cache_hit(&self) {
if let Some(metrics) = &self.metrics {
metrics.record_cache_hit();
}
}
fn record_cache_miss(&self) {
if let Some(metrics) = &self.metrics {
metrics.record_cache_miss();
}
}
fn update_cache_size_metric(&self) {
if let Some(metrics) = &self.metrics {
metrics.set_cache_size(self.cache.entry_count() as usize);
}
}
}

View File

@@ -228,7 +228,7 @@ impl ProxyChainAnalyzer {
}
/// Checks if an IP address is trusted based on the configuration.
fn is_ip_trusted(&self, ip: &IpAddr) -> bool {
pub(crate) fn is_ip_trusted(&self, ip: &IpAddr) -> bool {
if self.trusted_ip_cache.contains(ip) {
return true;
}

View File

@@ -178,6 +178,33 @@ impl ProxyMetrics {
});
}
/// Records a cache hit.
pub fn record_cache_hit(&self) {
if !self.enabled {
return;
}
counter!("rustfs_trusted_proxy_cache_hits_total", "app" => self.app_name.clone()).increment(1);
}
/// Records a cache miss.
pub fn record_cache_miss(&self) {
if !self.enabled {
return;
}
counter!("rustfs_trusted_proxy_cache_misses_total", "app" => self.app_name.clone()).increment(1);
}
/// Updates only the cache size gauge.
pub fn set_cache_size(&self, size: usize) {
if !self.enabled {
return;
}
gauge!("rustfs_trusted_proxy_cache_size", "app" => self.app_name.clone()).set(size as f64);
}
/// Records cache performance metrics.
pub fn record_cache_metrics(&self, hits: u64, misses: u64, size: usize) {
if !self.enabled {

View File

@@ -16,10 +16,13 @@
use axum::http::HeaderMap;
use std::net::{IpAddr, SocketAddr};
use std::time::Instant;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tracing::{debug, warn};
use crate::{ProxyChainAnalyzer, ProxyError, ProxyMetrics, TrustedProxyConfig, ValidationMode};
use crate::{
CacheConfig, CacheStats, IpValidationCache, ProxyChainAnalyzer, ProxyError, ProxyMetrics, TrustedProxyConfig, ValidationMode,
};
/// Information about the client extracted from the request and proxy headers.
#[derive(Debug, Clone)]
@@ -95,6 +98,8 @@ pub struct ProxyValidator {
config: TrustedProxyConfig,
/// Analyzer for verifying the integrity of the proxy chain.
chain_analyzer: ProxyChainAnalyzer,
/// Cache for repeated direct-peer trusted proxy decisions.
validation_cache: Arc<IpValidationCache>,
/// Metrics collector for observability.
metrics: Option<ProxyMetrics>,
}
@@ -102,11 +107,24 @@ pub struct ProxyValidator {
impl ProxyValidator {
/// Creates a new `ProxyValidator` with the given configuration and metrics.
pub fn new(config: TrustedProxyConfig, metrics: Option<ProxyMetrics>) -> Self {
Self::with_cache_config(config, CacheConfig::default(), metrics)
}
/// Creates a new `ProxyValidator` with explicit cache configuration.
pub fn with_cache_config(config: TrustedProxyConfig, cache_config: CacheConfig, metrics: Option<ProxyMetrics>) -> Self {
let chain_analyzer = ProxyChainAnalyzer::new(config.clone());
let cache_enabled = cache_config.capacity > 0 && cache_config.ttl_seconds > 0;
let validation_cache = Arc::new(IpValidationCache::new(
cache_config.capacity,
cache_config.ttl_duration(),
cache_enabled,
metrics.clone(),
));
Self {
config,
chain_analyzer,
validation_cache,
metrics,
}
}
@@ -130,21 +148,33 @@ impl ProxyValidator {
/// Internal logic for request validation.
fn validate_request_internal(&self, peer_addr: Option<SocketAddr>, headers: &HeaderMap) -> Result<ClientInfo, ProxyError> {
// Fallback to unspecified address if peer address is missing.
let peer_addr = peer_addr.unwrap_or_else(|| SocketAddr::new(IpAddr::from([0, 0, 0, 0]), 0));
let Some(peer_addr) = peer_addr else {
debug!("SocketAddr extension is missing; skipping trusted proxy evaluation");
return Ok(ClientInfo::direct(SocketAddr::new(IpAddr::from([0, 0, 0, 0]), 0)));
};
let peer_ip = peer_addr.ip();
if peer_ip.is_unspecified() {
debug!("Peer address is unspecified; skipping trusted proxy evaluation");
return Ok(ClientInfo::direct(peer_addr));
}
let is_trusted_proxy = self
.validation_cache
.is_trusted(&peer_ip, |ip| self.chain_analyzer.is_ip_trusted(ip));
// Check if the direct peer is a trusted proxy.
if self.config.is_trusted(&peer_addr) {
debug!("Request received from trusted proxy: {}", peer_addr.ip());
if is_trusted_proxy {
debug!("Request received from trusted proxy: {}", peer_ip);
// Parse and validate headers from the trusted proxy.
self.validate_trusted_proxy_request(&peer_addr, headers)
} else {
// Log a warning if the request is from a private network but not trusted.
if self.config.is_private_network(&peer_addr.ip()) {
if self.config.is_private_network(&peer_ip) {
warn!(
"Request from private network but not trusted: {}. This might indicate a configuration issue.",
peer_addr.ip()
peer_ip
);
}
@@ -153,6 +183,31 @@ impl ProxyValidator {
}
}
/// Returns cache statistics for direct-peer validation decisions.
pub fn cache_stats(&self) -> CacheStats {
self.validation_cache.stats()
}
pub(crate) fn spawn_cache_maintenance_task(self: &Arc<Self>, cleanup_interval: Duration) {
if cleanup_interval.is_zero() || !self.validation_cache.is_enabled() {
return;
}
let Ok(handle) = tokio::runtime::Handle::try_current() else {
tracing::debug!("No Tokio runtime available; trusted proxy cache maintenance is disabled");
return;
};
let cache = self.validation_cache.clone();
handle.spawn(async move {
let mut interval = tokio::time::interval(cleanup_interval);
loop {
interval.tick().await;
cache.run_maintenance();
}
});
}
/// Validates a request that originated from a trusted proxy.
fn validate_trusted_proxy_request(&self, proxy_addr: &SocketAddr, headers: &HeaderMap) -> Result<ClientInfo, ProxyError> {
let proxy_ip = proxy_addr.ip();

View File

@@ -13,7 +13,9 @@
// limitations under the License.
use axum::http::HeaderMap;
use rustfs_trusted_proxies::{ClientInfo, ProxyChainAnalyzer, ProxyValidator, TrustedProxy, TrustedProxyConfig, ValidationMode};
use rustfs_trusted_proxies::{
CacheConfig, ClientInfo, ProxyChainAnalyzer, ProxyValidator, TrustedProxy, TrustedProxyConfig, ValidationMode,
};
use std::net::{IpAddr, SocketAddr};
use std::str::FromStr;
@@ -77,3 +79,56 @@ fn test_proxy_chain_too_long() {
_ => panic!("Expected ChainTooLong error"),
}
}
#[test]
fn test_validator_caches_trusted_direct_peer_decision() {
let validator = ProxyValidator::with_cache_config(create_test_config(), CacheConfig::default(), None);
let peer_addr = Some(SocketAddr::new(IpAddr::from_str("192.168.1.100").unwrap(), 8080));
let headers = HeaderMap::new();
assert_eq!(validator.cache_stats().size, 0);
let first = validator.validate_request(peer_addr, &headers).unwrap();
assert!(first.is_from_trusted_proxy);
assert_eq!(validator.cache_stats().size, 1);
let second = validator.validate_request(peer_addr, &headers).unwrap();
assert!(second.is_from_trusted_proxy);
assert_eq!(validator.cache_stats().size, 1);
}
#[test]
fn test_validator_caches_untrusted_direct_peer_decision() {
let validator = ProxyValidator::with_cache_config(create_test_config(), CacheConfig::default(), None);
let peer_addr = Some(SocketAddr::new(IpAddr::from_str("203.0.113.8").unwrap(), 8080));
let headers = HeaderMap::new();
let first = validator.validate_request(peer_addr, &headers).unwrap();
assert!(!first.is_from_trusted_proxy);
assert_eq!(validator.cache_stats().size, 0);
let second = validator.validate_request(peer_addr, &headers).unwrap();
assert!(!second.is_from_trusted_proxy);
assert_eq!(validator.cache_stats().size, 0);
}
#[test]
fn test_validator_skips_cache_when_peer_addr_is_missing() {
let validator = ProxyValidator::with_cache_config(create_test_config(), CacheConfig::default(), None);
let headers = HeaderMap::new();
let client_info = validator.validate_request(None, &headers).unwrap();
assert!(!client_info.is_from_trusted_proxy);
assert_eq!(validator.cache_stats().size, 0);
}
#[test]
fn test_validator_skips_cache_for_unspecified_peer_addr() {
let validator = ProxyValidator::with_cache_config(create_test_config(), CacheConfig::default(), None);
let peer_addr = Some(SocketAddr::new(IpAddr::from([0, 0, 0, 0]), 0));
let headers = HeaderMap::new();
let client_info = validator.validate_request(peer_addr, &headers).unwrap();
assert!(!client_info.is_from_trusted_proxy);
assert_eq!(validator.cache_stats().size, 0);
}

View File

@@ -35,9 +35,6 @@ services:
- RUSTFS_SECRET_KEY=rustfsadmin # CHANGEME
- RUSTFS_OBS_LOGGER_LEVEL=info
- RUSTFS_TLS_PATH=/opt/tls
# Object Cache
- RUSTFS_OBJECT_CACHE_ENABLE=true
- RUSTFS_OBJECT_CACHE_TTL_SECS=300
volumes:
- rustfs_data_0:/data/rustfs0

View File

@@ -2077,7 +2077,6 @@ impl DefaultObjectUsecase {
opts,
} = request_context;
// Try to get from cache for small, frequently accessed objects
let manager = get_concurrency_manager();
let prepared_read = Self::prepare_get_object_read_execution(

View File

@@ -159,9 +159,6 @@ $env:RUSTFS_NS_SCANNER_INTERVAL = "60"
$env:RUSTFS_SCANNER_ENABLED = "false"
$env:RUSTFS_HEAL_ENABLED = "false"
# Object cache configuration
$env:RUSTFS_OBJECT_CACHE_ENABLE = "true"
# Profiling configuration
$env:RUSTFS_ENABLE_PROFILING = "false"

View File

@@ -241,9 +241,6 @@ export RUSTFS_SCANNER_ENABLED=true
export RUSTFS_HEAL_ENABLED=true
# Object cache configuration
export RUSTFS_OBJECT_CACHE_ENABLE=true
# Profiling configuration
export RUSTFS_ENABLE_PROFILING=false
# Memory profiling periodic dump