Skip to content
This repository was archived by the owner on Dec 21, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/core/storage/query_engine/algorithm/groupby_aggregate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,10 +206,12 @@ std::shared_ptr<sframe>
planner().materialize(frame_with_relevant_cols,
[&](size_t segmentid,
const std::shared_ptr<sframe_rows>& rows)->bool {
container.init_tls();
if (rows == nullptr) return true;
for (auto& row: *rows) {
container.add(row, num_keys);
}
container.flush_tls();
return false;
},
thread::cpu_count());
Expand Down
21 changes: 15 additions & 6 deletions src/core/storage/sframe_data/groupby_aggregate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,27 @@ sframe groupby_aggregate(const sframe& source,
// first, sanity checks
// check that group keys exist
if (output_column_names.size() != groups.size()) {
log_and_throw("There must be as many output columns as there are groups");
log_and_throw("There must be as many output columns as the number groups");
}

{
// check that the output column names are all unique and do not collide with
// the keys. Empty names are skipped here because they will be assigned
// automatically.
std::set<std::string> all_output_columns(keys.begin(), keys.end());
if (all_output_columns.size() != keys.size()) {
log_and_throw("groupby keys are not unique");
}

size_t named_column_count = 0;
for (auto s: output_column_names) {
if (!s.empty()) {
all_output_columns.insert(s);
++named_column_count;
}
}

// this size comparison is only valid because the keys were verified to be unique above
if (all_output_columns.size() != keys.size() + named_column_count) {
log_and_throw("Output columns names are not unique");
}
Expand All @@ -53,7 +60,7 @@ sframe groupby_aggregate(const sframe& source,
for (const auto& group: groups) {
// check that the column name is valid
if (group.first.size() > 0) {
for(size_t index = 0; index < group.first.size();index++) {
for (size_t index = 0; index < group.first.size(); index++) {
auto& col_name = group.first[index];
if (!source.contains_column(col_name)) {
log_and_throw("SFrame does not contain column " + col_name);
Expand All @@ -72,17 +79,17 @@ sframe groupby_aggregate(const sframe& source,
}

// the keys must not contain repeated columns;
// this was already checked at the very beginning
std::set<std::string> key_columns;
std::set<std::string> group_columns;
for (const auto& key: keys) key_columns.insert(key);
DASSERT_TRUE(key_columns.size() == keys.size());

std::set<std::string> group_columns;
for (const auto& group: groups) {
for(auto& col_name : group.first) {
group_columns.insert(col_name);
}
}
if (key_columns.size() != keys.size()) {
log_and_throw("Group by key cannot have repeated column names");
}

// ok. select out just the columns I care about
// begin with the key columns
Expand Down Expand Up @@ -184,13 +191,15 @@ sframe groupby_aggregate(const sframe& source,
logstream(LOG_INFO) << "Filling group container: " << std::endl;
parallel_for (0, input_reader->num_segments(),
[&](size_t i) {
container.init_tls();
auto iter = input_reader->begin(i);
auto enditer = input_reader->end(i);
while(iter != enditer) {
auto& row = *iter;
container.add(row, num_keys);
++iter;
}
container.flush_tls();
});

logstream(LOG_INFO) << "Group container filled in " << ti.current_time() << std::endl;
Expand Down
Loading