mirror of
https://github.com/rustfs/rustfs.git
synced 2026-05-21 22:24:38 +08:00
Merge branch 'main' into codex/security-advisories-2026-05-02
This commit is contained in:
@@ -166,8 +166,19 @@ async fn write_data_blocks<W>(
|
||||
where
|
||||
W: tokio::io::AsyncWrite + Send + Sync + Unpin,
|
||||
{
|
||||
if get_data_block_len(en_blocks, data_blocks) < length {
|
||||
error!("write_data_blocks get_data_block_len < length");
|
||||
if en_blocks.len() < data_blocks {
|
||||
return Err(io::Error::new(ErrorKind::InvalidInput, "data block count exceeds available shards"));
|
||||
}
|
||||
|
||||
if length == 0 {
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
let Some(required_len) = offset.checked_add(length) else {
|
||||
return Err(io::Error::new(ErrorKind::InvalidInput, "offset + length overflows"));
|
||||
};
|
||||
if get_data_block_len(en_blocks, data_blocks) < required_len {
|
||||
error!("write_data_blocks not enough data after offset");
|
||||
return Err(io::Error::new(ErrorKind::UnexpectedEof, "Not enough data blocks to write"));
|
||||
}
|
||||
|
||||
@@ -188,29 +199,22 @@ where
|
||||
let block_slice = &block[offset..];
|
||||
offset = 0;
|
||||
|
||||
if write_left < block_slice.len() {
|
||||
writer.write_all(&block_slice[..write_left]).await.map_err(|e| {
|
||||
error!("write_data_blocks write_all err: {}", e);
|
||||
e
|
||||
})?;
|
||||
|
||||
total_written += write_left;
|
||||
break;
|
||||
}
|
||||
|
||||
let n = block_slice.len();
|
||||
|
||||
writer.write_all(block_slice).await.map_err(|e| {
|
||||
error!("write_data_blocks write_all2 err: {}", e);
|
||||
let write_len = write_left.min(block_slice.len());
|
||||
writer.write_all(&block_slice[..write_len]).await.map_err(|e| {
|
||||
error!("write_data_blocks write_all err: {}", e);
|
||||
e
|
||||
})?;
|
||||
|
||||
write_left -= n;
|
||||
total_written += write_len;
|
||||
write_left -= write_len;
|
||||
|
||||
total_written += n;
|
||||
if write_left == 0 {
|
||||
return Ok(total_written);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(total_written)
|
||||
error!("write_data_blocks loop exhausted with write_left>0");
|
||||
Err(io::Error::new(ErrorKind::UnexpectedEof, "Not enough data blocks to write"))
|
||||
}
|
||||
|
||||
impl Erasure {
|
||||
@@ -323,6 +327,112 @@ mod tests {
|
||||
use rustfs_utils::HashAlgorithm;
|
||||
use std::io::Cursor;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_write_data_blocks_writes_range_across_blocks() {
|
||||
let blocks = vec![Some(vec![1, 2, 3, 4]), Some(vec![5, 6, 7]), Some(vec![8, 9])];
|
||||
let mut out = Vec::new();
|
||||
|
||||
let written = write_data_blocks(&mut out, &blocks, 3, 2, 5).await.unwrap();
|
||||
|
||||
assert_eq!(written, 5);
|
||||
assert_eq!(out, vec![3, 4, 5, 6, 7]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_write_data_blocks_rejects_short_data_after_offset() {
|
||||
let blocks = vec![Some(vec![1, 2, 3, 4]), Some(vec![5, 6, 7])];
|
||||
let mut out = Vec::new();
|
||||
|
||||
let err = write_data_blocks(&mut out, &blocks, 2, 3, 5).await.unwrap_err();
|
||||
|
||||
assert_eq!(err.kind(), ErrorKind::UnexpectedEof);
|
||||
assert!(out.is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_write_data_blocks_rejects_invalid_data_block_count() {
|
||||
let blocks = vec![Some(vec![1, 2, 3, 4])];
|
||||
let mut out = Vec::new();
|
||||
|
||||
let err = write_data_blocks(&mut out, &blocks, 2, 0, 1).await.unwrap_err();
|
||||
|
||||
assert_eq!(err.kind(), ErrorKind::InvalidInput);
|
||||
assert!(out.is_empty());
|
||||
}
|
||||
|
||||
/// Regression for upstream issue #2716: ranged GETs going through
|
||||
/// `Erasure::decode` must return the requested byte range without
|
||||
/// panicking or truncating, including when the range starts at a
|
||||
/// non-zero offset and crosses EC block boundaries.
|
||||
#[tokio::test]
|
||||
async fn test_erasure_decode_ranged_read_returns_correct_bytes() {
|
||||
const DATA_SHARDS: usize = 4;
|
||||
const PARITY_SHARDS: usize = 2;
|
||||
const BLOCK_SIZE: usize = 64;
|
||||
|
||||
// 200 bytes spans 3 full blocks + 1 partial block, exercising
|
||||
// the start/middle/end branches in `Erasure::decode`.
|
||||
let total_data: Vec<u8> = (0..200u32).map(|i| i as u8).collect();
|
||||
let total_len = total_data.len();
|
||||
|
||||
let erasure = Erasure::new(DATA_SHARDS, PARITY_SHARDS, BLOCK_SIZE);
|
||||
let total_shards = DATA_SHARDS + PARITY_SHARDS;
|
||||
let shard_size = erasure.shard_size();
|
||||
let hash_algo = HashAlgorithm::HighwayHash256;
|
||||
|
||||
let mut shard_writers: Vec<BitrotWriter<Cursor<Vec<u8>>>> = (0..total_shards)
|
||||
.map(|_| BitrotWriter::new(Cursor::new(Vec::new()), shard_size, hash_algo.clone()))
|
||||
.collect();
|
||||
|
||||
let mut offset = 0;
|
||||
while offset < total_len {
|
||||
let end = (offset + BLOCK_SIZE).min(total_len);
|
||||
let shards = erasure.encode_data(&total_data[offset..end]).unwrap();
|
||||
for (i, shard) in shards.iter().enumerate() {
|
||||
shard_writers[i].write(shard).await.unwrap();
|
||||
}
|
||||
offset = end;
|
||||
}
|
||||
|
||||
let shard_bufs: Vec<Vec<u8>> = shard_writers.into_iter().map(|w| w.into_inner().into_inner()).collect();
|
||||
|
||||
// `Erasure::decode` does not seek the readers; the production caller
|
||||
// (`create_bitrot_reader`) positions each reader at the shard byte
|
||||
// offset corresponding to the request's start block. Mirror that here.
|
||||
let hash_size = hash_algo.size();
|
||||
let make_readers = |off: usize| -> Vec<Option<BitrotReader<Cursor<Vec<u8>>>>> {
|
||||
let start_block = off / BLOCK_SIZE;
|
||||
let cursor_pos = start_block * (shard_size + hash_size);
|
||||
shard_bufs
|
||||
.iter()
|
||||
.map(|buf| {
|
||||
let mut cursor = Cursor::new(buf.clone());
|
||||
cursor.set_position(cursor_pos as u64);
|
||||
Some(BitrotReader::new(cursor, shard_size, hash_algo.clone(), false))
|
||||
})
|
||||
.collect()
|
||||
};
|
||||
|
||||
// (offset, length, description)
|
||||
let cases: &[(usize, usize, &str)] = &[
|
||||
(0, total_len, "full read"),
|
||||
(0, 50, "head from start, partial block"),
|
||||
(10, 30, "small range within first block"),
|
||||
(60, 80, "range crossing two block boundaries"),
|
||||
(128, 50, "range starting at block boundary"),
|
||||
(130, 10, "small range deep in middle"),
|
||||
(192, 8, "tail covering last partial block"),
|
||||
];
|
||||
|
||||
for &(off, len, desc) in cases {
|
||||
let mut output = Vec::new();
|
||||
let (written, err) = erasure.decode(&mut output, make_readers(off), off, len, total_len).await;
|
||||
assert!(err.is_none(), "{}: unexpected error: {:?}", desc, err);
|
||||
assert_eq!(written, len, "{}: written != length", desc);
|
||||
assert_eq!(output, total_data[off..off + len], "{}: bytes mismatch", desc);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_parallel_reader_normal() {
|
||||
const BLOCK_SIZE: usize = 64;
|
||||
|
||||
Reference in New Issue
Block a user