Merge branch 'main' into dependabot/cargo/dependencies-0509

This commit is contained in:
houseme
2026-05-09 21:47:07 +08:00
committed by GitHub
2 changed files with 243 additions and 9 deletions

View File

@@ -23,7 +23,7 @@ use crate::server::{
hybrid::hybrid,
layer::{
AdminChunkedContentLengthCompatLayer, BodylessStatusFixLayer, ConditionalCorsLayer, HeadRequestBodyFixLayer,
ObjectAttributesEtagFixLayer, RedirectLayer, RequestContextLayer, S3ErrorMessageCompatLayer,
ObjectAttributesEtagFixLayer, PublicHealthEndpointLayer, RedirectLayer, RequestContextLayer, S3ErrorMessageCompatLayer,
},
tls_material::{TlsAcceptorHolder, TlsHandshakeFailureKind, TlsMaterialSnapshot, spawn_reload_loop},
};
@@ -673,7 +673,7 @@ fn process_connection(
};
// ── Canonical Middleware Stack Order (outermost → innermost) ──
// This order MUST be preserved across refactorings.
// Only AddExtensionLayer (layers 1-2) are per-connection; layers 3-15 are stateless.
// Only AddExtensionLayer (layers 1-2) are per-connection; most remaining layers are stateless.
//
// 1. AddExtensionLayer<RemoteAddr> — per-connection peer address
// 2. AddExtensionLayer<SocketAddr> — per-connection raw socket addr (TrustedProxy)
@@ -694,6 +694,7 @@ fn process_connection(
// 17. RedirectLayer — console redirect (conditional)
// 18. BodylessStatusFixLayer — clears body for 1xx/204/205/304 responses
// 19. HeadRequestBodyFixLayer — strips actual body bytes from HEAD responses
// 20. PublicHealthEndpointLayer — handles public health before s3s host parsing
// ─────────────────────────────────────────────────────────────
let hybrid_service = ServiceBuilder::new()
// NOTE: Both extension types are intentionally inserted to maintain compatibility:
@@ -871,16 +872,19 @@ fn process_connection(
// Bucket-level CORS takes precedence when configured (handled in router.rs for OPTIONS, and in ecfs.rs for actual requests)
.layer(ConditionalCorsLayer::new())
.option_layer(if is_console { Some(RedirectLayer) } else { None })
// Must run first on responses: clear the body and remove
// Must run before outer response-transforming layers: clear the body and remove
// Content-Length, Content-Type, and Transfer-Encoding for statuses
// that MUST NOT carry a body (1xx/204/304). Kept innermost so all
// other response-transforming layers see the already-bodyless
// that MUST NOT carry a body (1xx/204/304). Placed inside those
// layers so they see the already-bodyless
// response and so no layer (e.g. CORS) re-adds body headers afterward.
.layer(BodylessStatusFixLayer)
// HEAD responses must not send body bytes even when the inner S3 layer
// serializes an XML error payload. Keep this innermost so the final
// HTTP response written to hyper/h2 is bodyless.
// serializes an XML error payload.
.layer(HeadRequestBodyFixLayer)
// Health probes are public admin routes, but s3s parses virtual-host
// buckets before custom routes. Handle them here so SERVER_DOMAINS
// cannot turn /health into an S3 bucket request.
.layer(PublicHealthEndpointLayer)
.service(service);
let hybrid_service = TowerToHyperService::new(hybrid_service);

View File

@@ -13,10 +13,14 @@
// limitations under the License.
use crate::admin::console::is_console_path;
use crate::admin::handlers::health::{build_health_payload, collect_dependency_readiness, health_check_state, probe_from_path};
use crate::error::ApiError;
use crate::server::cors;
use crate::server::hybrid::HybridBody;
use crate::server::{ADMIN_PREFIX, CONSOLE_PREFIX, MINIO_ADMIN_PREFIX, MINIO_ADMIN_V3_PREFIX, RPC_PREFIX, RUSTFS_ADMIN_PREFIX};
use crate::server::{
ADMIN_PREFIX, CONSOLE_PREFIX, HEALTH_PREFIX, HEALTH_READY_PATH, MINIO_ADMIN_PREFIX, MINIO_ADMIN_V3_PREFIX, RPC_PREFIX,
RUSTFS_ADMIN_PREFIX,
};
use crate::storage::apply_cors_headers;
use crate::storage::request_context::{RequestContext, extract_request_id_from_headers};
use bytes::Bytes;
@@ -558,6 +562,89 @@ where
}
}
#[derive(Clone)]
pub struct PublicHealthEndpointLayer;
impl<S> Layer<S> for PublicHealthEndpointLayer {
type Service = PublicHealthEndpointService<S>;
fn layer(&self, inner: S) -> Self::Service {
PublicHealthEndpointService { inner }
}
}
#[derive(Clone)]
pub struct PublicHealthEndpointService<S> {
inner: S,
}
fn health_endpoint_enabled() -> bool {
rustfs_utils::get_env_bool(rustfs_config::ENV_HEALTH_ENDPOINT_ENABLE, rustfs_config::DEFAULT_HEALTH_ENDPOINT_ENABLE)
}
fn is_public_health_endpoint_request(method: &Method, path: &str) -> bool {
(method == Method::GET || method == Method::HEAD)
&& (path == HEALTH_PREFIX || path == HEALTH_READY_PATH)
&& health_endpoint_enabled()
}
async fn build_public_health_http_response<RestBody, GrpcBody>(
method: Method,
path: String,
) -> Response<HybridBody<RestBody, GrpcBody>>
where
RestBody: From<Bytes>,
{
let probe = probe_from_path(&path);
let (storage_ready, iam_ready) = collect_dependency_readiness().await;
let health = health_check_state(storage_ready, iam_ready, probe);
let body = if method == Method::HEAD {
Bytes::new()
} else {
let payload = build_health_payload(health, storage_ready, iam_ready, "rustfs-endpoint", None);
Bytes::from(serde_json::to_vec(&payload).unwrap_or_else(|_| b"{}".to_vec()))
};
Response::builder()
.status(health.status_code)
.header(http::header::CONTENT_TYPE, "application/json")
.body(HybridBody::Rest {
rest_body: RestBody::from(body),
})
.expect("failed to build health response")
}
impl<S, ReqBody, RestBody, GrpcBody> Service<HttpRequest<ReqBody>> for PublicHealthEndpointService<S>
where
S: Service<HttpRequest<ReqBody>, Response = Response<HybridBody<RestBody, GrpcBody>>> + Clone + Send + 'static,
S::Future: Send + 'static,
ReqBody: Send + 'static,
RestBody: From<Bytes> + Send + 'static,
GrpcBody: Send + 'static,
{
type Response = Response<HybridBody<RestBody, GrpcBody>>;
type Error = S::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, req: HttpRequest<ReqBody>) -> Self::Future {
let method = req.method();
let path = req.uri().path();
if is_public_health_endpoint_request(method, path) {
let method = method.clone();
let path = path.to_owned();
return Box::pin(async move { Ok(build_public_health_http_response(method, path).await) });
}
let mut inner = self.inner.clone();
Box::pin(async move { inner.call(req).await })
}
}
fn is_bodyless_status(status: StatusCode) -> bool {
status.is_informational()
|| status == StatusCode::NO_CONTENT
@@ -933,9 +1020,11 @@ mod tests {
use http::Request;
use http_body_util::BodyExt;
use http_body_util::Full;
use serial_test::serial;
use std::convert::Infallible;
use std::sync::Mutex;
use temp_env::with_var;
use std::sync::atomic::{AtomicUsize, Ordering};
use temp_env::{async_with_vars, with_var};
#[derive(Clone, Debug)]
struct CaptureService;
@@ -980,6 +1069,147 @@ mod tests {
}
}
#[derive(Clone, Default)]
struct CountingHybridService {
calls: Arc<AtomicUsize>,
}
impl CountingHybridService {
fn calls(&self) -> Arc<AtomicUsize> {
Arc::clone(&self.calls)
}
}
impl<B: Send + 'static> Service<Request<B>> for CountingHybridService {
type Response = Response<HybridBody<Full<Bytes>, Full<Bytes>>>;
type Error = Infallible;
type Future = Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, _req: Request<B>) -> Self::Future {
self.calls.fetch_add(1, Ordering::SeqCst);
ready(Ok(Response::builder()
.status(StatusCode::IM_A_TEAPOT)
.body(HybridBody::Rest {
rest_body: Full::from(Bytes::from_static(b"inner")),
})
.expect("response")))
}
}
#[tokio::test]
#[serial]
async fn public_health_endpoint_layer_handles_health_before_inner_service() {
async_with_vars([(rustfs_config::ENV_HEALTH_ENDPOINT_ENABLE, Some("true"))], async {
let inner = CountingHybridService::default();
let calls = inner.calls();
let mut service = PublicHealthEndpointLayer.layer(inner);
let response = service
.call(
Request::builder()
.method(Method::GET)
.uri(HEALTH_PREFIX)
.header(http::header::HOST, "localhost:9000")
.body(Full::<Bytes>::from(Bytes::new()))
.expect("request"),
)
.await
.expect("health response");
assert_eq!(response.status(), StatusCode::OK);
assert_eq!(calls.load(Ordering::SeqCst), 0);
assert_eq!(
response
.headers()
.get(http::header::CONTENT_TYPE)
.and_then(|value| value.to_str().ok()),
Some("application/json")
);
let body = BodyExt::collect(response.into_body()).await.expect("body").to_bytes();
assert!(body.windows(br#""status":"#.len()).any(|window| window == br#""status":"#));
})
.await;
}
#[tokio::test]
#[serial]
async fn public_health_endpoint_layer_handles_ready_head_before_inner_service() {
async_with_vars([(rustfs_config::ENV_HEALTH_ENDPOINT_ENABLE, Some("true"))], async {
let inner = CountingHybridService::default();
let calls = inner.calls();
let mut service = PublicHealthEndpointLayer.layer(inner);
let response = service
.call(
Request::builder()
.method(Method::HEAD)
.uri(HEALTH_READY_PATH)
.body(Full::<Bytes>::from(Bytes::new()))
.expect("request"),
)
.await
.expect("health response");
assert!(response.status() == StatusCode::OK || response.status() == StatusCode::SERVICE_UNAVAILABLE);
assert_eq!(calls.load(Ordering::SeqCst), 0);
let body = BodyExt::collect(response.into_body()).await.expect("body").to_bytes();
assert!(body.is_empty());
})
.await;
}
#[tokio::test]
#[serial]
async fn public_health_endpoint_layer_forwards_health_when_endpoint_disabled() {
async_with_vars([(rustfs_config::ENV_HEALTH_ENDPOINT_ENABLE, Some("false"))], async {
let inner = CountingHybridService::default();
let calls = inner.calls();
let mut service = PublicHealthEndpointLayer.layer(inner);
let response = service
.call(
Request::builder()
.method(Method::GET)
.uri(HEALTH_PREFIX)
.body(Full::<Bytes>::from(Bytes::new()))
.expect("request"),
)
.await
.expect("inner response");
assert_eq!(response.status(), StatusCode::IM_A_TEAPOT);
assert_eq!(calls.load(Ordering::SeqCst), 1);
})
.await;
}
#[tokio::test]
async fn public_health_endpoint_layer_forwards_non_health_requests() {
let inner = CountingHybridService::default();
let calls = inner.calls();
let mut service = PublicHealthEndpointLayer.layer(inner);
let response = service
.call(
Request::builder()
.method(Method::GET)
.uri("/bucket/object")
.body(Full::<Bytes>::from(Bytes::new()))
.expect("request"),
)
.await
.expect("inner response");
assert_eq!(response.status(), StatusCode::IM_A_TEAPOT);
assert_eq!(calls.load(Ordering::SeqCst), 1);
}
#[test]
fn admin_chunked_put_without_content_length_is_normalized() {
let request = Request::builder()