Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 3 additions & 12 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,17 +307,7 @@ impl ObjectStore for HdfsObjectStore {
.to_object_store_err();
}

Ok(ObjectMeta {
location: location.clone(),
last_modified: DateTime::<Utc>::from_timestamp(status.modification_time as i64, 0)
.unwrap(),
size: status
.length
.try_into()
.expect("unable to convert status.length to usize"),
e_tag: None,
version: None,
})
get_object_meta(&status)
}

/// Delete the object at the specified location.
Expand Down Expand Up @@ -638,7 +628,8 @@ fn make_absolute_dir(path: &Path) -> String {
fn get_object_meta(status: &FileStatus) -> Result<ObjectMeta> {
Ok(ObjectMeta {
location: Path::parse(&status.path)?,
last_modified: DateTime::<Utc>::from_timestamp(status.modification_time as i64, 0).unwrap(),
last_modified: DateTime::<Utc>::from_timestamp_millis(status.modification_time as i64)
.unwrap(),
size: status
.length
.try_into()
Expand Down
19 changes: 15 additions & 4 deletions tests/test_object_store.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#[cfg(feature = "integration-test")]
mod test {
use bytes::{Buf, BufMut, BytesMut};
use chrono::{Datelike, Utc};
use hdfs_native::{
minidfs::{DfsFeatures, MiniDfs},
Client, WriteOptions,
Expand All @@ -16,7 +17,7 @@ mod test {
#[serial]
async fn test_object_store() -> object_store::Result<()> {
let dfs = MiniDfs::with_features(&HashSet::from([DfsFeatures::HA]));
let client = Client::new(&dfs.url).to_object_store_err()?;
let client = Arc::new(Client::new(&dfs.url).to_object_store_err()?);

// Create a test file with the client directly to sanity check reads and lists
let mut file = client
Expand All @@ -32,9 +33,9 @@ mod test {

client.mkdirs("/testdir", 0o755, true).await.unwrap();

let store = HdfsObjectStore::new(Arc::new(client));
let store = HdfsObjectStore::new(Arc::clone(&client));

test_object_store_head(&store).await?;
test_object_store_head(&store, &client).await?;
test_object_store_list(&store).await?;
test_object_store_rename(&store).await?;
test_object_store_read(&store).await?;
Expand All @@ -45,12 +46,22 @@ mod test {
Ok(())
}

async fn test_object_store_head(store: &HdfsObjectStore) -> object_store::Result<()> {
async fn test_object_store_head(
store: &HdfsObjectStore,
client: &Arc<Client>,
) -> object_store::Result<()> {
use object_store::{path::Path, ObjectStore};

let status = client.get_file_info("/testfile").await.unwrap();

let head = store.head(&Path::from("/testfile")).await?;
assert_eq!(head.location, Path::from("/testfile"));
assert_eq!(head.size, TEST_FILE_INTS * 4);
assert_eq!(
head.last_modified.timestamp_millis(),
status.modification_time as i64
);
assert_eq!(head.last_modified.year(), Utc::now().year());

assert!(store.head(&Path::from("/testfile2")).await.is_err());
assert!(store.head(&Path::from("/testdir")).await.is_err());
Expand Down