From 5879d0b59de73dbf213a2684116cccae72641e2d Mon Sep 17 00:00:00 2001 From: Henry Guo Date: Sat, 9 May 2026 21:15:37 +0800 Subject: [PATCH] fix(server): handle public health before s3 host parsing (#2866) Co-authored-by: houseme --- rustfs/src/server/http.rs | 18 +-- rustfs/src/server/layer.rs | 234 ++++++++++++++++++++++++++++++++++++- 2 files changed, 243 insertions(+), 9 deletions(-) diff --git a/rustfs/src/server/http.rs b/rustfs/src/server/http.rs index ae4f6ab24..1a058b193 100644 --- a/rustfs/src/server/http.rs +++ b/rustfs/src/server/http.rs @@ -23,7 +23,7 @@ use crate::server::{ hybrid::hybrid, layer::{ AdminChunkedContentLengthCompatLayer, BodylessStatusFixLayer, ConditionalCorsLayer, HeadRequestBodyFixLayer, - ObjectAttributesEtagFixLayer, RedirectLayer, RequestContextLayer, S3ErrorMessageCompatLayer, + ObjectAttributesEtagFixLayer, PublicHealthEndpointLayer, RedirectLayer, RequestContextLayer, S3ErrorMessageCompatLayer, }, tls_material::{TlsAcceptorHolder, TlsHandshakeFailureKind, TlsMaterialSnapshot, spawn_reload_loop}, }; @@ -673,7 +673,7 @@ fn process_connection( }; // ── Canonical Middleware Stack Order (outermost → innermost) ── // This order MUST be preserved across refactorings. - // Only AddExtensionLayer (layers 1-2) are per-connection; layers 3-15 are stateless. + // Only AddExtensionLayer (layers 1-2) are per-connection; most remaining layers are stateless. // // 1. AddExtensionLayer — per-connection peer address // 2. AddExtensionLayer — per-connection raw socket addr (TrustedProxy) @@ -694,6 +694,7 @@ fn process_connection( // 17. RedirectLayer — console redirect (conditional) // 18. BodylessStatusFixLayer — clears body for 1xx/204/205/304 responses // 19. HeadRequestBodyFixLayer — strips actual body bytes from HEAD responses + // 20. 
PublicHealthEndpointLayer — handles public health before s3s host parsing // ───────────────────────────────────────────────────────────── let hybrid_service = ServiceBuilder::new() // NOTE: Both extension types are intentionally inserted to maintain compatibility: @@ -871,16 +872,19 @@ fn process_connection( // Bucket-level CORS takes precedence when configured (handled in router.rs for OPTIONS, and in ecfs.rs for actual requests) .layer(ConditionalCorsLayer::new()) .option_layer(if is_console { Some(RedirectLayer) } else { None }) - // Must run first on responses: clear the body and remove + // Must run before outer response-transforming layers: clear the body and remove // Content-Length, Content-Type, and Transfer-Encoding for statuses - // that MUST NOT carry a body (1xx/204/304). Kept innermost so all - // other response-transforming layers see the already-bodyless + // that MUST NOT carry a body (1xx/204/304). Placed inside those + // layers so they see the already-bodyless // response and so no layer (e.g. CORS) re-adds body headers afterward. .layer(BodylessStatusFixLayer) // HEAD responses must not send body bytes even when the inner S3 layer - // serializes an XML error payload. Keep this innermost so the final - // HTTP response written to hyper/h2 is bodyless. + // serializes an XML error payload. .layer(HeadRequestBodyFixLayer) + // Health probes are public admin routes, but s3s parses virtual-host + // buckets before custom routes. Handle them here so SERVER_DOMAINS + // cannot turn /health into an S3 bucket request. + .layer(PublicHealthEndpointLayer) .service(service); let hybrid_service = TowerToHyperService::new(hybrid_service); diff --git a/rustfs/src/server/layer.rs b/rustfs/src/server/layer.rs index d36672594..a6db2af38 100644 --- a/rustfs/src/server/layer.rs +++ b/rustfs/src/server/layer.rs @@ -13,10 +13,14 @@ // limitations under the License. 
use crate::admin::console::is_console_path;
+use crate::admin::handlers::health::{build_health_payload, collect_dependency_readiness, health_check_state, probe_from_path};
 use crate::error::ApiError;
 use crate::server::cors;
 use crate::server::hybrid::HybridBody;
-use crate::server::{ADMIN_PREFIX, CONSOLE_PREFIX, MINIO_ADMIN_PREFIX, MINIO_ADMIN_V3_PREFIX, RPC_PREFIX, RUSTFS_ADMIN_PREFIX};
+use crate::server::{
+    ADMIN_PREFIX, CONSOLE_PREFIX, HEALTH_PREFIX, HEALTH_READY_PATH, MINIO_ADMIN_PREFIX, MINIO_ADMIN_V3_PREFIX, RPC_PREFIX,
+    RUSTFS_ADMIN_PREFIX,
+};
 use crate::storage::apply_cors_headers;
 use crate::storage::request_context::{RequestContext, extract_request_id_from_headers};
 use bytes::Bytes;
@@ -558,6 +562,103 @@ where
     }
 }
 
+/// Tower layer that answers public health probes itself, before the request
+/// reaches the wrapped service.
+#[derive(Clone)]
+pub struct PublicHealthEndpointLayer;
+
+impl<S> Layer<S> for PublicHealthEndpointLayer {
+    type Service = PublicHealthEndpointService<S>;
+
+    fn layer(&self, inner: S) -> Self::Service {
+        PublicHealthEndpointService { inner }
+    }
+}
+
+/// Service produced by [`PublicHealthEndpointLayer`]; every request that is not
+/// a public health probe is forwarded to `inner` untouched.
+#[derive(Clone)]
+pub struct PublicHealthEndpointService<S> {
+    inner: S,
+}
+
+/// Looks up the health-endpoint switch through `rustfs_utils::get_env_bool` on
+/// every call, passing `DEFAULT_HEALTH_ENDPOINT_ENABLE` as the default.
+fn health_endpoint_enabled() -> bool {
+    rustfs_utils::get_env_bool(rustfs_config::ENV_HEALTH_ENDPOINT_ENABLE, rustfs_config::DEFAULT_HEALTH_ENDPOINT_ENABLE)
+}
+
+/// Returns `true` for `GET`/`HEAD` requests whose path is exactly
+/// `HEALTH_PREFIX` or `HEALTH_READY_PATH` while the endpoint is enabled.
+/// The enable check is last so other traffic short-circuits before it.
+fn is_public_health_endpoint_request(method: &Method, path: &str) -> bool {
+    (method == Method::GET || method == Method::HEAD)
+        && (path == HEALTH_PREFIX || path == HEALTH_READY_PATH)
+        && health_endpoint_enabled()
+}
+
+/// Builds the probe response: status code from `health_check_state`, a JSON
+/// content type, and an empty body for `HEAD` or the JSON payload otherwise.
+async fn build_public_health_http_response<RestBody, GrpcBody>(
+    method: Method,
+    path: String,
+) -> Response<HybridBody<RestBody, GrpcBody>>
+where
+    RestBody: From<Bytes>,
+{
+    let probe = probe_from_path(&path);
+    let (storage_ready, iam_ready) = collect_dependency_readiness().await;
+    let health = health_check_state(storage_ready, iam_ready, probe);
+    let body = if method == Method::HEAD {
+        Bytes::new()
+    } else {
+        let payload = build_health_payload(health, storage_ready, iam_ready, "rustfs-endpoint", None);
+        Bytes::from(serde_json::to_vec(&payload).unwrap_or_else(|_| b"{}".to_vec()))
+    };
+
+    Response::builder()
+        .status(health.status_code)
+        .header(http::header::CONTENT_TYPE, "application/json")
+        .body(HybridBody::Rest {
+            rest_body: RestBody::from(body),
+        })
+        .expect("failed to build health response")
+}
+
+impl<S, ReqBody, RestBody, GrpcBody> Service<HttpRequest<ReqBody>> for PublicHealthEndpointService<S>
+where
+    S: Service<HttpRequest<ReqBody>, Response = Response<HybridBody<RestBody, GrpcBody>>> + Clone + Send + 'static,
+    S::Future: Send + 'static,
+    ReqBody: Send + 'static,
+    RestBody: From<Bytes> + Send + 'static,
+    GrpcBody: Send + 'static,
+{
+    type Response = Response<HybridBody<RestBody, GrpcBody>>;
+    type Error = S::Error;
+    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
+
+    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        self.inner.poll_ready(cx)
+    }
+
+    fn call(&mut self, req: HttpRequest<ReqBody>) -> Self::Future {
+        let method = req.method();
+        let path = req.uri().path();
+
+        if is_public_health_endpoint_request(method, path) {
+            // Copy out what the response needs; the future must not borrow `req`.
+            let method = method.clone();
+            let path = path.to_owned();
+            return Box::pin(async move { Ok(build_public_health_http_response(method, path).await) });
+        }
+
+        // `poll_ready` was driven on `self.inner`, so dispatch on that same
+        // instance: a fresh clone is not guaranteed to be ready. The future
+        // it returns is already `Send + 'static`, so box it directly.
+        Box::pin(self.inner.call(req))
+    }
+}
+
 fn is_bodyless_status(status: StatusCode) -> bool {
     status.is_informational()
         || status == StatusCode::NO_CONTENT
@@ -933,9 +1034,11 @@ mod tests {
     use http::Request;
     use http_body_util::BodyExt;
     use http_body_util::Full;
+    use serial_test::serial;
     use std::convert::Infallible;
     use std::sync::Mutex;
-    use temp_env::with_var;
+    use std::sync::atomic::{AtomicUsize, Ordering};
+    use temp_env::{async_with_vars, with_var};
 
     #[derive(Clone, Debug)]
     struct CaptureService;
@@ -980,6 +1083,149 @@ mod tests {
         }
     }
 
+    /// Inner-service double: counts `call` invocations and always answers
+    /// `418 I'm a teapot`, so tests can tell whether the layer forwarded.
+    #[derive(Clone, Default)]
+    struct CountingHybridService {
+        calls: Arc<AtomicUsize>,
+    }
+
+    impl CountingHybridService {
+        fn calls(&self) -> Arc<AtomicUsize> {
+            Arc::clone(&self.calls)
+        }
+    }
+
+    impl<B> Service<Request<B>> for CountingHybridService {
+        type Response = Response<HybridBody<Full<Bytes>, Full<Bytes>>>;
+        type Error = Infallible;
+        type Future = Ready<Result<Self::Response, Self::Error>>;
+
+        fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+            Poll::Ready(Ok(()))
+        }
+
+        fn call(&mut self, _req: Request<B>) -> Self::Future {
+            self.calls.fetch_add(1, Ordering::SeqCst);
+            ready(Ok(Response::builder()
+                .status(StatusCode::IM_A_TEAPOT)
+                .body(HybridBody::Rest {
+                    rest_body: Full::from(Bytes::from_static(b"inner")),
+                })
+                .expect("response")))
+        }
+    }
+
+    #[tokio::test]
+    #[serial]
+    async fn public_health_endpoint_layer_handles_health_before_inner_service() {
+        async_with_vars([(rustfs_config::ENV_HEALTH_ENDPOINT_ENABLE, Some("true"))], async {
+            let inner = CountingHybridService::default();
+            let calls = inner.calls();
+            let mut service = PublicHealthEndpointLayer.layer(inner);
+
+            let response = service
+                .call(
+                    Request::builder()
+                        .method(Method::GET)
+                        .uri(HEALTH_PREFIX)
+                        .header(http::header::HOST, "localhost:9000")
+                        .body(Full::<Bytes>::from(Bytes::new()))
+                        .expect("request"),
+                )
+                .await
+                .expect("health response");
+
+            assert_eq!(response.status(), StatusCode::OK);
+            assert_eq!(calls.load(Ordering::SeqCst), 0);
+            assert_eq!(
+                response
+                    .headers()
+                    .get(http::header::CONTENT_TYPE)
+                    .and_then(|value| value.to_str().ok()),
+                Some("application/json")
+            );
+
+            let body = BodyExt::collect(response.into_body()).await.expect("body").to_bytes();
+            assert!(body.windows(br#""status":"#.len()).any(|window| window == br#""status":"#));
+        })
+        .await;
+    }
+
+    #[tokio::test]
+    #[serial]
+    async fn public_health_endpoint_layer_handles_ready_head_before_inner_service() {
+        async_with_vars([(rustfs_config::ENV_HEALTH_ENDPOINT_ENABLE, Some("true"))], async {
+            let inner = CountingHybridService::default();
+            let calls = inner.calls();
+            let mut service = PublicHealthEndpointLayer.layer(inner);
+
+            let response = service
+                .call(
+                    Request::builder()
+                        .method(Method::HEAD)
+                        .uri(HEALTH_READY_PATH)
+                        .body(Full::<Bytes>::from(Bytes::new()))
+                        .expect("request"),
+                )
+                .await
+                .expect("health response");
+
+            assert!(response.status() == StatusCode::OK || response.status() == StatusCode::SERVICE_UNAVAILABLE);
+            assert_eq!(calls.load(Ordering::SeqCst), 0);
+
+            let body = BodyExt::collect(response.into_body()).await.expect("body").to_bytes();
+            assert!(body.is_empty());
+        })
+        .await;
+    }
+
+    #[tokio::test]
+    #[serial]
+    async fn public_health_endpoint_layer_forwards_health_when_endpoint_disabled() {
+        async_with_vars([(rustfs_config::ENV_HEALTH_ENDPOINT_ENABLE, Some("false"))], async {
+            let inner = CountingHybridService::default();
+            let calls = inner.calls();
+            let mut service = PublicHealthEndpointLayer.layer(inner);
+
+            let response = service
+                .call(
+                    Request::builder()
+                        .method(Method::GET)
+                        .uri(HEALTH_PREFIX)
+                        .body(Full::<Bytes>::from(Bytes::new()))
+                        .expect("request"),
+                )
+                .await
+                .expect("inner response");
+
+            assert_eq!(response.status(), StatusCode::IM_A_TEAPOT);
+            assert_eq!(calls.load(Ordering::SeqCst), 1);
+        })
+        .await;
+    }
+
+    #[tokio::test]
+    async fn public_health_endpoint_layer_forwards_non_health_requests() {
+        let inner = CountingHybridService::default();
+        let calls = inner.calls();
+        let mut service = PublicHealthEndpointLayer.layer(inner);
+
+        let response = service
+            .call(
+                Request::builder()
+                    .method(Method::GET)
+                    .uri("/bucket/object")
+                    .body(Full::<Bytes>::from(Bytes::new()))
+                    .expect("request"),
+            )
+            .await
+            .expect("inner response");
+
+        assert_eq!(response.status(), StatusCode::IM_A_TEAPOT);
+        assert_eq!(calls.load(Ordering::SeqCst), 1);
+    }
+
     #[test]
     fn admin_chunked_put_without_content_length_is_normalized() {
         let request = Request::builder()