Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
848b22e
Add HdfsConfig to implement ConfigDeserializer
shbhmrzd Dec 22, 2023
6e51ac2
Merge remote-tracking branch 'upstream/main' into add_hdfs_config
shbhmrzd Dec 22, 2023
d06bf66
fix conflicting impl debug
shbhmrzd Dec 22, 2023
50665a5
add doc for struct fields
shbhmrzd Dec 22, 2023
0fb22fd
atomic write support for hdfs
shbhmrzd Dec 31, 2023
96fba38
Merge remote-tracking branch 'upstream/main' into hdfs_atomic_write
shbhmrzd Dec 31, 2023
b5c96e3
use arc of hdrs client
shbhmrzd Jan 1, 2024
dfa0e71
fix issues with test
shbhmrzd Jan 1, 2024
8b44433
cargo fmt
shbhmrzd Jan 1, 2024
e93b17d
Merge remote-tracking branch 'upstream/main' into hdfs_atomic_write
shbhmrzd Jan 1, 2024
f3d7bf6
revert .env.example
shbhmrzd Jan 1, 2024
2179349
impl sync for HdfsWriter
shbhmrzd Jan 1, 2024
a592a0d
Merge remote-tracking branch 'upstream/main' into hdfs_atomic_write
shbhmrzd Jan 1, 2024
af8f51d
take fut as mut
shbhmrzd Jan 1, 2024
d7fd22a
add atomic write test workflow
shbhmrzd Jan 1, 2024
c0e465b
use Option<F> in HdfsWriter
shbhmrzd Jan 2, 2024
55587e3
Merge remote-tracking branch 'upstream/main' into hdfs_atomic_write
shbhmrzd Jan 2, 2024
e313c5c
revert .env change
shbhmrzd Jan 2, 2024
1895b9f
no support for atomic write if append true
shbhmrzd Jan 2, 2024
474fe57
revert
shbhmrzd Jan 2, 2024
c5dd416
testing with adding atomic write dir env param
shbhmrzd Jan 2, 2024
465f9b1
revert
shbhmrzd Jan 2, 2024
dc3f8e8
add debug statements
shbhmrzd Jan 2, 2024
dafef0c
remove debugs
shbhmrzd Jan 2, 2024
0d2a780
use f.close
shbhmrzd Jan 2, 2024
d92712d
review comments
shbhmrzd Jan 2, 2024
e229a9e
review comments
shbhmrzd Jan 3, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ OPENDAL_AZBLOB_ACCOUNT_NAME=<account_name>
OPENDAL_AZBLOB_ACCOUNT_KEY=<account_key>
# hdfs
OPENDAL_HDFS_ROOT=/path/to/dir
OPENDAL_HDFS_ATOMIC_WRITE_DIR=/path/to/tempdir
OPENDAL_HDFS_NAME_NODE=<name_node>
# gcs
OPENDAL_GCS_ROOT=/path/to/dir
Expand Down
1 change: 0 additions & 1 deletion .github/workflows/service_test_hdfs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ jobs:
HADOOP_HOME: "/home/runner/hadoop-3.3.5"
OPENDAL_TEST: hdfs
OPENDAL_HDFS_ROOT: /tmp/opendal/
OPENDAL_HDFS_ATOMIC_WRITE_DIR: /tmp/opendal/atomic/
OPENDAL_HDFS_NAME_NODE: default
OPENDAL_HDFS_ENABLE_APPEND: false

Expand Down
26 changes: 16 additions & 10 deletions core/src/services/hdfs/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
// under the License.

use futures::future::BoxFuture;
use std::future::Future;
use std::io::Write;
use std::pin::Pin;
use std::sync::Arc;
Expand All @@ -32,7 +31,7 @@ use crate::*;
pub struct HdfsWriter<F> {
target_path: String,
tmp_path: Option<String>,
f: F,
f: Option<F>,
client: Arc<hdrs::Client>,
fut: Option<BoxFuture<'static, Result<()>>>,
}
Expand All @@ -52,7 +51,7 @@ impl<F> HdfsWriter<F> {
Self {
target_path,
tmp_path,
f,
f: Some(f),
client,
fut: None,
}
Expand All @@ -62,7 +61,9 @@ impl<F> HdfsWriter<F> {
#[async_trait]
impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> {
Pin::new(&mut self.f)
let f = self.f.as_mut().expect("HdfsWriter must be initialized");

Pin::new(f)
.poll_write(cx, bs.chunk())
.map_err(new_std_io_error)
}
Expand All @@ -75,16 +76,19 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> {
return Poll::Ready(res);
}

let _ = Pin::new(&mut self.f)
.poll_close(cx)
.map_err(new_std_io_error);

let mut f = self.f.take().expect("HdfsWriter must be initialized");
// Clone client to allow move into the future.
let tmp_path = self.tmp_path.clone();
let client = self.client.clone();
let target_path = self.target_path.clone();
// Clone the necessary parts of the context
let waker = cx.waker().clone();

self.fut = Some(Box::pin(async move {
// Now use the cloned waker in the async block
let mut pinned = std::pin::pin!(f);
let _ = pinned.as_mut().poll_close(&mut Context::from_waker(&waker));

if let Some(tmp_path) = tmp_path {
client
.rename_file(&tmp_path, &target_path)
Expand All @@ -106,11 +110,13 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> {

impl oio::BlockingWrite for HdfsWriter<hdrs::File> {
fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> {
self.f.write(bs.chunk()).map_err(new_std_io_error)
let f = self.f.as_mut().expect("HdfsWriter must be initialized");
f.write(bs.chunk()).map_err(new_std_io_error)
}

fn close(&mut self) -> Result<()> {
self.f.flush().map_err(new_std_io_error)?;
let f = self.f.as_mut().expect("HdfsWriter must be initialized");
f.flush().map_err(new_std_io_error)?;

if let Some(tmp_path) = &self.tmp_path {
let client = Arc::as_ref(&self.client);
Expand Down