diff --git a/crates/ecstore/src/notification_sys.rs b/crates/ecstore/src/notification_sys.rs index d6906cdbd..4f2488663 100644 --- a/crates/ecstore/src/notification_sys.rs +++ b/crates/ecstore/src/notification_sys.rs @@ -22,7 +22,7 @@ use crate::rpc::PeerRestClient; use crate::{endpoints::EndpointServerPools, new_object_layer_fn}; use futures::future::join_all; use lazy_static::lazy_static; -use rustfs_madmin::health::{Cpus, MemInfo, OsInfo, Partitions, ProcInfo, SysConfig, SysErrors, SysService}; +use rustfs_madmin::health::{Cpus, MemInfo, OsInfo, Partitions, ProcInfo, SysConfig, SysErrors, SysServices}; use rustfs_madmin::metrics::RealtimeMetrics; use rustfs_madmin::net::NetInfo; use rustfs_madmin::{ItemState, ServerProperties}; @@ -622,14 +622,14 @@ impl NotificationSys { join_all(futures).await } - pub async fn get_sys_services(&self) -> Vec { + pub async fn get_sys_services(&self) -> Vec { let mut futures = Vec::with_capacity(self.peer_clients.len()); for client in self.peer_clients.iter().cloned() { futures.push(async move { if let Some(client) = client { client.get_se_linux_info().await.unwrap_or_default() } else { - SysService::default() + SysServices::default() } }); } diff --git a/crates/ecstore/src/rpc/peer_rest_client.rs b/crates/ecstore/src/rpc/peer_rest_client.rs index 46dae7005..aab0b0322 100644 --- a/crates/ecstore/src/rpc/peer_rest_client.rs +++ b/crates/ecstore/src/rpc/peer_rest_client.rs @@ -23,7 +23,7 @@ use crate::{ use rmp_serde::{Deserializer, Serializer}; use rustfs_madmin::{ ServerProperties, - health::{Cpus, MemInfo, OsInfo, Partitions, ProcInfo, SysConfig, SysErrors, SysService}, + health::{Cpus, MemInfo, OsInfo, Partitions, ProcInfo, SysConfig, SysErrors, SysServices}, metrics::RealtimeMetrics, net::NetInfo, }; @@ -361,7 +361,7 @@ impl PeerRestClient { Ok(os_info) } - pub async fn get_se_linux_info(&self) -> Result { + pub async fn get_se_linux_info(&self) -> Result { self.finalize_result( async { let mut client = self.get_client().await?; @@ -377,7 +377,7 @@ impl PeerRestClient { let data = response.sys_services; let mut buf = Deserializer::new(Cursor::new(data)); - let sys_services: SysService = Deserialize::deserialize(&mut buf)?; + let sys_services: SysServices = Deserialize::deserialize(&mut buf)?; Ok(sys_services) } diff --git a/rustfs/src/storage/rpc/health.rs b/rustfs/src/storage/rpc/health.rs index 3109299f7..57eec92ff 100644 --- a/rustfs/src/storage/rpc/health.rs +++ b/rustfs/src/storage/rpc/health.rs @@ -13,6 +13,7 @@ // limitations under the License. use super::*; +use crate::storage::rpc::encode_msgpack_map; impl NodeService { pub(super) async fn handle_get_proc_info( @@ -21,19 +22,18 @@ impl NodeService { ) -> Result, Status> { let addr = get_global_local_node_name().await; let info = get_proc_info(&addr); - let mut buf = Vec::new(); - if let Err(err) = info.serialize(&mut Serializer::new(&mut buf)) { - return Ok(Response::new(GetProcInfoResponse { + match encode_msgpack_map(&info) { + Ok(buf) => Ok(Response::new(GetProcInfoResponse { + success: true, + proc_info: buf.into(), + error_info: None, + })), + Err(err) => Ok(Response::new(GetProcInfoResponse { success: false, proc_info: Bytes::new(), error_info: Some(err.to_string()), - })); + })), } - Ok(Response::new(GetProcInfoResponse { - success: true, - proc_info: buf.into(), - error_info: None, - })) } pub(super) async fn handle_get_mem_info( @@ -42,19 +42,18 @@ impl NodeService { ) -> Result, Status> { let addr = get_global_local_node_name().await; let info = get_mem_info(&addr); - let mut buf = Vec::new(); - if let Err(err) = info.serialize(&mut Serializer::new(&mut buf)) { - return Ok(Response::new(GetMemInfoResponse { + match encode_msgpack_map(&info) { + Ok(buf) => Ok(Response::new(GetMemInfoResponse { + success: true, + mem_info: buf.into(), + error_info: None, + })), + Err(err) => Ok(Response::new(GetMemInfoResponse { success: false, mem_info: Bytes::new(), error_info: Some(err.to_string()), - })); + })), } - Ok(Response::new(GetMemInfoResponse { - success: true, - mem_info: buf.into(), - error_info: None, - })) } pub(super) async fn handle_get_sys_errors( @@ -63,19 +62,18 @@ impl NodeService { ) -> Result, Status> { let addr = get_global_local_node_name().await; let info = get_sys_errors(&addr); - let mut buf = Vec::new(); - if let Err(err) = info.serialize(&mut Serializer::new(&mut buf)) { - return Ok(Response::new(GetSysErrorsResponse { + match encode_msgpack_map(&info) { + Ok(buf) => Ok(Response::new(GetSysErrorsResponse { + success: true, + sys_errors: buf.into(), + error_info: None, + })), + Err(err) => Ok(Response::new(GetSysErrorsResponse { success: false, sys_errors: Bytes::new(), error_info: Some(err.to_string()), - })); + })), } - Ok(Response::new(GetSysErrorsResponse { - success: true, - sys_errors: buf.into(), - error_info: None, - })) } pub(super) async fn handle_get_sys_config( @@ -84,19 +82,18 @@ impl NodeService { ) -> Result, Status> { let addr = get_global_local_node_name().await; let info = get_sys_config(&addr); - let mut buf = Vec::new(); - if let Err(err) = info.serialize(&mut Serializer::new(&mut buf)) { - return Ok(Response::new(GetSysConfigResponse { + match encode_msgpack_map(&info) { + Ok(buf) => Ok(Response::new(GetSysConfigResponse { + success: true, + sys_config: buf.into(), + error_info: None, + })), + Err(err) => Ok(Response::new(GetSysConfigResponse { success: false, sys_config: Bytes::new(), error_info: Some(err.to_string()), - })); + })), } - Ok(Response::new(GetSysConfigResponse { - success: true, - sys_config: buf.into(), - error_info: None, - })) } pub(super) async fn handle_get_se_linux_info( @@ -105,19 +102,18 @@ impl NodeService { ) -> Result, Status> { let addr = get_global_local_node_name().await; let info = get_sys_services(&addr); - let mut buf = Vec::new(); - if let Err(err) = info.serialize(&mut Serializer::new(&mut buf)) { - return Ok(Response::new(GetSeLinuxInfoResponse { + match encode_msgpack_map(&info) { + Ok(buf) => Ok(Response::new(GetSeLinuxInfoResponse { + success: true, + sys_services: buf.into(), + error_info: None, + })), + Err(err) => Ok(Response::new(GetSeLinuxInfoResponse { success: false, sys_services: Bytes::new(), error_info: Some(err.to_string()), - })); + })), } - Ok(Response::new(GetSeLinuxInfoResponse { - success: true, - sys_services: buf.into(), - error_info: None, - })) } pub(super) async fn handle_get_os_info( @@ -125,19 +121,18 @@ impl NodeService { _request: Request, ) -> Result, Status> { let os_info = get_os_info(); - let mut buf = Vec::new(); - if let Err(err) = os_info.serialize(&mut Serializer::new(&mut buf)) { - return Ok(Response::new(GetOsInfoResponse { + match encode_msgpack_map(&os_info) { + Ok(buf) => Ok(Response::new(GetOsInfoResponse { + success: true, + os_info: buf.into(), + error_info: None, + })), + Err(err) => Ok(Response::new(GetOsInfoResponse { success: false, os_info: Bytes::new(), error_info: Some(err.to_string()), - })); + })), } - Ok(Response::new(GetOsInfoResponse { - success: true, - os_info: buf.into(), - error_info: None, - })) } pub(super) async fn handle_get_partitions( @@ -145,19 +140,18 @@ impl NodeService { _request: Request, ) -> Result, Status> { let partitions = get_partitions(); - let mut buf = Vec::new(); - if let Err(err) = partitions.serialize(&mut Serializer::new(&mut buf)) { - return Ok(Response::new(GetPartitionsResponse { + match encode_msgpack_map(&partitions) { + Ok(buf) => Ok(Response::new(GetPartitionsResponse { + success: true, + partitions: buf.into(), + error_info: None, + })), + Err(err) => Ok(Response::new(GetPartitionsResponse { success: false, partitions: Bytes::new(), error_info: Some(err.to_string()), - })); + })), } - Ok(Response::new(GetPartitionsResponse { - success: true, - partitions: buf.into(), - error_info: None, - })) } pub(super) async fn handle_get_net_info( @@ -166,36 +160,34 @@ impl NodeService { ) -> Result, Status> { let addr = get_global_local_node_name().await; let info = get_net_info(&addr, ""); - let mut buf = Vec::new(); - if let Err(err) = info.serialize(&mut Serializer::new(&mut buf)) { - return Ok(Response::new(GetNetInfoResponse { + match encode_msgpack_map(&info) { + Ok(buf) => Ok(Response::new(GetNetInfoResponse { + success: true, + net_info: buf.into(), + error_info: None, + })), + Err(err) => Ok(Response::new(GetNetInfoResponse { success: false, net_info: Bytes::new(), error_info: Some(err.to_string()), - })); + })), } - Ok(Response::new(GetNetInfoResponse { - success: true, - net_info: buf.into(), - error_info: None, - })) } pub(super) async fn handle_get_cpus(&self, _request: Request) -> Result, Status> { let info = get_cpus(); - let mut buf = Vec::new(); - if let Err(err) = info.serialize(&mut Serializer::new(&mut buf)) { - return Ok(Response::new(GetCpusResponse { + match encode_msgpack_map(&info) { + Ok(buf) => Ok(Response::new(GetCpusResponse { + success: true, + cpus: buf.into(), + error_info: None, + })), + Err(err) => Ok(Response::new(GetCpusResponse { success: false, cpus: Bytes::new(), error_info: Some(err.to_string()), - })); + })), } - Ok(Response::new(GetCpusResponse { - success: true, - cpus: buf.into(), - error_info: None, - })) } pub(super) async fn handle_server_info( @@ -203,29 +195,24 @@ impl NodeService { _request: Request, ) -> Result, Status> { let info = get_local_server_property().await; - let mut buf = Vec::new(); - // Use map encoding for forward/backward compatibility across mixed versions: - // unknown fields can be ignored by older nodes during deserialization. - if let Err(err) = info.serialize(&mut Serializer::new(&mut buf).with_struct_map()) { - return Ok(Response::new(ServerInfoResponse { + match encode_msgpack_map(&info) { + Ok(buf) => Ok(Response::new(ServerInfoResponse { + success: true, + server_properties: buf.into(), + error_info: None, + })), + Err(err) => Ok(Response::new(ServerInfoResponse { success: false, server_properties: Bytes::new(), error_info: Some(err.to_string()), - })); + })), } - Ok(Response::new(ServerInfoResponse { - success: true, - server_properties: buf.into(), - error_info: None, - })) } pub(super) async fn handle_local_storage_info( &self, _request: Request, ) -> Result, Status> { - // let request = request.into_inner(); - let Some(store) = new_object_layer_fn() else { return Ok(Response::new(LocalStorageInfoResponse { success: false, @@ -235,19 +222,17 @@ impl NodeService { }; let info = store.local_storage_info().await; - let mut buf = Vec::new(); - if let Err(err) = info.serialize(&mut Serializer::new(&mut buf)) { - return Ok(Response::new(LocalStorageInfoResponse { + match encode_msgpack_map(&info) { + Ok(buf) => Ok(Response::new(LocalStorageInfoResponse { + success: true, + storage_info: buf.into(), + error_info: None, + })), + Err(err) => Ok(Response::new(LocalStorageInfoResponse { success: false, storage_info: Bytes::new(), error_info: Some(err.to_string()), - })); + })), } - - Ok(Response::new(LocalStorageInfoResponse { - success: true, - storage_info: buf.into(), - error_info: None, - })) } } diff --git a/rustfs/src/storage/rpc/metrics.rs b/rustfs/src/storage/rpc/metrics.rs index f029ee32e..e64d56938 100644 --- a/rustfs/src/storage/rpc/metrics.rs +++ b/rustfs/src/storage/rpc/metrics.rs @@ -13,6 +13,7 @@ // limitations under the License. use super::*; +use crate::storage::rpc::encode_msgpack_map; impl NodeService { pub(super) async fn handle_get_metrics( @@ -50,19 +51,17 @@ impl NodeService { }; let info = collect_local_metrics(t, &opts).await; - - let mut buf = Vec::new(); - if let Err(err) = info.serialize(&mut Serializer::new(&mut buf)) { - return Ok(Response::new(GetMetricsResponse { + match encode_msgpack_map(&info) { + Ok(buf) => Ok(Response::new(GetMetricsResponse { + success: true, + realtime_metrics: buf.into(), + error_info: None, + })), + Err(err) => Ok(Response::new(GetMetricsResponse { success: false, realtime_metrics: Bytes::new(), error_info: Some(err.to_string()), - })); + })), } - Ok(Response::new(GetMetricsResponse { - success: true, - realtime_metrics: buf.into(), - error_info: None, - })) } } diff --git a/rustfs/src/storage/rpc/mod.rs b/rustfs/src/storage/rpc/mod.rs index bef5bb854..bec1a4e23 100644 --- a/rustfs/src/storage/rpc/mod.rs +++ b/rustfs/src/storage/rpc/mod.rs @@ -17,3 +17,77 @@ pub mod node_service; pub use http_service::InternodeRpcService; pub use node_service::{NodeService, make_server}; + +use rmp_serde::Serializer; +use serde::Serialize; + +/// Encode a value as map-keyed msgpack for internode RPC responses. +/// +/// Uses `.with_struct_map()` so structs are serialized with named fields +/// (msgpack map) instead of positional arrays. This matches what the +/// client-side `Deserializer::new()` expects. +pub(crate) fn encode_msgpack_map(value: &T) -> Result, rmp_serde::encode::Error> { + let mut buf = Vec::new(); + value.serialize(&mut Serializer::new(&mut buf).with_struct_map())?; + Ok(buf) +} + +#[cfg(test)] +mod tests { + use super::*; + use rmp_serde::Deserializer; + use serde::Deserialize; + use std::collections::HashMap; + use std::io::Cursor; + + #[derive(Debug, PartialEq, Serialize, Deserialize)] + struct Simple { + name: String, + count: u32, + } + + #[derive(Debug, PartialEq, Serialize, Deserialize)] + struct Nested { + label: String, + tags: HashMap, + #[serde(skip_serializing_if = "Option::is_none")] + optional: Option, + } + + #[test] + fn encode_decode_round_trip() { + let val = Simple { + name: "rustfs".into(), + count: 42, + }; + let buf = encode_msgpack_map(&val).unwrap(); + let decoded: Simple = Deserialize::deserialize(&mut Deserializer::new(Cursor::new(&buf))).unwrap(); + assert_eq!(val, decoded); + } + + #[test] + fn encode_produces_map_not_array() { + let val = Simple { + name: "test".into(), + count: 1, + }; + let buf = encode_msgpack_map(&val).unwrap(); + // Map marker for 2 fields: fixmap with N=2 is 0x82 + assert_eq!(buf[0], 0x82, "expected msgpack fixmap marker, got array"); + } + + #[test] + fn nested_struct_with_optional_and_hashmap() { + let mut tags = HashMap::new(); + tags.insert("env".into(), "production".into()); + + let val = Nested { + label: "node1".into(), + tags, + optional: None, + }; + let buf = encode_msgpack_map(&val).unwrap(); + let decoded: Nested = Deserialize::deserialize(&mut Deserializer::new(Cursor::new(&buf))).unwrap(); + assert_eq!(val, decoded); + } +} diff --git a/rustfs/src/storage/rpc/node_service.rs b/rustfs/src/storage/rpc/node_service.rs index 0f8a473b7..c19714fed 100644 --- a/rustfs/src/storage/rpc/node_service.rs +++ b/rustfs/src/storage/rpc/node_service.rs @@ -16,7 +16,7 @@ use crate::admin::service::site_replication::reload_site_replication_runtime_sta use bytes::Bytes; use futures::Stream; use futures_util::future::join_all; -use rmp_serde::{Deserializer, Serializer}; +use rmp_serde::Deserializer; use rustfs_common::{get_global_local_node_name, heal_channel::HealOpts}; use rustfs_ecstore::{ admin_server_info::get_local_server_property, @@ -43,7 +43,7 @@ use rustfs_protos::{ models::{PingBody, PingBodyBuilder}, proto_gen::node_service::{node_service_server::NodeService as Node, *}, }; -use serde::{Deserialize, Serialize}; +use serde::Deserialize; use std::{io::Cursor, pin::Pin, sync::Arc}; use tokio::spawn; use tokio::sync::mpsc; @@ -1982,6 +1982,156 @@ mod tests { assert!(!proc_response.proc_info.is_empty()); } + #[tokio::test] + async fn test_get_proc_info_round_trip() { + let service = create_test_node_service(); + let response = service + .get_proc_info(Request::new(GetProcInfoRequest {})) + .await + .unwrap() + .into_inner(); + assert!(response.success); + let mut de = rmp_serde::Deserializer::new(std::io::Cursor::new(response.proc_info)); + let _: rustfs_madmin::health::ProcInfo = serde::Deserialize::deserialize(&mut de).expect("ProcInfo round-trip failed"); + } + + #[tokio::test] + async fn test_get_mem_info_round_trip() { + let service = create_test_node_service(); + let response = service + .get_mem_info(Request::new(GetMemInfoRequest {})) + .await + .unwrap() + .into_inner(); + assert!(response.success); + let mut de = rmp_serde::Deserializer::new(std::io::Cursor::new(response.mem_info)); + let _: rustfs_madmin::health::MemInfo = serde::Deserialize::deserialize(&mut de).expect("MemInfo round-trip failed"); + } + + #[tokio::test] + async fn test_get_sys_errors_round_trip() { + let service = create_test_node_service(); + let response = service + .get_sys_errors(Request::new(GetSysErrorsRequest {})) + .await + .unwrap() + .into_inner(); + assert!(response.success); + let mut de = rmp_serde::Deserializer::new(std::io::Cursor::new(response.sys_errors)); + let _: rustfs_madmin::health::SysErrors = serde::Deserialize::deserialize(&mut de).expect("SysErrors round-trip failed"); + } + + #[tokio::test] + async fn test_get_sys_config_round_trip() { + let service = create_test_node_service(); + let response = service + .get_sys_config(Request::new(GetSysConfigRequest {})) + .await + .unwrap() + .into_inner(); + assert!(response.success); + let mut de = rmp_serde::Deserializer::new(std::io::Cursor::new(response.sys_config)); + let _: rustfs_madmin::health::SysConfig = serde::Deserialize::deserialize(&mut de).expect("SysConfig round-trip failed"); + } + + #[tokio::test] + async fn test_get_se_linux_info_round_trip() { + let service = create_test_node_service(); + let response = service + .get_se_linux_info(Request::new(GetSeLinuxInfoRequest {})) + .await + .unwrap() + .into_inner(); + assert!(response.success); + let mut de = rmp_serde::Deserializer::new(std::io::Cursor::new(response.sys_services)); + let _: rustfs_madmin::health::SysServices = + serde::Deserialize::deserialize(&mut de).expect("SysServices round-trip failed"); + } + + #[tokio::test] + async fn test_get_os_info_round_trip() { + let service = create_test_node_service(); + let response = service + .get_os_info(Request::new(GetOsInfoRequest {})) + .await + .unwrap() + .into_inner(); + assert!(response.success); + let mut de = rmp_serde::Deserializer::new(std::io::Cursor::new(response.os_info)); + let _: rustfs_madmin::health::OsInfo = serde::Deserialize::deserialize(&mut de).expect("OsInfo round-trip failed"); + } + + #[tokio::test] + async fn test_get_partitions_round_trip() { + let service = create_test_node_service(); + let response = service + .get_partitions(Request::new(GetPartitionsRequest {})) + .await + .unwrap() + .into_inner(); + assert!(response.success); + let mut de = rmp_serde::Deserializer::new(std::io::Cursor::new(response.partitions)); + let _: rustfs_madmin::health::Partitions = + serde::Deserialize::deserialize(&mut de).expect("Partitions round-trip failed"); + } + + #[tokio::test] + async fn test_get_net_info_round_trip() { + let service = create_test_node_service(); + let response = service + .get_net_info(Request::new(GetNetInfoRequest {})) + .await + .unwrap() + .into_inner(); + assert!(response.success); + let mut de = rmp_serde::Deserializer::new(std::io::Cursor::new(response.net_info)); + let _: rustfs_madmin::net::NetInfo = serde::Deserialize::deserialize(&mut de).expect("NetInfo round-trip failed"); + } + + #[tokio::test] + async fn test_get_cpus_round_trip() { + let service = create_test_node_service(); + let response = service.get_cpus(Request::new(GetCpusRequest {})).await.unwrap().into_inner(); + assert!(response.success); + let mut de = rmp_serde::Deserializer::new(std::io::Cursor::new(response.cpus)); + let _: rustfs_madmin::health::Cpus = serde::Deserialize::deserialize(&mut de).expect("Cpus round-trip failed"); + } + + #[tokio::test] + async fn test_server_info_round_trip() { + let service = create_test_node_service(); + let response = service + .server_info(Request::new(ServerInfoRequest { metrics: false })) + .await + .unwrap() + .into_inner(); + assert!(response.success); + let mut de = rmp_serde::Deserializer::new(std::io::Cursor::new(response.server_properties)); + let _: rustfs_madmin::ServerProperties = + serde::Deserialize::deserialize(&mut de).expect("ServerProperties round-trip failed"); + } + + #[tokio::test] + async fn test_get_metrics_round_trip() { + let service = create_test_node_service(); + let metric_type = MetricType::DISK; + let opts = CollectMetricsOpts::default(); + let metric_type_bytes = rmp_serde::to_vec(&metric_type).unwrap(); + let opts_bytes = rmp_serde::to_vec(&opts).unwrap(); + let response = service + .get_metrics(Request::new(GetMetricsRequest { + metric_type: Bytes::from(metric_type_bytes), + opts: Bytes::from(opts_bytes), + })) + .await + .unwrap() + .into_inner(); + assert!(response.success); + let mut de = rmp_serde::Deserializer::new(std::io::Cursor::new(response.realtime_metrics)); + let _: rustfs_madmin::metrics::RealtimeMetrics = + serde::Deserialize::deserialize(&mut de).expect("RealtimeMetrics round-trip failed"); + } + #[tokio::test] #[ignore = "requires isolated global object layer state"] async fn test_reload_pool_meta() {