init admin_route

This commit is contained in:
weisd
2024-11-05 17:41:35 +08:00
parent ce93715287
commit e73a055bc3
12 changed files with 234 additions and 94 deletions

63
Cargo.lock generated
View File

@@ -183,7 +183,7 @@ dependencies = [
"hyper",
"hyper-util",
"itoa",
"matchit",
"matchit 0.7.3",
"memchr",
"mime",
"percent-encoding",
@@ -506,6 +506,7 @@ dependencies = [
"block-buffer 0.11.0-rc.2",
"const-oid",
"crypto-common 0.2.0-rc.1",
"subtle",
]
[[package]]
@@ -564,7 +565,7 @@ dependencies = [
"s3s-policy",
"serde",
"serde_json",
"sha2 0.11.0-pre.4",
"sha2",
"siphasher",
"tempfile",
"thiserror",
@@ -865,11 +866,11 @@ checksum = "c706f1711006204c2ba8fb1a7bd55f689bbf7feca9ff40325206b5e140cff6df"
[[package]]
name = "hmac"
version = "0.12.1"
version = "0.13.0-pre.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e"
checksum = "e4b1fb14e4df79f9406b434b60acef9f45c26c50062cccf1346c6103b8c47d58"
dependencies = [
"digest 0.10.7",
"digest 0.11.0-pre.9",
]
[[package]]
@@ -1134,6 +1135,12 @@ version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94"
[[package]]
name = "matchit"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3"
[[package]]
name = "md-5"
version = "0.10.6"
@@ -1633,9 +1640,9 @@ dependencies = [
[[package]]
name = "quick-xml"
version = "0.36.2"
version = "0.37.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7649a7b4df05aed9ea7ec6f628c67c9953a43869b8bc50929569b2999d443fe"
checksum = "ffbfb3ddf5364c9cfcd65549a1e7b801d0e8d1b14c1a1590a6408aa93cfbfa84"
dependencies = [
"memchr",
"serde",
@@ -1690,7 +1697,7 @@ dependencies = [
"md-5",
"pin-project-lite",
"s3s",
"sha2 0.11.0-pre.4",
"sha2",
"thiserror",
"tokio",
"tracing",
@@ -1801,6 +1808,19 @@ dependencies = [
"serde",
]
[[package]]
name = "router"
version = "0.0.1"
dependencies = [
"async-trait",
"common",
"hyper",
"matchit 0.8.4",
"pin-project-lite",
"s3s",
"tracing",
]
[[package]]
name = "rustc-demangle"
version = "0.1.24"
@@ -1838,6 +1858,7 @@ dependencies = [
"lazy_static",
"lock",
"log",
"matchit 0.8.4",
"mime",
"netif",
"pin-project-lite",
@@ -1846,6 +1867,7 @@ dependencies = [
"prost-types",
"protobuf",
"protos",
"router",
"s3s",
"serde_json",
"time",
@@ -1932,7 +1954,7 @@ checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f"
[[package]]
name = "s3s"
version = "0.11.0-dev"
source = "git+https://github.com/Nugine/s3s.git?rev=0caf79822ae1f2e7a7fad3c4a093c94dd2e2c33f#0caf79822ae1f2e7a7fad3c4a093c94dd2e2c33f"
source = "git+https://github.com/Nugine/s3s.git?rev=c41ac1d30a0ae3ded6a18cc1b5bbb98f8b7f35c2#c41ac1d30a0ae3ded6a18cc1b5bbb98f8b7f35c2"
dependencies = [
"arrayvec",
"async-trait",
@@ -1943,7 +1965,7 @@ dependencies = [
"chrono",
"crc32c",
"crc32fast",
"digest 0.10.7",
"digest 0.11.0-pre.9",
"futures",
"hex-simd",
"hmac",
@@ -1962,7 +1984,7 @@ dependencies = [
"serde",
"serde_urlencoded",
"sha1",
"sha2 0.10.8",
"sha2",
"smallvec",
"sync_wrapper 1.0.1",
"thiserror",
@@ -1978,7 +2000,7 @@ dependencies = [
[[package]]
name = "s3s-policy"
version = "0.11.0-dev"
source = "git+https://github.com/Nugine/s3s.git?rev=0caf79822ae1f2e7a7fad3c4a093c94dd2e2c33f#0caf79822ae1f2e7a7fad3c4a093c94dd2e2c33f"
source = "git+https://github.com/Nugine/s3s.git?rev=c41ac1d30a0ae3ded6a18cc1b5bbb98f8b7f35c2#c41ac1d30a0ae3ded6a18cc1b5bbb98f8b7f35c2"
dependencies = [
"indexmap 2.6.0",
"serde",
@@ -2054,24 +2076,13 @@ dependencies = [
[[package]]
name = "sha1"
version = "0.10.6"
version = "0.11.0-pre.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba"
checksum = "9540978cef7a8498211c1b1c14e5ce920fe5bd524ea84f4a3d72d4602515ae93"
dependencies = [
"cfg-if",
"cpufeatures",
"digest 0.10.7",
]
[[package]]
name = "sha2"
version = "0.10.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8"
dependencies = [
"cfg-if",
"cpufeatures",
"digest 0.10.7",
"digest 0.11.0-pre.9",
]
[[package]]

View File

@@ -9,6 +9,7 @@ members = [
"common/protos",
"api/admin",
"reader",
"router",
]
[workspace.package]
@@ -49,10 +50,10 @@ prost-types = "0.13.3"
protobuf = "3.7"
protos = { path = "./common/protos" }
rand = "0.8.5"
s3s = { git = "https://github.com/Nugine/s3s.git", rev = "0caf79822ae1f2e7a7fad3c4a093c94dd2e2c33f", default-features = true, features = [
s3s = { git = "https://github.com/Nugine/s3s.git", rev = "c41ac1d30a0ae3ded6a18cc1b5bbb98f8b7f35c2", default-features = true, features = [
"tower",
] }
s3s-policy = { git = "https://github.com/Nugine/s3s.git", rev = "0caf79822ae1f2e7a7fad3c4a093c94dd2e2c33f" }
s3s-policy = { git = "https://github.com/Nugine/s3s.git", rev = "c41ac1d30a0ae3ded6a18cc1b5bbb98f8b7f35c2" }
serde = { version = "1.0.214", features = ["derive"] }
serde_json = "1.0.132"
tempfile = "3.13.0"

17
router/Cargo.toml Normal file
View File

@@ -0,0 +1,17 @@
[package]
name = "router"
edition.workspace = true
license.workspace = true
repository.workspace = true
rust-version.workspace = true
version.workspace = true
[dependencies]
async-trait.workspace = true
tracing.workspace = true
s3s.workspace = true
hyper.workspace = true
matchit = "0.8.4"
pin-project-lite.workspace = true
common.workspace = true

1
router/src/handler.rs Normal file
View File

@@ -0,0 +1 @@
pub trait Handler: Send + Sync + 'static {}

3
router/src/lib.rs Normal file
View File

@@ -0,0 +1,3 @@
pub mod handler;
pub mod operation;
pub mod router;

10
router/src/operation.rs Normal file
View File

@@ -0,0 +1,10 @@
use hyper::{Method, StatusCode};
use matchit::Params;
use s3s::{Body, S3Request, S3Response, S3Result};
#[async_trait::async_trait]
pub trait Operation: Send + Sync + 'static {
fn method(&self) -> Method;
fn uri(&self) -> &'static str;
async fn call(&self, req: S3Request<Body>, params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>>;
}

72
router/src/router.rs Normal file
View File

@@ -0,0 +1,72 @@
use std::collections::HashMap;
use std::str::FromStr;
use hyper::http::Extensions;
use hyper::HeaderMap;
use hyper::Method;
use hyper::StatusCode;
use hyper::Uri;
use s3s::route::S3Route;
use s3s::s3_error;
use s3s::Body;
use s3s::S3Request;
use s3s::S3Response;
use s3s::S3Result;
use tracing::warn;
use crate::operation::Operation;
use common::error::Result;
use matchit::Router;
const ADMIN_PREFIX: &str = "/rustfs/admin";
// const ADMIN_VERSION: &str = "v3";
pub struct S3Router<T> {
router: Router<T>,
}
impl<T: Operation> S3Router<T> {
pub fn new() -> Self {
let router = Router::new();
Self { router }
}
pub fn insert(&mut self, operation: T) -> Result<()> {
let path = Self::make_route_str(operation.method(), &operation.uri());
warn!("set uri {}", &path);
self.router.insert(path, operation)?;
Ok(())
}
fn make_route_str(method: Method, path: &str) -> String {
format!("{}|{}{}", method.as_str(), ADMIN_PREFIX, path)
}
}
#[async_trait::async_trait]
impl<T> S3Route for S3Router<T>
where
T: Operation,
{
fn is_match(&self, _method: &Method, uri: &Uri, _headers: &HeaderMap, _: &mut Extensions) -> bool {
uri.path().starts_with("/rustfs/admin")
}
async fn call(&self, req: S3Request<Body>) -> S3Result<S3Response<(StatusCode, Body)>> {
let uri = format!("{}|{}", &req.method, req.uri.path());
// warn!("get uri {}", &uri);
if let Ok(mat) = self.router.at(&uri) {
let op: &T = mat.value;
return op.call(req, mat.params).await;
}
return Err(s3_error!(NotImplemented));
}
}

View File

@@ -53,6 +53,8 @@ transform-stream.workspace = true
uuid = "1.11.0"
admin = { path = "../api/admin" }
axum.workspace = true
router = { version = "0.0.1", path = "../router" }
matchit = "0.8.4"
[build-dependencies]
prost-build.workspace = true

View File

@@ -1,5 +1,6 @@
mod config;
mod grpc;
mod route;
mod service;
mod storage;
@@ -113,6 +114,8 @@ async fn run(opt: config::Opt) -> Result<()> {
b.set_access(store.clone());
b.set_route(route::make_admin_route()?);
// // Enable parsing virtual-hosted-style requests
// if let Some(dm) = opt.domain_name {
// info!("virtual-hosted-style requests are enabled use domain_name {}", &dm);
@@ -135,9 +138,8 @@ async fn run(opt: config::Opt) -> Result<()> {
tokio::spawn(async move {
let hyper_service = service.into_shared();
let adm_service = admin::register_admin_router();
let hybrid_service = TowerToHyperService::new(hybrid(hyper_service, rpc_service, adm_service));
let hybrid_service = TowerToHyperService::new(hybrid(hyper_service, rpc_service));
let http_server = ConnBuilder::new(TokioExecutor::new());
let mut ctrl_c = std::pin::pin!(tokio::signal::ctrl_c());

68
rustfs/src/route.rs Normal file
View File

@@ -0,0 +1,68 @@
use common::error::{Error, Result};
use matchit::Params;
use router::{operation::Operation, router::S3Router};
use s3s::route::S3Route;
use s3s::{Body, S3Request, S3Response, S3Result};
use tracing::warn;
use hyper::Method;
use hyper::StatusCode;
pub fn make_admin_route() -> Result<impl S3Route> {
let mut r = S3Router::new();
r.insert(AdminOperation(&InfoHandler {}))?;
r.insert(AdminOperation(&ListPoolHandler {}))?;
Ok(r)
}
struct AdminOperation(&'static dyn Operation);
#[async_trait::async_trait]
impl Operation for AdminOperation {
fn method(&self) -> Method {
self.0.method()
}
fn uri(&self) -> &'static str {
self.0.uri()
}
async fn call(&self, req: S3Request<Body>, params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
self.0.call(req, params).await
}
}
struct InfoHandler {}
#[async_trait::async_trait]
impl Operation for InfoHandler {
fn method(&self) -> Method {
Method::GET
}
fn uri(&self) -> &'static str {
"/v3/info"
}
async fn call(&self, _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
warn!("handle info");
unimplemented!()
}
}
struct ListPoolHandler {}
#[async_trait::async_trait]
impl Operation for ListPoolHandler {
fn method(&self) -> Method {
Method::GET
}
fn uri(&self) -> &'static str {
"/v3/pool/list"
}
async fn call(&self, _req: S3Request<Body>, _params: Params<'_, '_>) -> S3Result<S3Response<(StatusCode, Body)>> {
warn!("handle info");
unimplemented!()
}
}

View File

@@ -1,59 +1,41 @@
use std::pin::Pin;
use std::task::{Context, Poll};
use axum::body::Body;
use futures::Future;
use http_body::Frame;
use hyper::body::Incoming;
use hyper::{Request, Response};
use pin_project_lite::pin_project;
use std::pin::Pin;
use std::task::{Context, Poll};
use tower::Service;
type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
/// Generate a [`HybridService`]
pub(crate) fn hybrid<MakeRest, Grpc, Admin>(
make_rest: MakeRest,
grpc: Grpc,
admin: Admin,
) -> HybridService<MakeRest, Grpc, Admin> {
HybridService {
rest: make_rest,
grpc,
admin,
}
pub(crate) fn hybrid<MakeRest, Grpc>(make_rest: MakeRest, grpc: Grpc) -> HybridService<MakeRest, Grpc> {
HybridService { rest: make_rest, grpc }
}
/// The service that can serve both gRPC and REST HTTP Requests
#[derive(Clone)]
pub struct HybridService<Rest, Grpc, Admin> {
pub struct HybridService<Rest, Grpc> {
rest: Rest,
grpc: Grpc,
admin: Admin,
}
impl<Rest, Grpc, Admin, RestBody, GrpcBody> Service<Request<Incoming>> for HybridService<Rest, Grpc, Admin>
impl<Rest, Grpc, RestBody, GrpcBody> Service<Request<Incoming>> for HybridService<Rest, Grpc>
where
Rest: Service<Request<Incoming>, Response = Response<RestBody>>,
Grpc: Service<Request<Incoming>, Response = Response<GrpcBody>>,
Admin: Service<Request<Body>, Response = Response<Body>>,
Rest::Error: Into<BoxError>,
Grpc::Error: Into<BoxError>,
Admin::Error: Into<BoxError>,
{
type Response = Response<HybridBody<RestBody, GrpcBody, Body>>;
type Response = Response<HybridBody<RestBody, GrpcBody>>;
type Error = BoxError;
type Future = HybridFuture<Rest::Future, Grpc::Future, Admin::Future>;
type Future = HybridFuture<Rest::Future, Grpc::Future>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.rest.poll_ready(cx) {
Poll::Ready(Ok(())) => match self.grpc.poll_ready(cx) {
Poll::Ready(Ok(())) => match self.admin.poll_ready(cx) {
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
Poll::Pending => Poll::Pending,
},
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
Poll::Pending => Poll::Pending,
},
@@ -72,13 +54,6 @@ where
grpc_future: self.grpc.call(req),
},
_ if req.uri().path().starts_with("/rustfs") => HybridFuture::Admin {
admin_future: self.admin.call({
let (parts, body) = req.into_parts();
Request::from_parts(parts, Body::new(body).into())
}),
},
_ => HybridFuture::Rest {
rest_future: self.rest.call(req),
},
@@ -90,7 +65,7 @@ pin_project! {
/// A hybrid HTTP body that will be used in the response type for the
/// [`HybridFuture`], i.e., the output of the [`HybridService`]
#[project = HybridBodyProj]
pub enum HybridBody<RestBody, GrpcBody, AdminBody> {
pub enum HybridBody<RestBody, GrpcBody> {
Rest {
#[pin]
rest_body: RestBody
@@ -99,21 +74,15 @@ pin_project! {
#[pin]
grpc_body: GrpcBody
},
Admin {
#[pin]
admin_body: AdminBody
},
}
}
impl<RestBody, GrpcBody, AdminBody> http_body::Body for HybridBody<RestBody, GrpcBody, AdminBody>
impl<RestBody, GrpcBody> http_body::Body for HybridBody<RestBody, GrpcBody>
where
RestBody: http_body::Body + Send + Unpin,
GrpcBody: http_body::Body<Data = RestBody::Data> + Send + Unpin,
AdminBody: http_body::Body<Data = RestBody::Data> + Send + Unpin,
RestBody::Error: Into<BoxError>,
GrpcBody::Error: Into<BoxError>,
AdminBody::Error: Into<BoxError>,
{
type Data = RestBody::Data;
type Error = BoxError;
@@ -122,7 +91,6 @@ where
match self {
Self::Rest { rest_body } => rest_body.is_end_stream(),
Self::Grpc { grpc_body } => grpc_body.is_end_stream(),
Self::Admin { admin_body } => admin_body.is_end_stream(),
}
}
@@ -130,7 +98,6 @@ where
match self.project() {
HybridBodyProj::Rest { rest_body } => rest_body.poll_frame(cx).map_err(Into::into),
HybridBodyProj::Grpc { grpc_body } => grpc_body.poll_frame(cx).map_err(Into::into),
HybridBodyProj::Admin { admin_body } => admin_body.poll_frame(cx).map_err(Into::into),
}
}
@@ -138,7 +105,6 @@ where
match self {
Self::Rest { rest_body } => rest_body.size_hint(),
Self::Grpc { grpc_body } => grpc_body.size_hint(),
Self::Admin { admin_body } => admin_body.size_hint(),
}
}
}
@@ -147,7 +113,7 @@ pin_project! {
/// A future that accepts an HTTP request as input and returns an HTTP
/// response as output for the [`HybridService`]
#[project = HybridFutureProj]
pub enum HybridFuture<RestFuture, GrpcFuture, AdminFuture> {
pub enum HybridFuture<RestFuture, GrpcFuture> {
Rest {
#[pin]
rest_future: RestFuture,
@@ -156,25 +122,17 @@ pin_project! {
#[pin]
grpc_future: GrpcFuture,
},
Admin {
#[pin]
admin_future: AdminFuture,
}
}
}
impl<RestFuture, GrpcFuture, AdminFuture, RestBody, GrpcBody, AdminBody, RestError, GrpcError, AdminError> Future
for HybridFuture<RestFuture, GrpcFuture, AdminFuture>
impl<RestFuture, GrpcFuture, RestBody, GrpcBody, RestError, GrpcError> Future for HybridFuture<RestFuture, GrpcFuture>
where
RestFuture: Future<Output = Result<Response<RestBody>, RestError>>,
GrpcFuture: Future<Output = Result<Response<GrpcBody>, GrpcError>>,
AdminFuture: Future<Output = Result<Response<AdminBody>, AdminError>>,
RestError: Into<BoxError>,
GrpcError: Into<BoxError>,
AdminError: Into<BoxError>,
{
type Output = Result<Response<HybridBody<RestBody, GrpcBody, AdminBody>>, BoxError>;
type Output = Result<Response<HybridBody<RestBody, GrpcBody>>, BoxError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.project() {
@@ -188,11 +146,6 @@ where
Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
Poll::Pending => Poll::Pending,
},
HybridFutureProj::Admin { admin_future } => match admin_future.poll(cx) {
Poll::Ready(Ok(res)) => Poll::Ready(Ok(res.map(|admin_body| HybridBody::Admin { admin_body }))),
Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
Poll::Pending => Poll::Pending,
},
}
}
}

View File

@@ -6,9 +6,9 @@ mkdir -p ./target/volume/test
mkdir -p ./target/volume/test{0..4}
# if [ -z "$RUST_LOG" ]; then
# export RUST_LOG="rustfs=debug,ecstore=debug,s3s=debug,reader=debug"
# fi
if [ -z "$RUST_LOG" ]; then
export RUST_LOG="rustfs=debug,ecstore=debug,s3s=debug,reader=debug,router=debug"
fi
# export RUSTFS_ERASURE_SET_DRIVE_COUNT=5