Skip to content

Commit 84c4030

Browse files
authored
feat(oio): add block_write support (#3945)
1 parent 1bc80d7 commit 84c4030

2 files changed

Lines changed: 254 additions & 0 deletions

File tree

Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::pin::Pin;
19+
use std::sync::Arc;
20+
use std::task::ready;
21+
use std::task::Context;
22+
use std::task::Poll;
23+
24+
use async_trait::async_trait;
25+
use futures::Future;
26+
use futures::FutureExt;
27+
use futures::StreamExt;
28+
29+
use crate::raw::*;
30+
use crate::*;
31+
32+
/// BlockWrite is used to implement [`Write`] based on block
33+
/// uploads. By implementing BlockWrite, services don't need to
34+
/// care about the details of uploading blocks.
35+
///
36+
/// # Architecture
37+
///
38+
/// The architecture after adopting [`BlockWrite`]:
39+
///
40+
/// - Services impl `BlockWrite`
41+
/// - `BlockWriter` impl `Write`
42+
/// - Expose `BlockWriter` as `Accessor::Writer`
43+
///
44+
/// # Notes
45+
///
46+
/// `BlockWrite` has an oneshot optimization when `write` has been called only once:
47+
///
48+
/// ```no_build
49+
/// w.write(bs).await?;
50+
/// w.close().await?;
51+
/// ```
52+
///
53+
/// We will use `write_once` instead of starting a new block upload.
54+
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
55+
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
56+
pub trait BlockWrite: Send + Sync + Unpin + 'static {
57+
/// write_once is used to write the data to underlying storage at once.
58+
///
59+
/// BlockWriter will call this API when:
60+
///
61+
/// - All the data has been written to the buffer and we can perform the upload at once.
62+
async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()>;
63+
64+
/// write_block will write a block of the data and returns the result
65+
/// [`Block`].
66+
///
67+
/// BlockWriter will call this API and stores the result in
68+
/// order.
69+
///
70+
/// - block_id is the id of the block.
71+
async fn write_block(&self, size: u64, block_id: String, body: AsyncBody) -> Result<()>;
72+
73+
/// complete_block will complete the block upload to build the final
74+
/// file.
75+
async fn complete_block(&self, block_ids: Vec<String>) -> Result<()>;
76+
77+
/// abort_block will cancel the block upload and purge all data.
78+
async fn abort_block(&self, block_ids: Vec<String>) -> Result<()>;
79+
}
80+
81+
struct WriteBlockFuture(BoxedFuture<Result<()>>);
82+
83+
/// # Safety
84+
///
85+
/// wasm32 is a special target that we only have one event-loop for this WriteBlockFuture.
86+
unsafe impl Send for WriteBlockFuture {}
87+
88+
/// # Safety
89+
///
90+
/// We will only take `&mut Self` reference for WriteBlockFuture.
91+
unsafe impl Sync for WriteBlockFuture {}
92+
93+
impl Future for WriteBlockFuture {
94+
type Output = Result<()>;
95+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
96+
self.get_mut().0.poll_unpin(cx)
97+
}
98+
}
99+
100+
/// BlockWriter will implements [`Write`] based on block
101+
/// uploads.
102+
pub struct BlockWriter<W: BlockWrite> {
103+
state: State,
104+
w: Arc<W>,
105+
106+
block_ids: Vec<String>,
107+
cache: Option<oio::ChunkedBytes>,
108+
futures: ConcurrentFutures<WriteBlockFuture>,
109+
}
110+
111+
enum State {
112+
Idle,
113+
Close(BoxedFuture<Result<()>>),
114+
Abort(BoxedFuture<Result<()>>),
115+
}
116+
117+
/// # Safety
118+
///
119+
/// wasm32 is a special target that we only have one event-loop for this state.
120+
unsafe impl Send for State {}
121+
/// # Safety
122+
///
123+
/// We will only take `&mut Self` reference for State.
124+
unsafe impl Sync for State {}
125+
126+
impl<W: BlockWrite> BlockWriter<W> {
127+
/// Create a new BlockWriter.
128+
pub fn new(inner: W, concurrent: usize) -> Self {
129+
Self {
130+
state: State::Idle,
131+
132+
w: Arc::new(inner),
133+
block_ids: Vec::new(),
134+
cache: None,
135+
futures: ConcurrentFutures::new(1.max(concurrent)),
136+
}
137+
}
138+
139+
fn fill_cache(&mut self, bs: &dyn oio::WriteBuf) -> usize {
140+
let size = bs.remaining();
141+
let bs = oio::ChunkedBytes::from_vec(bs.vectored_bytes(size));
142+
assert!(self.cache.is_none());
143+
self.cache = Some(bs);
144+
size
145+
}
146+
}
147+
148+
impl<W> oio::Write for BlockWriter<W>
149+
where
150+
W: BlockWrite,
151+
{
152+
fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> {
153+
loop {
154+
match &mut self.state {
155+
State::Idle => {
156+
if self.futures.has_remaining() {
157+
let cache = self.cache.take().expect("pending write must exist");
158+
let block_id = uuid::Uuid::new_v4().to_string();
159+
self.block_ids.push(block_id.clone());
160+
let w = self.w.clone();
161+
let size = cache.len();
162+
self.futures.push(WriteBlockFuture(Box::pin(async move {
163+
w.write_block(size as u64, block_id, AsyncBody::ChunkedBytes(cache))
164+
.await
165+
})));
166+
let size = self.fill_cache(bs);
167+
return Poll::Ready(Ok(size));
168+
} else {
169+
ready!(self.futures.poll_next_unpin(cx));
170+
}
171+
}
172+
State::Close(_) => {
173+
unreachable!("BlockWriter must not go into State::Close during poll_write")
174+
}
175+
State::Abort(_) => {
176+
unreachable!("BlockWriter must not go into State::Abort during poll_write")
177+
}
178+
}
179+
}
180+
}
181+
182+
fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
183+
loop {
184+
match &mut self.state {
185+
State::Idle => {
186+
let w = self.w.clone();
187+
let block_ids = self.block_ids.clone();
188+
if self.futures.is_empty() && self.cache.is_none() {
189+
self.state =
190+
State::Close(Box::pin(
191+
async move { w.complete_block(block_ids).await },
192+
));
193+
} else {
194+
if self.futures.has_remaining() {
195+
if let Some(cache) = self.cache.take() {
196+
let block_id = uuid::Uuid::new_v4().to_string();
197+
self.block_ids.push(block_id.clone());
198+
let size = cache.len();
199+
let w = self.w.clone();
200+
self.futures.push(WriteBlockFuture(Box::pin(async move {
201+
w.write_block(
202+
size as u64,
203+
block_id,
204+
AsyncBody::ChunkedBytes(cache),
205+
)
206+
.await
207+
})));
208+
}
209+
}
210+
while ready!(self.futures.poll_next_unpin(cx)).is_some() {}
211+
}
212+
}
213+
State::Close(fut) => {
214+
let res = futures::ready!(fut.as_mut().poll(cx));
215+
self.state = State::Idle;
216+
// We should check res first before clean up cache.
217+
res?;
218+
self.cache = None;
219+
220+
return Poll::Ready(Ok(()));
221+
}
222+
State::Abort(_) => {
223+
unreachable!("BlockWriter must not go into State::Abort during poll_close")
224+
}
225+
}
226+
}
227+
}
228+
229+
fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
230+
loop {
231+
match &mut self.state {
232+
State::Idle => {
233+
let w = self.w.clone();
234+
let block_ids = self.block_ids.clone();
235+
self.futures.clear();
236+
self.state =
237+
State::Abort(Box::pin(async move { w.abort_block(block_ids).await }));
238+
}
239+
State::Abort(fut) => {
240+
let res = futures::ready!(fut.as_mut().poll(cx));
241+
self.state = State::Idle;
242+
return Poll::Ready(res);
243+
}
244+
State::Close(_) => {
245+
unreachable!("BlockWriter must not go into State::Close during poll_abort")
246+
}
247+
}
248+
}
249+
}
250+
}

core/src/raw/oio/write/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,7 @@ pub use exact_buf_write::ExactBufWriter;
4242
mod range_write;
4343
pub use range_write::RangeWrite;
4444
pub use range_write::RangeWriter;
45+
46+
mod block_write;
47+
pub use block_write::BlockWrite;
48+
pub use block_write::BlockWriter;

0 commit comments

Comments
 (0)