Skip to content

Commit 14c7338

Browse files
authored
feat: implement tokio::io::{AsyncRead, AsyncSeek} for ObjectReader (#1175)
* feat: impl tokio::io::{AsyncRead, AsyncSeek} for ObjectReader * chore: fmt code * chore: fix clippy * chore: improve the code
1 parent 23c2ff5 commit 14c7338

File tree

1 file changed

+129
-2
lines changed

1 file changed

+129
-2
lines changed

src/object/reader.rs

Lines changed: 129 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@ use std::task::Context;
1919
use std::task::Poll;
2020

2121
use bytes::Bytes;
22-
use futures::AsyncRead;
2322
use futures::AsyncSeek;
2423
use futures::Stream;
24+
use futures::{ready, AsyncRead};
2525
use parking_lot::Mutex;
2626

2727
use crate::error::Result;
@@ -99,6 +99,7 @@ use crate::OpStat;
9999
/// In this way, we can reduce the extra cost of dropping reader.
100100
pub struct ObjectReader {
101101
inner: output::Reader,
102+
seek_state: SeekState,
102103
}
103104

104105
impl ObjectReader {
@@ -148,7 +149,10 @@ impl ObjectReader {
148149
Box::new(output::into_reader::as_streamable(r, 256 * 1024))
149150
};
150151

151-
Ok(ObjectReader { inner: r })
152+
Ok(ObjectReader {
153+
inner: r,
154+
seek_state: SeekState::Init,
155+
})
152156
}
153157
}
154158

@@ -186,6 +190,62 @@ impl AsyncSeek for ObjectReader {
186190
}
187191
}
188192

193+
impl tokio::io::AsyncRead for ObjectReader {
194+
fn poll_read(
195+
mut self: Pin<&mut Self>,
196+
cx: &mut Context<'_>,
197+
buf: &mut tokio::io::ReadBuf<'_>,
198+
) -> Poll<io::Result<()>> {
199+
let b = buf.initialize_unfilled();
200+
let n = ready!(self.inner.poll_read(cx, b))?;
201+
unsafe {
202+
buf.assume_init(n);
203+
}
204+
buf.advance(n);
205+
Poll::Ready(Ok(()))
206+
}
207+
}
208+
209+
impl tokio::io::AsyncSeek for ObjectReader {
210+
fn start_seek(self: Pin<&mut Self>, pos: io::SeekFrom) -> io::Result<()> {
211+
let this = self.get_mut();
212+
if let SeekState::Start(_) = this.seek_state {
213+
return Err(io::Error::new(
214+
io::ErrorKind::Other,
215+
"another search is in progress.",
216+
));
217+
}
218+
this.seek_state = SeekState::Start(pos);
219+
Ok(())
220+
}
221+
222+
fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
223+
let state = self.seek_state;
224+
match state {
225+
SeekState::Init => {
226+
// AsyncSeek recommends calling poll_complete before start_seek.
227+
// We don't have to guarantee that the value returned by
228+
// poll_complete called without start_seek is correct,
229+
// so we'll return 0.
230+
Poll::Ready(Ok(0))
231+
}
232+
SeekState::Start(pos) => {
233+
let n = ready!(self.inner.poll_seek(cx, pos))?;
234+
Poll::Ready(Ok(n))
235+
}
236+
}
237+
}
238+
}
239+
240+
#[derive(Debug, Clone, Copy)]
241+
/// SeekState is used to track the tokio seek state of ObjectReader.
242+
enum SeekState {
243+
/// start_seek has not been called.
244+
Init,
245+
/// start_seek has been called, but poll_complete has not yet been called.
246+
Start(io::SeekFrom),
247+
}
248+
189249
impl Stream for ObjectReader {
190250
type Item = io::Result<Bytes>;
191251

@@ -209,3 +269,70 @@ async fn get_total_size(
209269
*(meta.lock()) = om;
210270
Ok(size)
211271
}
272+
273+
#[cfg(test)]
274+
mod tests {
275+
use crate::{Operator, Scheme};
276+
use rand::rngs::ThreadRng;
277+
use rand::{Rng, RngCore};
278+
use tokio::io::AsyncReadExt;
279+
use tokio::io::AsyncSeekExt;
280+
281+
fn gen_random_bytes() -> Vec<u8> {
282+
let mut rng = ThreadRng::default();
283+
// Generate size between 1B..16MB.
284+
let size = rng.gen_range(1..16 * 1024 * 1024);
285+
let mut content = vec![0; size];
286+
rng.fill_bytes(&mut content);
287+
content
288+
}
289+
290+
#[tokio::test]
291+
async fn test_reader_async_read() {
292+
let op = Operator::from_env(Scheme::Memory).unwrap();
293+
let obj = op.object("test_file");
294+
295+
let content = gen_random_bytes();
296+
obj.write(&*content)
297+
.await
298+
.expect("writ to object must succeed");
299+
300+
let mut reader = obj.reader().await.unwrap();
301+
let mut buf = Vec::new();
302+
reader
303+
.read_to_end(&mut buf)
304+
.await
305+
.expect("read to end must succeed");
306+
307+
assert_eq!(buf, content);
308+
}
309+
310+
#[tokio::test]
311+
async fn test_reader_async_seek() {
312+
let op = Operator::from_env(Scheme::Memory).unwrap();
313+
let obj = op.object("test_file");
314+
315+
let content = gen_random_bytes();
316+
obj.write(&*content)
317+
.await
318+
.expect("writ to object must succeed");
319+
320+
let mut reader = obj.reader().await.unwrap();
321+
let mut buf = Vec::new();
322+
reader
323+
.read_to_end(&mut buf)
324+
.await
325+
.expect("read to end must succeed");
326+
assert_eq!(buf, content);
327+
328+
let n = reader.seek(tokio::io::SeekFrom::Start(0)).await.unwrap();
329+
assert_eq!(n, 0, "seekp osition must be 0");
330+
331+
let mut buf = Vec::new();
332+
reader
333+
.read_to_end(&mut buf)
334+
.await
335+
.expect("read to end must succeed");
336+
assert_eq!(buf, content);
337+
}
338+
}

0 commit comments

Comments
 (0)