Skip to content
This repository was archived by the owner on Dec 21, 2023. It is now read-only.

Commit dd8f896

Browse files
author
Guihao Liang
committed
wrong local buffer set size is set; corrected it
1 parent ccb9e6a commit dd8f896

2 files changed

Lines changed: 7 additions & 4 deletions

File tree

src/core/storage/sframe_data/groupby_aggregate_impl.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ group_aggregate_container::group_aggregate_container(size_t max_buffer_size,
244244
num_local_buffers =
245245
num_local_buffers == 0
246246
? 1
247-
: std::max<size_t>(num_local_buffers, std::thread::hardware_concurrency());
247+
: std::min<size_t>(num_local_buffers, thread_pool::get_instance().size());
248248

249249
// construct local buffers
250250
for (size_t ii = 0; ii < num_local_buffers; ++ii) {
@@ -401,7 +401,7 @@ void group_aggregate_container::flush_segment(size_t segmentid) {
401401
}
402402
}
403403
// ok. now we can write! lock the file
404-
size_t round_robin = ++merry_go_round_ % local_buffer_set_.size();
404+
size_t round_robin = (++merry_go_round_) % local_buffer_set_.size();
405405

406406
auto& local_buffer = local_buffer_set_[round_robin];
407407

@@ -411,6 +411,9 @@ void group_aggregate_container::flush_segment(size_t segmentid) {
411411
std::lock_guard<turi::simple_spinlock> slk(
412412
local_buffer.sa_seg_locks_[segmentid]);
413413

414+
logstream(LOG_PROGRESS) << "flush buffer from task_id: " << tss_.id_
415+
<< ", segment_id: "<< segmentid << ", on buffer: " << round_robin << std::endl;
416+
414417
auto outiter = local_buffer.sa_buffer_ptr_->get_output_iterator(segmentid);
415418
oarchive oarc;
416419
for (auto& item : local_sorted) {

test/sframe/sframe_bench_aggregate.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -273,8 +273,8 @@ int main(int argc, char** argv) {
273273
if (argc > 2) reps = std::stoi(argv[2]);
274274
if (argc > 3) nusers = std::stoi(argv[3]);
275275

276-
if (false) turi::bench_test_aggreate_count_summary(nrows, reps);
277-
turi::bench_test_aggreate_min_summary(nrows, nusers, reps, -1000, 1000);
276+
turi::bench_test_aggreate_count_summary(nrows, reps);
277+
if (false) turi::bench_test_aggreate_min_summary(nrows, nusers, reps, -1000, 1000);
278278
// if (false) turi::bench_test_aggreate_avg_summary(nrows, reps, -1000,
279279
// 1000);
280280

0 commit comments

Comments
 (0)