Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 57 additions & 7 deletions src/paimon/common/fs/file_system_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -979,16 +979,21 @@ TEST_P(FileSystemTest, TestMkdirsFailsWithExistingParentFile) {
}

TEST_P(FileSystemTest, TestMkdir) {
std::string path = PathUtil::JoinPath(test_root_, "/tmp.txt/tmpB");
ASSERT_OK(fs_->Mkdirs(path));
{
std::string path = PathUtil::JoinPath(test_root_, "/tmp.txt/tmpB");
ASSERT_OK(fs_->Mkdirs(path));
}
{
std::string path = PathUtil::JoinPath(test_root_, "/tmpA/tmpB/");
ASSERT_OK(fs_->Mkdirs(path));
}
{
std::string path = "/";
ASSERT_OK(fs_->Mkdirs(path));
}
}

TEST_P(FileSystemTest, TestMkdir2) {
std::string path = PathUtil::JoinPath(test_root_, "/tmpA/tmpB/");
ASSERT_OK(fs_->Mkdirs(path));
}

TEST_P(FileSystemTest, TestMkdir3) {
{
std::string dir_path = test_root_ + "/file_dir/";
ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path));
Expand All @@ -1012,6 +1017,7 @@ TEST_P(FileSystemTest, TestMkdir3) {
}
}

// test for create multi dir such as "/table/partition1/bucket1" and "/table/partition1/bucket2"
TEST_P(FileSystemTest, TestMkdirMultiThread) {
uint32_t runs_count = 10;
uint32_t thread_count = 10;
Expand All @@ -1036,6 +1042,50 @@ TEST_P(FileSystemTest, TestMkdirMultiThread) {
}
}

// test for create multi dir such as "/table/partition1" and "/table/partition1"
TEST_P(FileSystemTest, TestMkdirMultiThread2) {
uint32_t runs_count = 10;
uint32_t thread_count = 10;
auto executor = CreateDefaultExecutor(thread_count);

for (uint32_t i = 0; i < runs_count; i++) {
std::string uuid;
ASSERT_TRUE(UUID::Generate(&uuid));
std::vector<std::future<void>> futures;
for (uint32_t thread_idx = 0; thread_idx < thread_count; thread_idx++) {
futures.push_back(Via(executor.get(), [this, &uuid]() -> void {
std::string dir_path = PathUtil::JoinPath(test_root_, uuid);
ASSERT_OK(fs_->Mkdirs(dir_path));
ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path));
ASSERT_TRUE(is_exist);
}));
}
Wait(futures);
}
}

// test for create multi dir such as "partition1" and "partition1" (relative path)
TEST_P(FileSystemTest, TestMkdirMultiThread3) {
uint32_t runs_count = 10;
uint32_t thread_count = 10;
auto executor = CreateDefaultExecutor(thread_count);

for (uint32_t i = 0; i < runs_count; i++) {
std::string uuid;
ASSERT_TRUE(UUID::Generate(&uuid));
std::vector<std::future<void>> futures;
for (uint32_t thread_idx = 0; thread_idx < thread_count; thread_idx++) {
futures.push_back(Via(executor.get(), [this, &uuid]() -> void {
std::string dir_path = uuid;
ASSERT_OK(fs_->Mkdirs(dir_path));
ASSERT_OK_AND_ASSIGN(bool is_exist, fs_->Exists(dir_path));
ASSERT_TRUE(is_exist);
}));
}
Wait(futures);
}
}

TEST_P(FileSystemTest, TestInvalidMkdir) {
{
// test mkdir with one exist dir
Expand Down
2 changes: 2 additions & 0 deletions src/paimon/common/utils/path_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class PAIMON_EXPORT PathUtil {
~PathUtil() = delete;

static std::string JoinPath(const std::string& path, const std::string& name) noexcept;
// TODO(jinli.zjw): should pass `Path.path` and normalize; otherwise if path is
// "oss://bucket1/", GetParentDirPath will return "oss:"
static std::string GetParentDirPath(const std::string& path) noexcept;
static std::string GetName(const std::string& path) noexcept;
static void TrimLastDelim(std::string* dir_path) noexcept;
Expand Down
59 changes: 10 additions & 49 deletions src/paimon/fs/local/local_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,68 +123,36 @@ Status LocalFile::Delete() const {
return Status::OK();
}

Status LocalFile::MkNestDir(const std::string& dir_name) const {
Result<bool> LocalFile::MkNestDir(const std::string& dir_name) const {
CHECK_HOOK();
size_t pos = dir_name.rfind('/');
if (pos == std::string::npos) {
if (mkdir(dir_name.c_str(), 0755) < 0) {
if (errno != EEXIST) {
int32_t cur_errno = errno;
return Status::IOError(fmt::format("MkNestDir path '{}' fail, ec: {}", path_,
std::strerror(cur_errno)));
}
}
return Status::OK();
return mkdir(dir_name.c_str(), 0755) == 0;
}

std::string parent_dir = dir_name.substr(0, pos);
if (!parent_dir.empty() && access(parent_dir.c_str(), F_OK) != 0) {
PAIMON_RETURN_NOT_OK(MkNestDir(parent_dir));
}

if (mkdir(dir_name.c_str(), 0755) < 0) {
if (errno != EEXIST) {
int32_t cur_errno = errno;
return Status::IOError(
fmt::format("MkNestDir path '{}' fail, ec: {}", path_, std::strerror(cur_errno)));
PAIMON_ASSIGN_OR_RAISE(bool success, MkNestDir(parent_dir));
if (!success) {
return false;
}
}
return Status::OK();
return mkdir(dir_name.c_str(), 0755) == 0;
}

Status LocalFile::Mkdir() const {
Result<bool> LocalFile::Mkdir() const {
CHECK_HOOK();
std::string dir = path_;
size_t len = dir.size();
if (dir[len - 1] == '/') {
if (len == 1) {
return Status::Exist(fmt::format("directory '{}' already exist", dir));
return false;
} else {
dir.resize(len - 1);
}
}
if (access(dir.c_str(), F_OK) == 0) {
return Status::Exist(fmt::format("directory '{}' already exist", dir));
}
size_t pos = dir.rfind('/');
if (pos == std::string::npos) {
if (mkdir(dir.c_str(), 0755) < 0) {
int32_t cur_errno = errno;
return Status::IOError(
fmt::format("Mkdir path '{}' fail, ec: {}", dir, std::strerror(cur_errno)));
}
return Status::OK();
}
std::string parent_dir = dir.substr(0, pos);
if (!parent_dir.empty() && access(parent_dir.c_str(), F_OK) != 0) {
PAIMON_RETURN_NOT_OK(MkNestDir(parent_dir));
}
if (mkdir(dir.c_str(), 0755) < 0) {
int32_t cur_errno = errno;
return Status::IOError(
fmt::format("create directory '{}' failed, ec: {}", dir, std::strerror(cur_errno)));
}
return Status::OK();
PAIMON_ASSIGN_OR_RAISE(bool success, MkNestDir(dir));
return success;
}

Result<std::unique_ptr<LocalFileStatus>> LocalFile::GetFileStatus() const {
Expand Down Expand Up @@ -349,13 +317,6 @@ Status LocalFile::OpenFile(bool is_read_file) {
CHECK_HOOK();
file_ = fopen(path_.c_str(), "r");
} else {
LocalFile parent_dir = GetParentFile();
if (!parent_dir.GetAbsolutePath().empty()) {
PAIMON_ASSIGN_OR_RAISE(bool is_exist, parent_dir.Exists());
if (!is_exist) {
PAIMON_RETURN_NOT_OK(parent_dir.Mkdir());
}
}
CHECK_HOOK();
file_ = fopen(path_.c_str(), "w");
}
Expand Down
4 changes: 2 additions & 2 deletions src/paimon/fs/local/local_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class LocalFile {
Status Delete() const;
const std::string& GetAbsolutePath() const;
LocalFile GetParentFile() const;
Status Mkdir() const;
Result<bool> Mkdir() const;
Result<std::unique_ptr<LocalFileStatus>> GetFileStatus() const;
Result<uint64_t> Length() const;
Result<int64_t> LastModifiedTimeMs() const;
Expand All @@ -68,7 +68,7 @@ class LocalFile {
}

private:
Status MkNestDir(const std::string& dir_name) const;
Result<bool> MkNestDir(const std::string& dir_name) const;

const std::string path_;
FILE* file_ = nullptr;
Expand Down
29 changes: 16 additions & 13 deletions src/paimon/fs/local/local_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ Status LocalFileSystem::Mkdirs(const std::string& path) const {
}

Status LocalFileSystem::MkdirsInternal(const LocalFile& file) const {
// Important: The 'Exists()' check above must come before the 'IsDirectory()'
// Important: The 'Exists()' check above must come before the 'IsDir()'
// check to be safe when multiple parallel instances try to create the directory
PAIMON_ASSIGN_OR_RAISE(bool is_exist, file.Exists());
if (is_exist) {
Expand All @@ -90,7 +90,20 @@ Status LocalFileSystem::MkdirsInternal(const LocalFile& file) const {
}
}

PAIMON_RETURN_NOT_OK(file.Mkdir());
auto parent = file.GetParentFile();
if (!parent.IsEmpty()) {
PAIMON_RETURN_NOT_OK(MkdirsInternal(parent));
}
PAIMON_ASSIGN_OR_RAISE(bool success, file.Mkdir());
if (!success) {
PAIMON_ASSIGN_OR_RAISE(bool is_dir, file.IsDir());
if (is_dir) {
return Status::OK();
} else {
return Status::IOError(
fmt::format("create directory '{}' failed", file.GetAbsolutePath()));
}
}
return Status::OK();
}

Expand Down Expand Up @@ -210,17 +223,7 @@ Status LocalFileSystem::Rename(const std::string& src, const std::string& dst) c
}
PAIMON_ASSIGN_OR_RAISE(LocalFile dst_file, ToFile(dst));
auto parent = dst_file.GetParentFile();
if (!parent.GetAbsolutePath().empty()) {
PAIMON_ASSIGN_OR_RAISE(bool is_exist, parent.Exists());
if (is_exist) {
// pass
} else {
Status status = parent.Mkdir();
if (!status.ok() && !status.IsExist()) {
return status;
}
}
}
PAIMON_RETURN_NOT_OK(Mkdirs(parent.GetAbsolutePath()));
if (::rename(src.c_str(), dst.c_str()) != 0) {
int32_t cur_errno = errno;
return Status::IOError(err_msg, std::strerror(cur_errno));
Expand Down
28 changes: 17 additions & 11 deletions src/paimon/fs/local/local_file_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ TEST(LocalFileTest, TestReadWriteEmptyContent) {
if (dir.Exists().ok()) {
ASSERT_TRUE(dir.Delete().ok());
}
ASSERT_TRUE(dir.Mkdir().ok());
ASSERT_OK_AND_ASSIGN(bool success, dir.Mkdir());
ASSERT_TRUE(success);
std::string path = test_root + "/test.txt";
LocalFile file = LocalFile(path);
if (file.Exists().ok()) {
Expand Down Expand Up @@ -69,7 +70,8 @@ TEST(LocalFileTest, TestSimple) {
if (dir.Exists().ok()) {
ASSERT_OK(dir.Delete());
}
ASSERT_OK(dir.Mkdir());
ASSERT_OK_AND_ASSIGN(bool success, dir.Mkdir());
ASSERT_TRUE(success);
std::string path = test_root + "/test.txt";
LocalFile file = LocalFile(path);
if (file.Exists().ok()) {
Expand Down Expand Up @@ -127,19 +129,24 @@ TEST(LocalFileTest, TestSimple) {
ASSERT_EQ(strcmp(str_read, "test_data"), 0);
}

ASSERT_OK_AND_ASSIGN(success, dir.Mkdir());
ASSERT_FALSE(success);

ASSERT_OK(file2.Delete());
ASSERT_FALSE(file2.Exists().value());
}

TEST(LocalFileTest, TestUsage) {
std::string test_root = "tmp/local_file_test_usage";
LocalFile dir = LocalFile(test_root);
ASSERT_OK(dir.Mkdir());
ASSERT_OK_AND_ASSIGN(bool success, dir.Mkdir());
ASSERT_TRUE(success);
std::vector<std::string> file_list;
ASSERT_OK(dir.List(&file_list));
std::string path_deep_dir = test_root + "/tmp2/tmp3";
LocalFile deep_dir = LocalFile(path_deep_dir);
ASSERT_OK(deep_dir.Mkdir());
ASSERT_OK_AND_ASSIGN(success, deep_dir.Mkdir());
ASSERT_TRUE(success);
LocalFile parent_deep_dir = deep_dir.GetParentFile();
ASSERT_EQ(parent_deep_dir.GetAbsolutePath(), test_root + "/tmp2");
ASSERT_OK(deep_dir.Delete());
Expand All @@ -155,7 +162,8 @@ TEST(LocalFileTest, TestOpenFile) {
if (dir.Exists().ok()) {
ASSERT_OK(dir.Delete());
}
ASSERT_OK(dir.Mkdir());
ASSERT_OK_AND_ASSIGN(bool success, dir.Mkdir());
ASSERT_TRUE(success);
std::string path = test_root + "/test.txt";
LocalFile file = LocalFile(path);
if (file.Exists().ok()) {
Expand All @@ -167,20 +175,18 @@ TEST(LocalFileTest, TestOpenFile) {
ASSERT_NOK_WITH_MSG(file.OpenFile(/*is_read_file=*/true), "file not exist");
ASSERT_NOK_WITH_MSG(dir.OpenFile(/*is_read_file=*/true), "cannot open a directory");

std::string path2 = test_root + "/foo/test.txt";
LocalFile file2 = LocalFile(path2);
ASSERT_OK(file2.OpenFile(/*is_read_file=*/false));

std::string path3 = "test.txt";
LocalFile file3 = LocalFile(path3);
ASSERT_OK(file3.OpenFile(/*is_read_file=*/false));
ASSERT_OK_AND_ASSIGN(int64_t modify_time, file3.LastModifiedTimeMs());
ASSERT_GE(modify_time, -1);

LocalFile dir2 = LocalFile("/");
ASSERT_NOK_WITH_MSG(dir2.Mkdir(), "directory '/' already exist");
ASSERT_OK_AND_ASSIGN(success, dir2.Mkdir());
ASSERT_FALSE(success);
LocalFile dir3 = LocalFile(test_root + "/");
ASSERT_NOK_WITH_MSG(dir3.Mkdir(), "already exist");
ASSERT_OK_AND_ASSIGN(success, dir3.Mkdir());
ASSERT_FALSE(success);
}

} // namespace paimon::test
Loading