fix(rpc): use map-encoded msgpack for all internode RPC responses (#2771)

Co-authored-by: 安正超 <anzhengchao@gmail.com>
Co-authored-by: houseme <housemecn@gmail.com>
This commit is contained in:
Alexander Kharkevich
2026-05-04 23:42:29 -04:00
committed by GitHub
parent e6fdcd1ad6
commit 995e26f5ee
6 changed files with 330 additions and 122 deletions

View File

@@ -22,7 +22,7 @@ use crate::rpc::PeerRestClient;
use crate::{endpoints::EndpointServerPools, new_object_layer_fn};
use futures::future::join_all;
use lazy_static::lazy_static;
use rustfs_madmin::health::{Cpus, MemInfo, OsInfo, Partitions, ProcInfo, SysConfig, SysErrors, SysService};
use rustfs_madmin::health::{Cpus, MemInfo, OsInfo, Partitions, ProcInfo, SysConfig, SysErrors, SysServices};
use rustfs_madmin::metrics::RealtimeMetrics;
use rustfs_madmin::net::NetInfo;
use rustfs_madmin::{ItemState, ServerProperties};
@@ -622,14 +622,14 @@ impl NotificationSys {
join_all(futures).await
}
pub async fn get_sys_services(&self) -> Vec<SysService> {
pub async fn get_sys_services(&self) -> Vec<SysServices> {
let mut futures = Vec::with_capacity(self.peer_clients.len());
for client in self.peer_clients.iter().cloned() {
futures.push(async move {
if let Some(client) = client {
client.get_se_linux_info().await.unwrap_or_default()
} else {
SysService::default()
SysServices::default()
}
});
}

View File

@@ -23,7 +23,7 @@ use crate::{
use rmp_serde::{Deserializer, Serializer};
use rustfs_madmin::{
ServerProperties,
health::{Cpus, MemInfo, OsInfo, Partitions, ProcInfo, SysConfig, SysErrors, SysService},
health::{Cpus, MemInfo, OsInfo, Partitions, ProcInfo, SysConfig, SysErrors, SysServices},
metrics::RealtimeMetrics,
net::NetInfo,
};
@@ -361,7 +361,7 @@ impl PeerRestClient {
Ok(os_info)
}
pub async fn get_se_linux_info(&self) -> Result<SysService> {
pub async fn get_se_linux_info(&self) -> Result<SysServices> {
self.finalize_result(
async {
let mut client = self.get_client().await?;
@@ -377,7 +377,7 @@ impl PeerRestClient {
let data = response.sys_services;
let mut buf = Deserializer::new(Cursor::new(data));
let sys_services: SysService = Deserialize::deserialize(&mut buf)?;
let sys_services: SysServices = Deserialize::deserialize(&mut buf)?;
Ok(sys_services)
}

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use super::*;
use crate::storage::rpc::encode_msgpack_map;
impl NodeService {
pub(super) async fn handle_get_proc_info(
@@ -21,19 +22,18 @@ impl NodeService {
) -> Result<Response<GetProcInfoResponse>, Status> {
let addr = get_global_local_node_name().await;
let info = get_proc_info(&addr);
let mut buf = Vec::new();
if let Err(err) = info.serialize(&mut Serializer::new(&mut buf)) {
return Ok(Response::new(GetProcInfoResponse {
match encode_msgpack_map(&info) {
Ok(buf) => Ok(Response::new(GetProcInfoResponse {
success: true,
proc_info: buf.into(),
error_info: None,
})),
Err(err) => Ok(Response::new(GetProcInfoResponse {
success: false,
proc_info: Bytes::new(),
error_info: Some(err.to_string()),
}));
})),
}
Ok(Response::new(GetProcInfoResponse {
success: true,
proc_info: buf.into(),
error_info: None,
}))
}
pub(super) async fn handle_get_mem_info(
@@ -42,19 +42,18 @@ impl NodeService {
) -> Result<Response<GetMemInfoResponse>, Status> {
let addr = get_global_local_node_name().await;
let info = get_mem_info(&addr);
let mut buf = Vec::new();
if let Err(err) = info.serialize(&mut Serializer::new(&mut buf)) {
return Ok(Response::new(GetMemInfoResponse {
match encode_msgpack_map(&info) {
Ok(buf) => Ok(Response::new(GetMemInfoResponse {
success: true,
mem_info: buf.into(),
error_info: None,
})),
Err(err) => Ok(Response::new(GetMemInfoResponse {
success: false,
mem_info: Bytes::new(),
error_info: Some(err.to_string()),
}));
})),
}
Ok(Response::new(GetMemInfoResponse {
success: true,
mem_info: buf.into(),
error_info: None,
}))
}
pub(super) async fn handle_get_sys_errors(
@@ -63,19 +62,18 @@ impl NodeService {
) -> Result<Response<GetSysErrorsResponse>, Status> {
let addr = get_global_local_node_name().await;
let info = get_sys_errors(&addr);
let mut buf = Vec::new();
if let Err(err) = info.serialize(&mut Serializer::new(&mut buf)) {
return Ok(Response::new(GetSysErrorsResponse {
match encode_msgpack_map(&info) {
Ok(buf) => Ok(Response::new(GetSysErrorsResponse {
success: true,
sys_errors: buf.into(),
error_info: None,
})),
Err(err) => Ok(Response::new(GetSysErrorsResponse {
success: false,
sys_errors: Bytes::new(),
error_info: Some(err.to_string()),
}));
})),
}
Ok(Response::new(GetSysErrorsResponse {
success: true,
sys_errors: buf.into(),
error_info: None,
}))
}
pub(super) async fn handle_get_sys_config(
@@ -84,19 +82,18 @@ impl NodeService {
) -> Result<Response<GetSysConfigResponse>, Status> {
let addr = get_global_local_node_name().await;
let info = get_sys_config(&addr);
let mut buf = Vec::new();
if let Err(err) = info.serialize(&mut Serializer::new(&mut buf)) {
return Ok(Response::new(GetSysConfigResponse {
match encode_msgpack_map(&info) {
Ok(buf) => Ok(Response::new(GetSysConfigResponse {
success: true,
sys_config: buf.into(),
error_info: None,
})),
Err(err) => Ok(Response::new(GetSysConfigResponse {
success: false,
sys_config: Bytes::new(),
error_info: Some(err.to_string()),
}));
})),
}
Ok(Response::new(GetSysConfigResponse {
success: true,
sys_config: buf.into(),
error_info: None,
}))
}
pub(super) async fn handle_get_se_linux_info(
@@ -105,19 +102,18 @@ impl NodeService {
) -> Result<Response<GetSeLinuxInfoResponse>, Status> {
let addr = get_global_local_node_name().await;
let info = get_sys_services(&addr);
let mut buf = Vec::new();
if let Err(err) = info.serialize(&mut Serializer::new(&mut buf)) {
return Ok(Response::new(GetSeLinuxInfoResponse {
match encode_msgpack_map(&info) {
Ok(buf) => Ok(Response::new(GetSeLinuxInfoResponse {
success: true,
sys_services: buf.into(),
error_info: None,
})),
Err(err) => Ok(Response::new(GetSeLinuxInfoResponse {
success: false,
sys_services: Bytes::new(),
error_info: Some(err.to_string()),
}));
})),
}
Ok(Response::new(GetSeLinuxInfoResponse {
success: true,
sys_services: buf.into(),
error_info: None,
}))
}
pub(super) async fn handle_get_os_info(
@@ -125,19 +121,18 @@ impl NodeService {
_request: Request<GetOsInfoRequest>,
) -> Result<Response<GetOsInfoResponse>, Status> {
let os_info = get_os_info();
let mut buf = Vec::new();
if let Err(err) = os_info.serialize(&mut Serializer::new(&mut buf)) {
return Ok(Response::new(GetOsInfoResponse {
match encode_msgpack_map(&os_info) {
Ok(buf) => Ok(Response::new(GetOsInfoResponse {
success: true,
os_info: buf.into(),
error_info: None,
})),
Err(err) => Ok(Response::new(GetOsInfoResponse {
success: false,
os_info: Bytes::new(),
error_info: Some(err.to_string()),
}));
})),
}
Ok(Response::new(GetOsInfoResponse {
success: true,
os_info: buf.into(),
error_info: None,
}))
}
pub(super) async fn handle_get_partitions(
@@ -145,19 +140,18 @@ impl NodeService {
_request: Request<GetPartitionsRequest>,
) -> Result<Response<GetPartitionsResponse>, Status> {
let partitions = get_partitions();
let mut buf = Vec::new();
if let Err(err) = partitions.serialize(&mut Serializer::new(&mut buf)) {
return Ok(Response::new(GetPartitionsResponse {
match encode_msgpack_map(&partitions) {
Ok(buf) => Ok(Response::new(GetPartitionsResponse {
success: true,
partitions: buf.into(),
error_info: None,
})),
Err(err) => Ok(Response::new(GetPartitionsResponse {
success: false,
partitions: Bytes::new(),
error_info: Some(err.to_string()),
}));
})),
}
Ok(Response::new(GetPartitionsResponse {
success: true,
partitions: buf.into(),
error_info: None,
}))
}
pub(super) async fn handle_get_net_info(
@@ -166,36 +160,34 @@ impl NodeService {
) -> Result<Response<GetNetInfoResponse>, Status> {
let addr = get_global_local_node_name().await;
let info = get_net_info(&addr, "");
let mut buf = Vec::new();
if let Err(err) = info.serialize(&mut Serializer::new(&mut buf)) {
return Ok(Response::new(GetNetInfoResponse {
match encode_msgpack_map(&info) {
Ok(buf) => Ok(Response::new(GetNetInfoResponse {
success: true,
net_info: buf.into(),
error_info: None,
})),
Err(err) => Ok(Response::new(GetNetInfoResponse {
success: false,
net_info: Bytes::new(),
error_info: Some(err.to_string()),
}));
})),
}
Ok(Response::new(GetNetInfoResponse {
success: true,
net_info: buf.into(),
error_info: None,
}))
}
pub(super) async fn handle_get_cpus(&self, _request: Request<GetCpusRequest>) -> Result<Response<GetCpusResponse>, Status> {
let info = get_cpus();
let mut buf = Vec::new();
if let Err(err) = info.serialize(&mut Serializer::new(&mut buf)) {
return Ok(Response::new(GetCpusResponse {
match encode_msgpack_map(&info) {
Ok(buf) => Ok(Response::new(GetCpusResponse {
success: true,
cpus: buf.into(),
error_info: None,
})),
Err(err) => Ok(Response::new(GetCpusResponse {
success: false,
cpus: Bytes::new(),
error_info: Some(err.to_string()),
}));
})),
}
Ok(Response::new(GetCpusResponse {
success: true,
cpus: buf.into(),
error_info: None,
}))
}
pub(super) async fn handle_server_info(
@@ -203,29 +195,24 @@ impl NodeService {
_request: Request<ServerInfoRequest>,
) -> Result<Response<ServerInfoResponse>, Status> {
let info = get_local_server_property().await;
let mut buf = Vec::new();
// Use map encoding for forward/backward compatibility across mixed versions:
// unknown fields can be ignored by older nodes during deserialization.
if let Err(err) = info.serialize(&mut Serializer::new(&mut buf).with_struct_map()) {
return Ok(Response::new(ServerInfoResponse {
match encode_msgpack_map(&info) {
Ok(buf) => Ok(Response::new(ServerInfoResponse {
success: true,
server_properties: buf.into(),
error_info: None,
})),
Err(err) => Ok(Response::new(ServerInfoResponse {
success: false,
server_properties: Bytes::new(),
error_info: Some(err.to_string()),
}));
})),
}
Ok(Response::new(ServerInfoResponse {
success: true,
server_properties: buf.into(),
error_info: None,
}))
}
pub(super) async fn handle_local_storage_info(
&self,
_request: Request<LocalStorageInfoRequest>,
) -> Result<Response<LocalStorageInfoResponse>, Status> {
// let request = request.into_inner();
let Some(store) = new_object_layer_fn() else {
return Ok(Response::new(LocalStorageInfoResponse {
success: false,
@@ -235,19 +222,17 @@ impl NodeService {
};
let info = store.local_storage_info().await;
let mut buf = Vec::new();
if let Err(err) = info.serialize(&mut Serializer::new(&mut buf)) {
return Ok(Response::new(LocalStorageInfoResponse {
match encode_msgpack_map(&info) {
Ok(buf) => Ok(Response::new(LocalStorageInfoResponse {
success: true,
storage_info: buf.into(),
error_info: None,
})),
Err(err) => Ok(Response::new(LocalStorageInfoResponse {
success: false,
storage_info: Bytes::new(),
error_info: Some(err.to_string()),
}));
})),
}
Ok(Response::new(LocalStorageInfoResponse {
success: true,
storage_info: buf.into(),
error_info: None,
}))
}
}

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use super::*;
use crate::storage::rpc::encode_msgpack_map;
impl NodeService {
pub(super) async fn handle_get_metrics(
@@ -50,19 +51,17 @@ impl NodeService {
};
let info = collect_local_metrics(t, &opts).await;
let mut buf = Vec::new();
if let Err(err) = info.serialize(&mut Serializer::new(&mut buf)) {
return Ok(Response::new(GetMetricsResponse {
match encode_msgpack_map(&info) {
Ok(buf) => Ok(Response::new(GetMetricsResponse {
success: true,
realtime_metrics: buf.into(),
error_info: None,
})),
Err(err) => Ok(Response::new(GetMetricsResponse {
success: false,
realtime_metrics: Bytes::new(),
error_info: Some(err.to_string()),
}));
})),
}
Ok(Response::new(GetMetricsResponse {
success: true,
realtime_metrics: buf.into(),
error_info: None,
}))
}
}

View File

@@ -17,3 +17,77 @@ pub mod node_service;
pub use http_service::InternodeRpcService;
pub use node_service::{NodeService, make_server};
use rmp_serde::Serializer;
use serde::Serialize;
/// Encode a value as map-keyed msgpack for internode RPC responses.
///
/// Uses `.with_struct_map()` so structs are serialized with named fields
/// (msgpack map) instead of positional arrays. This matches what the
/// client-side `Deserializer::new()` expects.
pub(crate) fn encode_msgpack_map<T: Serialize>(value: &T) -> Result<Vec<u8>, rmp_serde::encode::Error> {
let mut buf = Vec::new();
value.serialize(&mut Serializer::new(&mut buf).with_struct_map())?;
Ok(buf)
}
#[cfg(test)]
mod tests {
use super::*;
use rmp_serde::Deserializer;
use serde::Deserialize;
use std::collections::HashMap;
use std::io::Cursor;
#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct Simple {
name: String,
count: u32,
}
#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct Nested {
label: String,
tags: HashMap<String, String>,
#[serde(skip_serializing_if = "Option::is_none")]
optional: Option<u64>,
}
#[test]
fn encode_decode_round_trip() {
let val = Simple {
name: "rustfs".into(),
count: 42,
};
let buf = encode_msgpack_map(&val).unwrap();
let decoded: Simple = Deserialize::deserialize(&mut Deserializer::new(Cursor::new(&buf))).unwrap();
assert_eq!(val, decoded);
}
#[test]
fn encode_produces_map_not_array() {
let val = Simple {
name: "test".into(),
count: 1,
};
let buf = encode_msgpack_map(&val).unwrap();
// Map marker for 2 fields: fixmap with N=2 is 0x82
assert_eq!(buf[0], 0x82, "expected msgpack fixmap marker, got array");
}
#[test]
fn nested_struct_with_optional_and_hashmap() {
let mut tags = HashMap::new();
tags.insert("env".into(), "production".into());
let val = Nested {
label: "node1".into(),
tags,
optional: None,
};
let buf = encode_msgpack_map(&val).unwrap();
let decoded: Nested = Deserialize::deserialize(&mut Deserializer::new(Cursor::new(&buf))).unwrap();
assert_eq!(val, decoded);
}
}

View File

@@ -16,7 +16,7 @@ use crate::admin::service::site_replication::reload_site_replication_runtime_sta
use bytes::Bytes;
use futures::Stream;
use futures_util::future::join_all;
use rmp_serde::{Deserializer, Serializer};
use rmp_serde::Deserializer;
use rustfs_common::{get_global_local_node_name, heal_channel::HealOpts};
use rustfs_ecstore::{
admin_server_info::get_local_server_property,
@@ -43,7 +43,7 @@ use rustfs_protos::{
models::{PingBody, PingBodyBuilder},
proto_gen::node_service::{node_service_server::NodeService as Node, *},
};
use serde::{Deserialize, Serialize};
use serde::Deserialize;
use std::{io::Cursor, pin::Pin, sync::Arc};
use tokio::spawn;
use tokio::sync::mpsc;
@@ -1982,6 +1982,156 @@ mod tests {
assert!(!proc_response.proc_info.is_empty());
}
#[tokio::test]
async fn test_get_proc_info_round_trip() {
let service = create_test_node_service();
let response = service
.get_proc_info(Request::new(GetProcInfoRequest {}))
.await
.unwrap()
.into_inner();
assert!(response.success);
let mut de = rmp_serde::Deserializer::new(std::io::Cursor::new(response.proc_info));
let _: rustfs_madmin::health::ProcInfo = serde::Deserialize::deserialize(&mut de).expect("ProcInfo round-trip failed");
}
#[tokio::test]
async fn test_get_mem_info_round_trip() {
let service = create_test_node_service();
let response = service
.get_mem_info(Request::new(GetMemInfoRequest {}))
.await
.unwrap()
.into_inner();
assert!(response.success);
let mut de = rmp_serde::Deserializer::new(std::io::Cursor::new(response.mem_info));
let _: rustfs_madmin::health::MemInfo = serde::Deserialize::deserialize(&mut de).expect("MemInfo round-trip failed");
}
#[tokio::test]
async fn test_get_sys_errors_round_trip() {
let service = create_test_node_service();
let response = service
.get_sys_errors(Request::new(GetSysErrorsRequest {}))
.await
.unwrap()
.into_inner();
assert!(response.success);
let mut de = rmp_serde::Deserializer::new(std::io::Cursor::new(response.sys_errors));
let _: rustfs_madmin::health::SysErrors = serde::Deserialize::deserialize(&mut de).expect("SysErrors round-trip failed");
}
#[tokio::test]
async fn test_get_sys_config_round_trip() {
let service = create_test_node_service();
let response = service
.get_sys_config(Request::new(GetSysConfigRequest {}))
.await
.unwrap()
.into_inner();
assert!(response.success);
let mut de = rmp_serde::Deserializer::new(std::io::Cursor::new(response.sys_config));
let _: rustfs_madmin::health::SysConfig = serde::Deserialize::deserialize(&mut de).expect("SysConfig round-trip failed");
}
#[tokio::test]
async fn test_get_se_linux_info_round_trip() {
let service = create_test_node_service();
let response = service
.get_se_linux_info(Request::new(GetSeLinuxInfoRequest {}))
.await
.unwrap()
.into_inner();
assert!(response.success);
let mut de = rmp_serde::Deserializer::new(std::io::Cursor::new(response.sys_services));
let _: rustfs_madmin::health::SysServices =
serde::Deserialize::deserialize(&mut de).expect("SysServices round-trip failed");
}
#[tokio::test]
async fn test_get_os_info_round_trip() {
let service = create_test_node_service();
let response = service
.get_os_info(Request::new(GetOsInfoRequest {}))
.await
.unwrap()
.into_inner();
assert!(response.success);
let mut de = rmp_serde::Deserializer::new(std::io::Cursor::new(response.os_info));
let _: rustfs_madmin::health::OsInfo = serde::Deserialize::deserialize(&mut de).expect("OsInfo round-trip failed");
}
#[tokio::test]
async fn test_get_partitions_round_trip() {
let service = create_test_node_service();
let response = service
.get_partitions(Request::new(GetPartitionsRequest {}))
.await
.unwrap()
.into_inner();
assert!(response.success);
let mut de = rmp_serde::Deserializer::new(std::io::Cursor::new(response.partitions));
let _: rustfs_madmin::health::Partitions =
serde::Deserialize::deserialize(&mut de).expect("Partitions round-trip failed");
}
#[tokio::test]
async fn test_get_net_info_round_trip() {
let service = create_test_node_service();
let response = service
.get_net_info(Request::new(GetNetInfoRequest {}))
.await
.unwrap()
.into_inner();
assert!(response.success);
let mut de = rmp_serde::Deserializer::new(std::io::Cursor::new(response.net_info));
let _: rustfs_madmin::net::NetInfo = serde::Deserialize::deserialize(&mut de).expect("NetInfo round-trip failed");
}
#[tokio::test]
async fn test_get_cpus_round_trip() {
let service = create_test_node_service();
let response = service.get_cpus(Request::new(GetCpusRequest {})).await.unwrap().into_inner();
assert!(response.success);
let mut de = rmp_serde::Deserializer::new(std::io::Cursor::new(response.cpus));
let _: rustfs_madmin::health::Cpus = serde::Deserialize::deserialize(&mut de).expect("Cpus round-trip failed");
}
#[tokio::test]
async fn test_server_info_round_trip() {
let service = create_test_node_service();
let response = service
.server_info(Request::new(ServerInfoRequest { metrics: false }))
.await
.unwrap()
.into_inner();
assert!(response.success);
let mut de = rmp_serde::Deserializer::new(std::io::Cursor::new(response.server_properties));
let _: rustfs_madmin::ServerProperties =
serde::Deserialize::deserialize(&mut de).expect("ServerProperties round-trip failed");
}
#[tokio::test]
async fn test_get_metrics_round_trip() {
let service = create_test_node_service();
let metric_type = MetricType::DISK;
let opts = CollectMetricsOpts::default();
let metric_type_bytes = rmp_serde::to_vec(&metric_type).unwrap();
let opts_bytes = rmp_serde::to_vec(&opts).unwrap();
let response = service
.get_metrics(Request::new(GetMetricsRequest {
metric_type: Bytes::from(metric_type_bytes),
opts: Bytes::from(opts_bytes),
}))
.await
.unwrap()
.into_inner();
assert!(response.success);
let mut de = rmp_serde::Deserializer::new(std::io::Cursor::new(response.realtime_metrics));
let _: rustfs_madmin::metrics::RealtimeMetrics =
serde::Deserialize::deserialize(&mut de).expect("RealtimeMetrics round-trip failed");
}
#[tokio::test]
#[ignore = "requires isolated global object layer state"]
async fn test_reload_pool_meta() {