diff --git a/src/core/storage/query_engine/algorithm/groupby_aggregate.cpp b/src/core/storage/query_engine/algorithm/groupby_aggregate.cpp index 1bad056c81..cd64981ba4 100644 --- a/src/core/storage/query_engine/algorithm/groupby_aggregate.cpp +++ b/src/core/storage/query_engine/algorithm/groupby_aggregate.cpp @@ -206,10 +206,12 @@ std::shared_ptr planner().materialize(frame_with_relevant_cols, [&](size_t segmentid, const std::shared_ptr& rows)->bool { + container.init_tls(); if (rows == nullptr) return true; for (auto& row: *rows) { container.add(row, num_keys); } + container.flush_tls(); return false; }, thread::cpu_count()); diff --git a/src/core/storage/sframe_data/groupby_aggregate.cpp b/src/core/storage/sframe_data/groupby_aggregate.cpp index acc8689e17..0265d66d27 100644 --- a/src/core/storage/sframe_data/groupby_aggregate.cpp +++ b/src/core/storage/sframe_data/groupby_aggregate.cpp @@ -23,13 +23,18 @@ sframe groupby_aggregate(const sframe& source, // first, sanity checks // check that group keys exist if (output_column_names.size() != groups.size()) { - log_and_throw("There must be as many output columns as there are groups"); + log_and_throw("There must be as many output columns as the number groups"); } + { // check that output column names are all unique, and do not intersect with // keys. Since empty values will be automatically assigned, we will skip // those. std::set all_output_columns(keys.begin(), keys.end()); + if (all_output_columns.size() != keys.size()) { + log_and_throw("groupby keys are not unique"); + } + size_t named_column_count = 0; for (auto s: output_column_names) { if (!s.empty()) { @@ -37,6 +42,8 @@ sframe groupby_aggregate(const sframe& source, ++named_column_count; } } + + // valid if keys are unique if (all_output_columns.size() != keys.size() + named_column_count) { log_and_throw("Output columns names are not unique"); } @@ -53,7 +60,7 @@ sframe groupby_aggregate(const sframe& source, for (const auto& group: groups) { // check that the column name is valid if (group.first.size() > 0) { - for(size_t index = 0; index < group.first.size();index++) { + for (size_t index = 0; index < group.first.size(); index++) { auto& col_name = group.first[index]; if (!source.contains_column(col_name)) { log_and_throw("SFrame does not contain column " + col_name); @@ -72,17 +79,17 @@ sframe groupby_aggregate(const sframe& source, } // key should not have repeated columns + // checked at very beginning std::set key_columns; - std::set group_columns; for (const auto& key: keys) key_columns.insert(key); + DASSERT_TRUE(key_columns.size() == keys.size()); + + std::set group_columns; for (const auto& group: groups) { for(auto& col_name : group.first) { group_columns.insert(col_name); } } - if (key_columns.size() != keys.size()) { - log_and_throw("Group by key cannot have repeated column names"); - } // ok. select out just the columns I care about // begin with the key columns @@ -184,6 +191,7 @@ sframe groupby_aggregate(const sframe& source, logstream(LOG_INFO) << "Filling group container: " << std::endl; parallel_for (0, input_reader->num_segments(), [&](size_t i) { + container.init_tls(); auto iter = input_reader->begin(i); auto enditer = input_reader->end(i); while(iter != enditer) { @@ -191,6 +199,7 @@ sframe groupby_aggregate(const sframe& source, container.add(row, num_keys); ++iter; } + container.flush_tls(); }); logstream(LOG_INFO) << "Group container filled in " << ti.current_time() << std::endl; diff --git a/src/core/storage/sframe_data/groupby_aggregate_impl.cpp b/src/core/storage/sframe_data/groupby_aggregate_impl.cpp index c232f7d3a6..970d0bc072 100644 --- a/src/core/storage/sframe_data/groupby_aggregate_impl.cpp +++ b/src/core/storage/sframe_data/groupby_aggregate_impl.cpp @@ -4,11 +4,14 @@ * be found in the LICENSE.txt file or at https://opensource.org/licenses/BSD-3-Clause */ #include +#include #include +#include #include #include #include #include +#include #include namespace turi { @@ -217,128 +220,174 @@ size_t groupby_element::hash() const { /* group_aggregate_container */ /* */ /****************************************************************************/ + +/* static thread_local member initialization */ +thread_local group_aggregate_container::tls_segment_set + group_aggregate_container::tss_{}; + group_aggregate_container::group_aggregate_container(size_t max_buffer_size, - size_t num_segments): - max_buffer_size(max_buffer_size), segments(num_segments) { - intermediate_buffer.open_for_write(num_segments); - for (size_t i = 0;i < segments.size(); ++i) { - segments[i].outiter = intermediate_buffer.get_output_iterator(i); + size_t num_segments) + : max_buffer_size(max_buffer_size), + num_segments(num_segments), + task_id_(0), + gl_lock_pool_(num_segments), + gl_chunk_size_set_(num_segments) { + if (num_segments == 0) { + log_and_throw("num_segments is not allowed to be 0"); + } + + size_t num_local_buffers = + fs_util::get_file_handle_limit() / num_segments / 2; + + logstream(LOG_INFO) << "num_local_buffers: " << num_local_buffers << std::endl; + + num_local_buffers = + num_local_buffers == 0 + ? 1 + : std::min(num_local_buffers, thread_pool::get_instance().size()); + + // construct local buffers + for (size_t ii = 0; ii < num_local_buffers; ++ii) { + local_buffer_set_.emplace_back(num_segments); + local_buffer_set_.back().sa_buffer_ptr_->open_for_write(num_segments); } } -void group_aggregate_container::define_group(std::vector column_numbers, - std::shared_ptr aggregator) { +void group_aggregate_container::define_group( + std::vector column_numbers, + std::shared_ptr aggregator) { group_descriptor desc; desc.column_numbers = column_numbers; desc.aggregator = aggregator; group_descriptors.push_back(desc); } +void group_aggregate_container::init_tls() { + ASSERT_MSG(num_segments > 0, "num_segments cannot be 0"); + + ASSERT_MSG(UNLIKELY(!tss_.init_), "double init is not allowed"); + + tss_.segments_.resize(num_segments); + for (size_t ii = 0; ii < num_segments; ++ii) { + tss_.segments_[ii].segment_id_ = ii; + } + + { + std::lock_guard lock(gl_buffer_lock_); + tss_.id_ = ++task_id_; + } + + tss_.init_ = true; +} + +void group_aggregate_container::flush_tls() { + throw_if_not_initialized(); + + // MUST flush and delete raw pointer elements + for (size_t ii = 0; ii < num_segments; ++ii) flush_segment(ii); + + tss_.init_ = false; + tss_.segments_.clear(); +} + void group_aggregate_container::add(const std::vector& val, size_t num_keys) { + throw_if_not_initialized(); + size_t hash = groupby_element::hash_key(val, num_keys); - size_t target_segment = hash % segments.size(); + size_t target_segment = hash % num_segments; // acquire lock on the segment - std::unique_lock lock(segments[target_segment].in_memory_group_lock); // look for the id in the group_keys structure - auto& groupby_element_vec_ptr = segments[target_segment].elements[hash]; - if (groupby_element_vec_ptr == NULL) groupby_element_vec_ptr = new std::vector; + auto& segments = tss_.segments_; + auto& groupby_element_vec_ptr = segments[target_segment].elements_[hash]; + if (groupby_element_vec_ptr == NULL) + groupby_element_vec_ptr = new std::vector; // note. not auto&. This needs to take a real value (pointer) and not a // reference since the auto& will make it really a reference to a pointer to // a vector which will make it not robust to array resizes. auto groupby_element_vec = groupby_element_vec_ptr; - segments[target_segment].refctr.inc(); - lock.unlock(); - segments[target_segment].fine_grain_locks[hash % 128].lock(); - bool found = false; - for (size_t i = 0;i < groupby_element_vec->size(); ++i) { + for (size_t i = 0; i < groupby_element_vec->size(); ++i) { if (flexible_type_vector_equality((*groupby_element_vec)[i].key, - (*groupby_element_vec)[i].key.size(), - val, + (*groupby_element_vec)[i].key.size(), val, num_keys)) { (*groupby_element_vec)[i].add_element(val, group_descriptors); - found = true; - break; + if (segments[target_segment].elements_.size() >= max_buffer_size) { + flush_segment(target_segment); + } + return; } } - if (!found) { - groupby_element_vec->push_back(groupby_element{ - std::vector(val.begin(), val.begin() + num_keys), - group_descriptors}); - (*groupby_element_vec)[groupby_element_vec->size() - 1].add_element(val, group_descriptors); - } - segments[target_segment].fine_grain_locks[hash % 128].unlock(); - segments[target_segment].refctr.dec(); + // element not found - if (segments[target_segment].elements.size() >= max_buffer_size) { - flush_segment(target_segment); - } + groupby_element_vec->push_back(groupby_element{ + std::vector(val.begin(), val.begin() + num_keys), + group_descriptors}); + (*groupby_element_vec)[groupby_element_vec->size() - 1].add_element( + val, group_descriptors); } - +/* using thread_local for optimization */ void group_aggregate_container::add(const sframe_rows::row& val, size_t num_keys) { + throw_if_not_initialized(); + size_t hash = groupby_element::hash_key(val, num_keys); - size_t target_segment = hash % segments.size(); - // acquire lock on the segment - std::unique_lock lock(segments[target_segment].in_memory_group_lock); - // look for the id in the group_keys structure - auto& groupby_element_vec_ptr = segments[target_segment].elements[hash]; - if (groupby_element_vec_ptr == NULL) groupby_element_vec_ptr = new std::vector; + size_t target_segment = hash % num_segments; + + auto& segments = tss_.segments_; + auto& groupby_element_vec_ptr = segments[target_segment].elements_[hash]; + if (groupby_element_vec_ptr == NULL) + groupby_element_vec_ptr = new std::vector; // note. not auto&. This needs to take a real value (pointer) and not a // reference since the auto& will make it really a reference to a pointer to // a vector which will make it not robust to array resizes. auto groupby_element_vec = groupby_element_vec_ptr; - segments[target_segment].refctr.inc(); - lock.unlock(); - segments[target_segment].fine_grain_locks[hash % 128].lock(); - bool found = false; - for (size_t i = 0;i < groupby_element_vec->size(); ++i) { + + for (size_t i = 0; i < groupby_element_vec->size(); ++i) { if (flexible_type_vector_equality((*groupby_element_vec)[i].key, - (*groupby_element_vec)[i].key.size(), - val, + (*groupby_element_vec)[i].key.size(), val, num_keys)) { (*groupby_element_vec)[i].add_element(val, group_descriptors); - found = true; - break; + if (segments[target_segment].elements_.size() >= max_buffer_size) { + flush_segment(target_segment); + } + return; } } - if (!found) { - std::vector keys; keys.reserve(num_keys); - for (size_t i = 0;i < num_keys; ++i) keys.push_back(val[i]); - - groupby_element_vec->push_back(groupby_element{ - std::move(keys), - group_descriptors}); - (*groupby_element_vec)[groupby_element_vec->size() - 1].add_element(val, group_descriptors); - } - segments[target_segment].fine_grain_locks[hash % 128].unlock(); - segments[target_segment].refctr.dec(); - // element not found - if (segments[target_segment].elements.size() >= max_buffer_size) { - flush_segment(target_segment); - } + + std::vector keys; + keys.reserve(num_keys); + for (size_t i = 0; i < num_keys; ++i) keys.push_back(val[i]); + + groupby_element_vec->push_back( + groupby_element{std::move(keys), group_descriptors}); + + (*groupby_element_vec)[groupby_element_vec->size() - 1].add_element( + val, group_descriptors); } void group_aggregate_container::flush_segment(size_t segmentid) { - // unlock and swap out the segment. - std::unique_lock lock(segments[segmentid].in_memory_group_lock); - if (segments[segmentid].elements.size() == 0) return; - while(segments[segmentid].refctr.value > 0) cpu_relax(); - decltype(segments[segmentid].elements) local; - local.swap(segments[segmentid].elements); - lock.unlock(); + logstream(LOG_DEBUG) << "flush buffer from task_id: " << tss_.id_ + << ", segment_id" << segmentid << std::endl; + + auto& segments = tss_.segments_; + if (segments[segmentid].elements_.size() == 0) return; + + decltype(segments[segmentid].elements_) local; + local.swap(segments[segmentid].elements_); + if (local.size() == 0) return; // sort the buckets by hash key - std::vector*> > local_ordered_by_hash; + std::vector*> > + local_ordered_by_hash; local_ordered_by_hash.reserve(local.size()); std::move(local.begin(), local.end(), std::inserter(local_ordered_by_hash, local_ordered_by_hash.end())); std::sort(local_ordered_by_hash.begin(), local_ordered_by_hash.end()); std::vector local_sorted; - for(const auto& hash_entries: local_ordered_by_hash) { + for (const auto& hash_entries : local_ordered_by_hash) { if (hash_entries.second->size() > 1) { std::sort(hash_entries.second->begin(), hash_entries.second->end()); } @@ -347,46 +396,145 @@ void group_aggregate_container::flush_segment(size_t segmentid) { delete hash_entries.second; } - for (auto& item: local_sorted) { - for (auto& value: item.values) { + for (auto& item : local_sorted) { + for (auto& value : item.values) { value->partial_finalize(); } } // ok. now we can write! lock the file - std::unique_lock filelock(segments[segmentid].file_lock); - oarchive oarc; - for (auto& item: local_sorted) { - oarc << item; - // write into the iterator - *(segments[segmentid].outiter) = std::string(oarc.buf, oarc.off); - ++(segments[segmentid].outiter); - oarc.off = 0; + size_t round_robin = (++merry_go_round_) % local_buffer_set_.size(); + + auto& local_buffer = local_buffer_set_[round_robin]; + + local_buffer.refctr_++; + /// lock the segmeent from local_buffer before using it + { + std::lock_guard slk( + local_buffer.sa_seg_locks_[segmentid]); + + logstream(LOG_INFO) << "flush buffer from task_id: " << tss_.id_ + << ", segment_id: " << segmentid + << ", on buffer: " << round_robin << std::endl; + + auto outiter = local_buffer.sa_buffer_ptr_->get_output_iterator(segmentid); + + std::vector buffer; + for (auto& item : local_sorted) { + buffer.clear(); + oarchive out(buffer); + out << item; + *(outiter) = std::string(buffer.data(), buffer.size()); + ++(outiter); + } + + local_buffer.sa_seg_chunks_[segmentid].push_back(local_sorted.size()); + } + local_buffer.refctr_--; +} + +void group_aggregate_container::merge_local_buffer_set() { + ASSERT_MSG( + UNLIKELY(!gl_buffer_.is_opened_for_write()), + "intermediate_buffer shall be closed before stealing local_buffer."); + + turi::timer ti; + logstream(LOG_INFO) << "Merging local buffer set " << std::endl; + + // merge all other local buffers to first local buffer + std::unique_ptr> buffer_to_merge = + std::move(local_buffer_set_[0].sa_buffer_ptr_); + + auto& chunk_to_merge = local_buffer_set_[0].sa_seg_chunks_; + + for (auto& chunk : chunk_to_merge) { + chunk.reserve(chunk.size() * local_buffer_set_.size()); } - free(oarc.buf); - segments[segmentid].chunk_size.push_back(local_sorted.size()); + + // skip empty segments + std::vector non_empty_segment; + non_empty_segment.reserve(num_segments); + + for (size_t ii = 1; ii < local_buffer_set_.size(); ++ii) { + auto& local_buffer = local_buffer_set_[ii]; + while (local_buffer.refctr_ > 0) cpu_relax(); + // no writers now, close it + local_buffer.sa_buffer_ptr_->close(); + + non_empty_segment.clear(); + auto reader = local_buffer.sa_buffer_ptr_->get_reader(); + DASSERT_EQ(reader->num_segments(), num_segments); + + for (size_t ii = 0; ii < num_segments; ii++) { + if (reader->segment_length(ii)) { + non_empty_segment.push_back(ii); + } + }; + + parallel_for(0, non_empty_segment.size(), [&](size_t ii) { + size_t segment_id = non_empty_segment[ii]; + + auto reader = local_buffer.sa_buffer_ptr_->get_reader(); + DASSERT_EQ(reader->num_segments(), num_segments); + + auto outiter = buffer_to_merge->get_output_iterator(segment_id); + auto begin = reader->begin(segment_id); + auto end = reader->end(segment_id); + + logstream(LOG_DEBUG) << "segment_id: " << segment_id + << ", segment_length: " + << reader->segment_length(segment_id) + << ", buffer: " << ii << std::endl; + + // sequential write of from other sarray + while (begin != end) { + *outiter = *begin; + ++begin; + ++outiter; + } + // merge chunk size + chunk_to_merge[segment_id].insert( + std::end(chunk_to_merge[segment_id]), + local_buffer.sa_seg_chunks_[segment_id].begin(), + local_buffer.sa_seg_chunks_[segment_id].end()); + }); + + } + + // steal the first local buffer + while (local_buffer_set_.front().refctr_ > 0) cpu_relax(); + buffer_to_merge->close(); + gl_buffer_ = std::move(*buffer_to_merge); + gl_chunk_size_set_ = std::move(chunk_to_merge); + + logstream(LOG_INFO) << "Merging finished in " << ti.current_time_millis() + << " ms." << std::endl; } void group_aggregate_container::group_and_write(sframe& out) { - for (size_t i = 0 ;i < segments.size(); ++i) flush_segment(i); + ASSERT_MSG(UNLIKELY(!tss_.init_), + "call flush_tls fisrt before write out groupby result"); + + merge_local_buffer_set(); - intermediate_buffer.close(); - std::shared_ptr::reader_type> reader = intermediate_buffer.get_reader(); + if (gl_buffer_.is_opened_for_write()) gl_buffer_.close(); + + std::shared_ptr::reader_type> reader = + gl_buffer_.get_reader(); logstream(LOG_INFO) << "Groupby output segment balance: "; - for (size_t i = 0; i < reader->num_segments() ; ++i) { + for (size_t i = 0; i < reader->num_segments(); ++i) { logstream(LOG_INFO) << reader->segment_length(i) << " "; } logstream(LOG_INFO) << std::endl; - parallel_for(0, reader->num_segments(), - [&](size_t i) { - this->group_and_write_segment(out, reader, i); - }); + parallel_for(0, reader->num_segments(), [&](size_t i) { + this->group_and_write_segment(out, reader, i); + }); } -void group_aggregate_container::group_and_write_segment(sframe& out, - std::shared_ptr::reader_type> reader, - size_t segmentid) { +void group_aggregate_container::group_and_write_segment( + sframe& out, std::shared_ptr::reader_type> reader, + size_t segmentid) { // prepare the begin row and end row for each chunk. size_t segment_start = 0; @@ -399,19 +547,17 @@ void group_aggregate_container::group_and_write_segment(sframe& out, std::vector > chunks; size_t prev_row_start = segment_start; - for (size_t i = 0; i < segments[segmentid].chunk_size.size(); ++i) { + for (size_t i = 0; i < gl_chunk_size_set_[segmentid].size(); ++i) { size_t row_start = prev_row_start; - size_t row_end = row_start + segments[segmentid].chunk_size[i]; + size_t row_end = row_start + gl_chunk_size_set_[segmentid][i]; prev_row_start = row_end; - chunks.push_back(sarray_reader_buffer(reader, row_start, row_end)); + chunks.push_back( + sarray_reader_buffer(reader, row_start, row_end)); } // here is where we are going to write to auto outiter = out.get_output_iterator(segmentid); - // id of the chunks that still have elements. - std::unordered_set remaining_chunks; - // merge the chunks and write to the out iterator typedef std::pair pq_value_type; std::vector pq; @@ -466,8 +612,8 @@ void group_aggregate_container::group_and_write_segment(sframe& out, // emit emission_vector.resize(cur.key.size() + cur.values.size()); - for (size_t i = 0;i < cur.key.size(); ++i) emission_vector[i] = cur.key[i]; - for (size_t i = 0;i < cur.values.size(); ++i) { + for (size_t i = 0; i < cur.key.size(); ++i) emission_vector[i] = cur.key[i]; + for (size_t i = 0; i < cur.values.size(); ++i) { emission_vector[i + cur.key.size()] = cur.values[i]->emit(); } *outiter = emission_vector; @@ -475,5 +621,5 @@ void group_aggregate_container::group_and_write_segment(sframe& out, } } -} // namespace groupby_aggregate_impl -} // namespace turi +} // namespace groupby_aggregate_impl +} // namespace turi diff --git a/src/core/storage/sframe_data/groupby_aggregate_impl.hpp b/src/core/storage/sframe_data/groupby_aggregate_impl.hpp index ad57c32bfa..88ff55d308 100644 --- a/src/core/storage/sframe_data/groupby_aggregate_impl.hpp +++ b/src/core/storage/sframe_data/groupby_aggregate_impl.hpp @@ -226,33 +226,80 @@ class group_aggregate_container { /// Sort all elements in the container and writes to the output. void group_and_write(sframe& out); + + /// init tls data members + void init_tls(); + + void flush_tls(); + private: + /*********************** helper functions *******************/ + inline void throw_if_not_initialized() const { + if (UNLIKELY(!tss_.init_)) + log_and_throw("group_aggregate_container is not initialized"); + } + /// Writes the content into the sarray segment backend. + void flush_segment(size_t segmentid); + /// merge all local buffers into global buffer + void merge_local_buffer_set(); + + /************************ data members **********************/ /// collection of all the group operations std::vector group_descriptors; + /// constants + const size_t max_buffer_size; + const size_t num_segments; struct segment_information { - /// Locks on the elements structure - turi::simple_spinlock in_memory_group_lock; - turi::simple_spinlock fine_grain_locks[128]; - atomic refctr; - /// Intermediate group values - hopscotch_map* > elements; - - /// Locks on the below structures - turi::mutex file_lock; + hopscotch_map* > elements_; /// The temporary storage for the grouped values - sarray::iterator outiter; - /// Storing the size of each sorted chunk. - std::vector chunk_size; + size_t segment_id_; }; - /// Writes the content into the sarray segment backend. - void flush_segment(size_t segmentid); + using vec_segment_t = std::vector; + using vec_chunk_t = std::vector; + + struct tls_segment_set { + size_t id_ = 0; + bool init_{false}; + vec_segment_t segments_{}; + }; + + /// used during initialization to mark task_id (each thread should have one task_id) + unsigned task_id_; + /* life cycle is from the start of a thread to end of a thread */ + static thread_local tls_segment_set tss_; + + struct sa_buffer_t { + sa_buffer_t() = default; + + sa_buffer_t(size_t num_segments) + : sa_buffer_ptr_(new sarray()), + sa_seg_chunks_(num_segments), + sa_seg_locks_(num_segments), + refctr_(0) {}; + + // avoid inadvertent copying sarray; exclusive ownership + std::unique_ptr> sa_buffer_ptr_; + std::vector sa_seg_chunks_; + std::vector sa_seg_locks_; + turi::atomic refctr_; + }; + + std::vector local_buffer_set_; + + std::atomic merry_go_round_; - size_t max_buffer_size; - std::vector segments; - sarray intermediate_buffer; + /// gloabl buffer lock, used during the initialization + turi::simple_spinlock gl_buffer_lock_; + /// global buffer + sarray gl_buffer_; + /// lock guard for each segment in global buffer + std::vector gl_lock_pool_; + /// Storing the size of each sorted chunk for each segment + std::vector gl_chunk_size_set_; + /// global buffer reader std::unique_ptr::reader_type> reader; /// Sort all elements in the container and writes to the output. diff --git a/src/core/system/startup_teardown/startup_teardown.cpp b/src/core/system/startup_teardown/startup_teardown.cpp index 542ade8dbb..434d86c4da 100644 --- a/src/core/system/startup_teardown/startup_teardown.cpp +++ b/src/core/system/startup_teardown/startup_teardown.cpp @@ -15,6 +15,7 @@ #include #endif #include +#include #include #include #include @@ -45,37 +46,6 @@ memory_release_thread* MEMORY_RELEASE_THREAD; /* Helper functions */ /* */ /**************************************************************************/ -/** - * Attempts to increase the file handle limit. - * Returns true on success, false on failure. - */ -bool upgrade_file_handle_limit(size_t limit) { -#ifndef _WIN32 - struct rlimit rlim; - rlim.rlim_cur = limit; - rlim.rlim_max = limit; - return setrlimit(RLIMIT_NOFILE, &rlim) == 0; -#else - return true; -#endif -} - -/** - * Gets the current file handle limit. - * Returns the current file handle limit on success, - * -1 on infinity, and 0 on failure. - */ -int get_file_handle_limit() { -#ifndef _WIN32 - struct rlimit rlim; - int ret = getrlimit(RLIMIT_NOFILE, &rlim); - if (ret != 0) return 0; - return int(rlim.rlim_cur); -#else - return 4096; -#endif -} - void install_sighandlers() { #ifdef _WIN32 // Make sure dialog boxes don't come up for errors (apparently doesn't affect @@ -136,8 +106,8 @@ void configure_global_environment(std::string argv0) { // reason is that on Mac, once a file descriptor has been used (even STDOUT), // the file handle limit increase will appear to work, but will in fact fail // silently. - upgrade_file_handle_limit(4096); - int file_handle_limit = get_file_handle_limit(); + fs_util::upgrade_file_handle_limit(4096); + int file_handle_limit = fs_util::get_file_handle_limit(); if (file_handle_limit < 4096) { logstream(LOG_WARNING) << "Unable to raise the file handle limit to 4096. " diff --git a/src/core/util/fs_util.cpp b/src/core/util/fs_util.cpp index 7d45996af3..3d9e19b5f3 100644 --- a/src/core/util/fs_util.cpp +++ b/src/core/util/fs_util.cpp @@ -21,6 +21,31 @@ #include #include +#ifndef _WIN32 +#include +#endif + +bool turi::fs_util::upgrade_file_handle_limit(size_t limit) { +#ifndef _WIN32 + struct rlimit rlim; + rlim.rlim_cur = limit; + rlim.rlim_max = limit; + return setrlimit(RLIMIT_NOFILE, &rlim) == 0; +#else + return true; +#endif +} + +int turi::fs_util::get_file_handle_limit() { +#ifndef _WIN32 + struct rlimit rlim; + int ret = getrlimit(RLIMIT_NOFILE, &rlim); + if (ret != 0) return 0; + return int(rlim.rlim_cur); +#else + return 4096; +#endif +} bool is_hidden(const std::string path) { if (path.length() && path[0] == '.') { diff --git a/src/core/util/fs_util.hpp b/src/core/util/fs_util.hpp index 4851a078ed..975fbb1dbe 100644 --- a/src/core/util/fs_util.hpp +++ b/src/core/util/fs_util.hpp @@ -14,39 +14,49 @@ namespace turi { namespace fs_util { - /** - * List all the files with the given suffix at the pathname - * location - */ - void list_files_with_suffix(const std::string& pathname, - const std::string& suffix, - std::vector& files, - bool ignore_hidden=true); - - - /** - * List all the files with the given prefix at the pathname - * location - */ - void list_files_with_prefix(const std::string& pathname, - const std::string& prefix, - std::vector& files, - bool ignore_hidden=true); - - - /// \ingroup util_internal - std::string change_suffix(const std::string& fname, - const std::string& new_suffix); - - - std::string join(const std::vector& components); - - // Generate a path under the system temporary directory. - // NOTE: This function (like the underlying boost::filesystem call) does - // not guard against race conditions, and therefore should not be used in - // security-critical settings. - std::string system_temp_directory_unique_path( - const std::string& prefix, const std::string& suffix); + /** + * Attempts to increase the file handle limit. + * Returns true on success, false on failure. + */ + bool upgrade_file_handle_limit(size_t limit); + + /** + * Gets the current file handle limit. + * Returns the current file handle limit on success, + * -1 on infinity, and 0 on failure. + */ + int get_file_handle_limit(); + + /** + * List all the files with the given suffix at the pathname + * location + */ + void list_files_with_suffix(const std::string& pathname, + const std::string& suffix, + std::vector& files, + bool ignore_hidden = true); + + /** + * List all the files with the given prefix at the pathname + * location + */ + void list_files_with_prefix(const std::string& pathname, + const std::string& prefix, + std::vector& files, + bool ignore_hidden = true); + + /// \ingroup util_internal + std::string change_suffix(const std::string& fname, + const std::string& new_suffix); + + std::string join(const std::vector& components); + + // Generate a path under the system temporary directory. + // NOTE: This function (like the underlying boost::filesystem call) does + // not guard against race conditions, and therefore should not be used in + // security-critical settings. + std::string system_temp_directory_unique_path(const std::string& prefix, + const std::string& suffix); }; // end of fs_utils