Skip to content

Commit 38afc9b

Browse files
authored
feat: add concurrent writer behavior tests (#3920)
1 parent 23db343 commit 38afc9b

1 file changed

Lines changed: 188 additions & 1 deletion

File tree

core/tests/behavior/async_write.rs

Lines changed: 188 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,15 @@ pub fn tests(op: &Operator, tests: &mut Vec<Trial>) {
4242
test_write_with_content_type,
4343
test_write_with_content_disposition,
4444
test_writer_write,
45+
test_writer_write_with_concurrent,
4546
test_writer_sink,
47+
test_writer_sink_with_concurrent,
4648
test_writer_copy,
49+
test_writer_copy_with_concurrent,
4750
test_writer_abort,
48-
test_writer_futures_copy
51+
test_writer_abort_with_concurrent,
52+
test_writer_futures_copy,
53+
test_writer_futures_copy_with_concurrent
4954
))
5055
}
5156

@@ -221,6 +226,33 @@ pub async fn test_writer_abort(op: Operator) -> Result<()> {
221226
Ok(())
222227
}
223228

229+
/// Delete existing file should succeed.
230+
pub async fn test_writer_abort_with_concurrent(op: Operator) -> Result<()> {
231+
let (path, content, _) = TEST_FIXTURE.new_file(op.clone());
232+
233+
let mut writer = match op.writer_with(&path).concurrent(2).await {
234+
Ok(writer) => writer,
235+
Err(e) => {
236+
assert_eq!(e.kind(), ErrorKind::Unsupported);
237+
return Ok(());
238+
}
239+
};
240+
241+
if let Err(e) = writer.write(content).await {
242+
assert_eq!(e.kind(), ErrorKind::Unsupported);
243+
return Ok(());
244+
}
245+
246+
if let Err(e) = writer.abort().await {
247+
assert_eq!(e.kind(), ErrorKind::Unsupported);
248+
return Ok(());
249+
}
250+
251+
// Aborted writer should not write actual file.
252+
assert!(!op.is_exist(&path).await?);
253+
Ok(())
254+
}
255+
224256
/// Append data into writer
225257
pub async fn test_writer_write(op: Operator) -> Result<()> {
226258
if !(op.info().full_capability().write_can_multi) {
@@ -256,6 +288,41 @@ pub async fn test_writer_write(op: Operator) -> Result<()> {
256288
Ok(())
257289
}
258290

291+
/// Append data into writer
292+
pub async fn test_writer_write_with_concurrent(op: Operator) -> Result<()> {
293+
if !(op.info().full_capability().write_can_multi) {
294+
return Ok(());
295+
}
296+
297+
let path = TEST_FIXTURE.new_file_path();
298+
let size = 5 * 1024 * 1024; // write file with 5 MiB
299+
let content_a = gen_fixed_bytes(size);
300+
let content_b = gen_fixed_bytes(size);
301+
302+
let mut w = op.writer_with(&path).concurrent(2).await?;
303+
w.write(content_a.clone()).await?;
304+
w.write(content_b.clone()).await?;
305+
w.close().await?;
306+
307+
let meta = op.stat(&path).await.expect("stat must succeed");
308+
assert_eq!(meta.content_length(), (size * 2) as u64);
309+
310+
let bs = op.read(&path).await?;
311+
assert_eq!(bs.len(), size * 2, "read size");
312+
assert_eq!(
313+
format!("{:x}", Sha256::digest(&bs[..size])),
314+
format!("{:x}", Sha256::digest(content_a)),
315+
"read content a"
316+
);
317+
assert_eq!(
318+
format!("{:x}", Sha256::digest(&bs[size..])),
319+
format!("{:x}", Sha256::digest(content_b)),
320+
"read content b"
321+
);
322+
323+
Ok(())
324+
}
325+
259326
/// Streaming data into writer
260327
pub async fn test_writer_sink(op: Operator) -> Result<()> {
261328
let cap = op.info().full_capability();
@@ -292,6 +359,46 @@ pub async fn test_writer_sink(op: Operator) -> Result<()> {
292359
Ok(())
293360
}
294361

362+
/// Streaming data into writer
363+
pub async fn test_writer_sink_with_concurrent(op: Operator) -> Result<()> {
364+
let cap = op.info().full_capability();
365+
if !(cap.write && cap.write_can_multi) {
366+
return Ok(());
367+
}
368+
369+
let path = TEST_FIXTURE.new_file_path();
370+
let size = 5 * 1024 * 1024; // write file with 5 MiB
371+
let content_a = gen_fixed_bytes(size);
372+
let content_b = gen_fixed_bytes(size);
373+
let stream = stream::iter(vec![content_a.clone(), content_b.clone()]).map(Ok);
374+
375+
let mut w = op
376+
.writer_with(&path)
377+
.buffer(5 * 1024 * 1024)
378+
.concurrent(4)
379+
.await?;
380+
w.sink(stream).await?;
381+
w.close().await?;
382+
383+
let meta = op.stat(&path).await.expect("stat must succeed");
384+
assert_eq!(meta.content_length(), (size * 2) as u64);
385+
386+
let bs = op.read(&path).await?;
387+
assert_eq!(bs.len(), size * 2, "read size");
388+
assert_eq!(
389+
format!("{:x}", Sha256::digest(&bs[..size])),
390+
format!("{:x}", Sha256::digest(content_a)),
391+
"read content a"
392+
);
393+
assert_eq!(
394+
format!("{:x}", Sha256::digest(&bs[size..])),
395+
format!("{:x}", Sha256::digest(content_b)),
396+
"read content b"
397+
);
398+
399+
Ok(())
400+
}
401+
295402
/// Reading data into writer
296403
pub async fn test_writer_copy(op: Operator) -> Result<()> {
297404
let cap = op.info().full_capability();
@@ -333,6 +440,51 @@ pub async fn test_writer_copy(op: Operator) -> Result<()> {
333440
Ok(())
334441
}
335442

443+
/// Reading data into writer
444+
pub async fn test_writer_copy_with_concurrent(op: Operator) -> Result<()> {
445+
let cap = op.info().full_capability();
446+
if !(cap.write && cap.write_can_multi) {
447+
return Ok(());
448+
}
449+
450+
let path = TEST_FIXTURE.new_file_path();
451+
let size = 5 * 1024 * 1024; // write file with 5 MiB
452+
let content_a = gen_fixed_bytes(size);
453+
let content_b = gen_fixed_bytes(size);
454+
455+
let mut w = op
456+
.writer_with(&path)
457+
.buffer(5 * 1024 * 1024)
458+
.concurrent(4)
459+
.await?;
460+
461+
let mut content = Bytes::from([content_a.clone(), content_b.clone()].concat());
462+
while !content.is_empty() {
463+
let reader = Cursor::new(content.clone());
464+
let n = w.copy(reader).await?;
465+
content.advance(n as usize);
466+
}
467+
w.close().await?;
468+
469+
let meta = op.stat(&path).await.expect("stat must succeed");
470+
assert_eq!(meta.content_length(), (size * 2) as u64);
471+
472+
let bs = op.read(&path).await?;
473+
assert_eq!(bs.len(), size * 2, "read size");
474+
assert_eq!(
475+
format!("{:x}", Sha256::digest(&bs[..size])),
476+
format!("{:x}", Sha256::digest(content_a)),
477+
"read content a"
478+
);
479+
assert_eq!(
480+
format!("{:x}", Sha256::digest(&bs[size..])),
481+
format!("{:x}", Sha256::digest(content_b)),
482+
"read content b"
483+
);
484+
485+
Ok(())
486+
}
487+
336488
/// Copy data from reader to writer
337489
pub async fn test_writer_futures_copy(op: Operator) -> Result<()> {
338490
if !(op.info().full_capability().write_can_multi) {
@@ -364,6 +516,41 @@ pub async fn test_writer_futures_copy(op: Operator) -> Result<()> {
364516
Ok(())
365517
}
366518

519+
/// Copy data from reader to writer
520+
pub async fn test_writer_futures_copy_with_concurrent(op: Operator) -> Result<()> {
521+
if !(op.info().full_capability().write_can_multi) {
522+
return Ok(());
523+
}
524+
525+
let path = TEST_FIXTURE.new_file_path();
526+
let (content, size): (Vec<u8>, usize) =
527+
gen_bytes_with_range(10 * 1024 * 1024..20 * 1024 * 1024);
528+
529+
let mut w = op
530+
.writer_with(&path)
531+
.buffer(8 * 1024 * 1024)
532+
.concurrent(4)
533+
.await?;
534+
535+
// Wrap a buf reader here to make sure content is read in 1MiB chunks.
536+
let mut cursor = BufReader::with_capacity(1024 * 1024, Cursor::new(content.clone()));
537+
futures::io::copy_buf(&mut cursor, &mut w).await?;
538+
w.close().await?;
539+
540+
let meta = op.stat(&path).await.expect("stat must succeed");
541+
assert_eq!(meta.content_length(), size as u64);
542+
543+
let bs = op.read(&path).await?;
544+
assert_eq!(bs.len(), size, "read size");
545+
assert_eq!(
546+
format!("{:x}", Sha256::digest(&bs[..size])),
547+
format!("{:x}", Sha256::digest(content)),
548+
"read content"
549+
);
550+
551+
Ok(())
552+
}
553+
367554
/// Test append to a file must success.
368555
pub async fn test_write_with_append(op: Operator) -> Result<()> {
369556
let path = TEST_FIXTURE.new_file_path();

0 commit comments

Comments
 (0)