diff --git a/Cargo.lock b/Cargo.lock index 2cc3dd5d9..8e444fe12 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -183,7 +183,7 @@ dependencies = [ "hyper", "hyper-util", "itoa", - "matchit", + "matchit 0.7.3", "memchr", "mime", "percent-encoding", @@ -506,6 +506,7 @@ dependencies = [ "block-buffer 0.11.0-rc.2", "const-oid", "crypto-common 0.2.0-rc.1", + "subtle", ] [[package]] @@ -564,7 +565,7 @@ dependencies = [ "s3s-policy", "serde", "serde_json", - "sha2 0.11.0-pre.4", + "sha2", "siphasher", "tempfile", "thiserror", @@ -865,11 +866,11 @@ checksum = "c706f1711006204c2ba8fb1a7bd55f689bbf7feca9ff40325206b5e140cff6df" [[package]] name = "hmac" -version = "0.12.1" +version = "0.13.0-pre.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" +checksum = "e4b1fb14e4df79f9406b434b60acef9f45c26c50062cccf1346c6103b8c47d58" dependencies = [ - "digest 0.10.7", + "digest 0.11.0-pre.9", ] [[package]] @@ -1134,6 +1135,12 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" +[[package]] +name = "matchit" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" + [[package]] name = "md-5" version = "0.10.6" @@ -1633,9 +1640,9 @@ dependencies = [ [[package]] name = "quick-xml" -version = "0.36.2" +version = "0.37.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7649a7b4df05aed9ea7ec6f628c67c9953a43869b8bc50929569b2999d443fe" +checksum = "ffbfb3ddf5364c9cfcd65549a1e7b801d0e8d1b14c1a1590a6408aa93cfbfa84" dependencies = [ "memchr", "serde", @@ -1690,7 +1697,7 @@ dependencies = [ "md-5", "pin-project-lite", "s3s", - "sha2 0.11.0-pre.4", + "sha2", "thiserror", "tokio", "tracing", @@ -1801,6 +1808,19 @@ dependencies = [ "serde", ] +[[package]] +name = "router" +version = "0.0.1" +dependencies = [ + "async-trait", + "common", + "hyper", + "matchit 0.8.4", + "pin-project-lite", + "s3s", + "tracing", +] + [[package]] name = "rustc-demangle" version = "0.1.24" @@ -1838,6 +1858,7 @@ dependencies = [ "lazy_static", "lock", "log", + "matchit 0.8.4", "mime", "netif", "pin-project-lite", @@ -1846,6 +1867,7 @@ dependencies = [ "prost-types", "protobuf", "protos", + "router", "s3s", "serde_json", "time", @@ -1932,7 +1954,7 @@ checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" [[package]] name = "s3s" version = "0.11.0-dev" -source = "git+https://github.com/Nugine/s3s.git?rev=0caf79822ae1f2e7a7fad3c4a093c94dd2e2c33f#0caf79822ae1f2e7a7fad3c4a093c94dd2e2c33f" +source = "git+https://github.com/Nugine/s3s.git?rev=c41ac1d30a0ae3ded6a18cc1b5bbb98f8b7f35c2#c41ac1d30a0ae3ded6a18cc1b5bbb98f8b7f35c2" dependencies = [ "arrayvec", "async-trait", @@ -1943,7 +1965,7 @@ dependencies = [ "chrono", "crc32c", "crc32fast", - "digest 0.10.7", + "digest 0.11.0-pre.9", "futures", "hex-simd", "hmac", @@ -1962,7 +1984,7 @@ dependencies = [ "serde", "serde_urlencoded", "sha1", - "sha2 0.10.8", + "sha2", "smallvec", "sync_wrapper 1.0.1", "thiserror", @@ -1978,7 +2000,7 @@ dependencies = [ [[package]] name = "s3s-policy" version = "0.11.0-dev" -source = "git+https://github.com/Nugine/s3s.git?rev=0caf79822ae1f2e7a7fad3c4a093c94dd2e2c33f#0caf79822ae1f2e7a7fad3c4a093c94dd2e2c33f" +source = "git+https://github.com/Nugine/s3s.git?rev=c41ac1d30a0ae3ded6a18cc1b5bbb98f8b7f35c2#c41ac1d30a0ae3ded6a18cc1b5bbb98f8b7f35c2" dependencies = [ "indexmap 2.6.0", "serde", @@ -2054,24 +2076,13 @@ dependencies = [ [[package]] name = "sha1" -version = "0.10.6" +version = "0.11.0-pre.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +checksum = "9540978cef7a8498211c1b1c14e5ce920fe5bd524ea84f4a3d72d4602515ae93" dependencies = [ "cfg-if", "cpufeatures", - "digest 0.10.7", -] - -[[package]] -name = "sha2" -version = "0.10.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8" -dependencies = [ - "cfg-if", - "cpufeatures", - "digest 0.10.7", + "digest 0.11.0-pre.9", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 83a6489a1..5ce87bc7f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ members = [ "common/protos", "api/admin", "reader", + "router", ] [workspace.package] @@ -49,10 +50,10 @@ prost-types = "0.13.3" protobuf = "3.7" protos = { path = "./common/protos" } rand = "0.8.5" -s3s = { git = "https://github.com/Nugine/s3s.git", rev = "0caf79822ae1f2e7a7fad3c4a093c94dd2e2c33f", default-features = true, features = [ +s3s = { git = "https://github.com/Nugine/s3s.git", rev = "c41ac1d30a0ae3ded6a18cc1b5bbb98f8b7f35c2", default-features = true, features = [ "tower", ] } -s3s-policy = { git = "https://github.com/Nugine/s3s.git", rev = "0caf79822ae1f2e7a7fad3c4a093c94dd2e2c33f" } +s3s-policy = { git = "https://github.com/Nugine/s3s.git", rev = "c41ac1d30a0ae3ded6a18cc1b5bbb98f8b7f35c2" } serde = { version = "1.0.214", features = ["derive"] } serde_json = "1.0.132" tempfile = "3.13.0" diff --git a/router/Cargo.toml b/router/Cargo.toml new file mode 100644 index 000000000..31e05d448 --- /dev/null +++ b/router/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "router" +edition.workspace = true +license.workspace = true +repository.workspace = true +rust-version.workspace = true +version.workspace = true + + +[dependencies] +async-trait.workspace = true +tracing.workspace = true +s3s.workspace = true +hyper.workspace = true +matchit = "0.8.4" +pin-project-lite.workspace = true +common.workspace = true diff --git a/router/src/handler.rs b/router/src/handler.rs new file mode 100644 index 000000000..64256d61a --- /dev/null +++ b/router/src/handler.rs @@ -0,0 +1 @@ +pub trait Handler: Send + Sync + 'static {} diff --git a/router/src/lib.rs b/router/src/lib.rs new file mode 100644 index 000000000..40f3f4be8 --- /dev/null +++ b/router/src/lib.rs @@ -0,0 +1,3 @@ +pub mod handler; +pub mod operation; +pub mod router; diff --git a/router/src/operation.rs b/router/src/operation.rs new file mode 100644 index 000000000..6b3398520 --- /dev/null +++ b/router/src/operation.rs @@ -0,0 +1,10 @@ +use hyper::{Method, StatusCode}; +use matchit::Params; +use s3s::{Body, S3Request, S3Response, S3Result}; + +#[async_trait::async_trait] +pub trait Operation: Send + Sync + 'static { + fn method(&self) -> Method; + fn uri(&self) -> &'static str; + async fn call(&self, req: S3Request, params: Params<'_, '_>) -> S3Result>; +} diff --git a/router/src/router.rs b/router/src/router.rs new file mode 100644 index 000000000..2f137279b --- /dev/null +++ b/router/src/router.rs @@ -0,0 +1,72 @@ +use std::collections::HashMap; +use std::str::FromStr; + +use hyper::http::Extensions; +use hyper::HeaderMap; +use hyper::Method; +use hyper::StatusCode; +use hyper::Uri; + +use s3s::route::S3Route; +use s3s::s3_error; +use s3s::Body; +use s3s::S3Request; +use s3s::S3Response; +use s3s::S3Result; +use tracing::warn; + +use crate::operation::Operation; +use common::error::Result; +use matchit::Router; + +const ADMIN_PREFIX: &str = "/rustfs/admin"; +// const ADMIN_VERSION: &str = "v3"; + +pub struct S3Router { + router: Router, +} + +impl S3Router { + pub fn new() -> Self { + let router = Router::new(); + + Self { router } + } + + pub fn insert(&mut self, operation: T) -> Result<()> { + let path = Self::make_route_str(operation.method(), &operation.uri()); + + warn!("set uri {}", &path); + + self.router.insert(path, operation)?; + + Ok(()) + } + + fn make_route_str(method: Method, path: &str) -> String { + format!("{}|{}{}", method.as_str(), ADMIN_PREFIX, path) + } +} + +#[async_trait::async_trait] +impl S3Route for S3Router +where + T: Operation, +{ + fn is_match(&self, _method: &Method, uri: &Uri, _headers: &HeaderMap, _: &mut Extensions) -> bool { + uri.path().starts_with("/rustfs/admin") + } + + async fn call(&self, req: S3Request) -> S3Result> { + let uri = format!("{}|{}", &req.method, req.uri.path()); + + // warn!("get uri {}", &uri); + + if let Ok(mat) = self.router.at(&uri) { + let op: &T = mat.value; + return op.call(req, mat.params).await; + } + + return Err(s3_error!(NotImplemented)); + } +} diff --git a/rustfs/Cargo.toml b/rustfs/Cargo.toml index deb2ed322..61a3fe07b 100644 --- a/rustfs/Cargo.toml +++ b/rustfs/Cargo.toml @@ -53,6 +53,8 @@ transform-stream.workspace = true uuid = "1.11.0" admin = { path = "../api/admin" } axum.workspace = true +router = { version = "0.0.1", path = "../router" } +matchit = "0.8.4" [build-dependencies] prost-build.workspace = true diff --git a/rustfs/src/main.rs b/rustfs/src/main.rs index 9913d4e65..de4e7631c 100644 --- a/rustfs/src/main.rs +++ b/rustfs/src/main.rs @@ -1,5 +1,6 @@ mod config; mod grpc; +mod route; mod service; mod storage; @@ -113,6 +114,8 @@ async fn run(opt: config::Opt) -> Result<()> { b.set_access(store.clone()); + b.set_route(route::make_admin_route()?); + // // Enable parsing virtual-hosted-style requests // if let Some(dm) = opt.domain_name { // info!("virtual-hosted-style requests are enabled use domain_name {}", &dm); @@ -135,9 +138,8 @@ async fn run(opt: config::Opt) -> Result<()> { tokio::spawn(async move { let hyper_service = service.into_shared(); - let adm_service = admin::register_admin_router(); - let hybrid_service = TowerToHyperService::new(hybrid(hyper_service, rpc_service, adm_service)); + let hybrid_service = TowerToHyperService::new(hybrid(hyper_service, rpc_service)); let http_server = ConnBuilder::new(TokioExecutor::new()); let mut ctrl_c = std::pin::pin!(tokio::signal::ctrl_c()); diff --git a/rustfs/src/route.rs b/rustfs/src/route.rs new file mode 100644 index 000000000..cba7a5efb --- /dev/null +++ b/rustfs/src/route.rs @@ -0,0 +1,68 @@ +use common::error::{Error, Result}; + +use matchit::Params; +use router::{operation::Operation, router::S3Router}; +use s3s::route::S3Route; +use s3s::{Body, S3Request, S3Response, S3Result}; +use tracing::warn; + +use hyper::Method; +use hyper::StatusCode; + +pub fn make_admin_route() -> Result { + let mut r = S3Router::new(); + + r.insert(AdminOperation(&InfoHandler {}))?; + r.insert(AdminOperation(&ListPoolHandler {}))?; + + Ok(r) +} + +struct AdminOperation(&'static dyn Operation); + +#[async_trait::async_trait] +impl Operation for AdminOperation { + fn method(&self) -> Method { + self.0.method() + } + fn uri(&self) -> &'static str { + self.0.uri() + } + async fn call(&self, req: S3Request, params: Params<'_, '_>) -> S3Result> { + self.0.call(req, params).await + } +} + +struct InfoHandler {} + +#[async_trait::async_trait] +impl Operation for InfoHandler { + fn method(&self) -> Method { + Method::GET + } + fn uri(&self) -> &'static str { + "/v3/info" + } + async fn call(&self, _req: S3Request, _params: Params<'_, '_>) -> S3Result> { + warn!("handle info"); + + unimplemented!() + } +} + +struct ListPoolHandler {} + +#[async_trait::async_trait] +impl Operation for ListPoolHandler { + fn method(&self) -> Method { + Method::GET + } + fn uri(&self) -> &'static str { + "/v3/pool/list" + } + async fn call(&self, _req: S3Request, _params: Params<'_, '_>) -> S3Result> { + warn!("handle info"); + + unimplemented!() + } +} diff --git a/rustfs/src/service.rs b/rustfs/src/service.rs index 4e47fb970..3615baf13 100644 --- a/rustfs/src/service.rs +++ b/rustfs/src/service.rs @@ -1,59 +1,41 @@ -use std::pin::Pin; -use std::task::{Context, Poll}; - -use axum::body::Body; use futures::Future; use http_body::Frame; use hyper::body::Incoming; use hyper::{Request, Response}; use pin_project_lite::pin_project; +use std::pin::Pin; +use std::task::{Context, Poll}; use tower::Service; type BoxError = Box; /// Generate a [`HybridService`] -pub(crate) fn hybrid( - make_rest: MakeRest, - grpc: Grpc, - admin: Admin, -) -> HybridService { - HybridService { - rest: make_rest, - grpc, - admin, - } +pub(crate) fn hybrid(make_rest: MakeRest, grpc: Grpc) -> HybridService { + HybridService { rest: make_rest, grpc } } /// The service that can serve both gRPC and REST HTTP Requests #[derive(Clone)] -pub struct HybridService { +pub struct HybridService { rest: Rest, grpc: Grpc, - admin: Admin, } -impl Service> for HybridService +impl Service> for HybridService where Rest: Service, Response = Response>, Grpc: Service, Response = Response>, - Admin: Service, Response = Response>, Rest::Error: Into, Grpc::Error: Into, - Admin::Error: Into, { - type Response = Response>; + type Response = Response>; type Error = BoxError; - type Future = HybridFuture; + type Future = HybridFuture; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { match self.rest.poll_ready(cx) { Poll::Ready(Ok(())) => match self.grpc.poll_ready(cx) { - Poll::Ready(Ok(())) => match self.admin.poll_ready(cx) { - Poll::Ready(Ok(())) => Poll::Ready(Ok(())), - Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())), - Poll::Pending => Poll::Pending, - }, - + Poll::Ready(Ok(())) => Poll::Ready(Ok(())), Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())), Poll::Pending => Poll::Pending, }, @@ -72,13 +54,6 @@ where grpc_future: self.grpc.call(req), }, - _ if req.uri().path().starts_with("/rustfs") => HybridFuture::Admin { - admin_future: self.admin.call({ - let (parts, body) = req.into_parts(); - Request::from_parts(parts, Body::new(body).into()) - }), - }, - _ => HybridFuture::Rest { rest_future: self.rest.call(req), }, @@ -90,7 +65,7 @@ pin_project! { /// A hybrid HTTP body that will be used in the response type for the /// [`HybridFuture`], i.e., the output of the [`HybridService`] #[project = HybridBodyProj] - pub enum HybridBody { + pub enum HybridBody { Rest { #[pin] rest_body: RestBody @@ -99,21 +74,15 @@ pin_project! { #[pin] grpc_body: GrpcBody }, - Admin { - #[pin] - admin_body: AdminBody - }, } } -impl http_body::Body for HybridBody +impl http_body::Body for HybridBody where RestBody: http_body::Body + Send + Unpin, GrpcBody: http_body::Body + Send + Unpin, - AdminBody: http_body::Body + Send + Unpin, RestBody::Error: Into, GrpcBody::Error: Into, - AdminBody::Error: Into, { type Data = RestBody::Data; type Error = BoxError; @@ -122,7 +91,6 @@ where match self { Self::Rest { rest_body } => rest_body.is_end_stream(), Self::Grpc { grpc_body } => grpc_body.is_end_stream(), - Self::Admin { admin_body } => admin_body.is_end_stream(), } } @@ -130,7 +98,6 @@ where match self.project() { HybridBodyProj::Rest { rest_body } => rest_body.poll_frame(cx).map_err(Into::into), HybridBodyProj::Grpc { grpc_body } => grpc_body.poll_frame(cx).map_err(Into::into), - HybridBodyProj::Admin { admin_body } => admin_body.poll_frame(cx).map_err(Into::into), } } @@ -138,7 +105,6 @@ where match self { Self::Rest { rest_body } => rest_body.size_hint(), Self::Grpc { grpc_body } => grpc_body.size_hint(), - Self::Admin { admin_body } => admin_body.size_hint(), } } } @@ -147,7 +113,7 @@ pin_project! { /// A future that accepts an HTTP request as input and returns an HTTP /// response as output for the [`HybridService`] #[project = HybridFutureProj] - pub enum HybridFuture { + pub enum HybridFuture { Rest { #[pin] rest_future: RestFuture, @@ -156,25 +122,17 @@ pin_project! { #[pin] grpc_future: GrpcFuture, }, - - Admin { - #[pin] - admin_future: AdminFuture, - } } } -impl Future - for HybridFuture +impl Future for HybridFuture where RestFuture: Future, RestError>>, GrpcFuture: Future, GrpcError>>, - AdminFuture: Future, AdminError>>, RestError: Into, GrpcError: Into, - AdminError: Into, { - type Output = Result>, BoxError>; + type Output = Result>, BoxError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match self.project() { @@ -188,11 +146,6 @@ where Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())), Poll::Pending => Poll::Pending, }, - HybridFutureProj::Admin { admin_future } => match admin_future.poll(cx) { - Poll::Ready(Ok(res)) => Poll::Ready(Ok(res.map(|admin_body| HybridBody::Admin { admin_body }))), - Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())), - Poll::Pending => Poll::Pending, - }, } } } diff --git a/scripts/run.sh b/scripts/run.sh index 371bd4113..e7bd04f0f 100755 --- a/scripts/run.sh +++ b/scripts/run.sh @@ -6,9 +6,9 @@ mkdir -p ./target/volume/test mkdir -p ./target/volume/test{0..4} -# if [ -z "$RUST_LOG" ]; then -# export RUST_LOG="rustfs=debug,ecstore=debug,s3s=debug,reader=debug" -# fi +if [ -z "$RUST_LOG" ]; then + export RUST_LOG="rustfs=debug,ecstore=debug,s3s=debug,reader=debug,router=debug" +fi # export RUSTFS_ERASURE_SET_DRIVE_COUNT=5