Skip to content
Merged
Show file tree
Hide file tree
Changes from 81 commits
Commits
Show all changes
87 commits
Select commit Hold shift + click to select a range
60e554b
Initial implementation
IsaacWarren Feb 9, 2026
27b009c
Fix hang
IsaacWarren Feb 10, 2026
86da5fa
Fix streams
IsaacWarren Feb 10, 2026
d4f9507
Change device assignment location
IsaacWarren Feb 10, 2026
a0e639d
Fix raii variable lifetime
IsaacWarren Feb 10, 2026
3f33757
Add print
IsaacWarren Feb 10, 2026
e619601
Move device assignment to executor
IsaacWarren Feb 10, 2026
47e24ed
Move device assignment to plan construction
IsaacWarren Feb 10, 2026
31c630d
Cleanup dbg msg
IsaacWarren Feb 10, 2026
7d26605
Reset read parquet
IsaacWarren Feb 10, 2026
325102e
Shuffle empty chunks
IsaacWarren Feb 10, 2026
c2a7d30
Debug pipeline
IsaacWarren Feb 10, 2026
f6f8c5c
sync is_last
IsaacWarren Feb 10, 2026
7ec2547
dbg msg
IsaacWarren Feb 10, 2026
4ca1230
dbg msg
IsaacWarren Feb 10, 2026
f4527d0
dbg msg
IsaacWarren Feb 10, 2026
b4cf5f7
dbg msg
IsaacWarren Feb 10, 2026
fe1f17a
Don't unpack empty buffers
IsaacWarren Feb 10, 2026
6fa881e
Shuffle only when a rank has data
IsaacWarren Feb 11, 2026
7e15f63
Set device in tests
IsaacWarren Feb 11, 2026
2be2ca1
Only set device if have one
IsaacWarren Feb 11, 2026
71bacc2
Only assign to device on gpu assigned ranks
IsaacWarren Feb 11, 2026
9584967
Cleanup dbg msg
IsaacWarren Feb 11, 2026
6706b3e
Cleanup dbg msg
IsaacWarren Feb 11, 2026
8e16177
Cleanup prints
IsaacWarren Feb 11, 2026
16ecbfa
Add new dbg msg
IsaacWarren Feb 11, 2026
54c66ef
Trigger barrier only if no inflight/pending shuffles
IsaacWarren Feb 11, 2026
b90859f
Dbg shuffle coordination
IsaacWarren Feb 11, 2026
64b876a
If shuffling and empty table, send empty table
IsaacWarren Feb 11, 2026
6eb8920
Fix completion in progress
IsaacWarren Feb 11, 2026
f6a21c1
Don't redeclare packed_tables
IsaacWarren Feb 11, 2026
4581899
dbg msg
IsaacWarren Feb 11, 2026
0753ab5
Wait for shuffle stream if finished
IsaacWarren Feb 11, 2026
f167865
Sync before probe
IsaacWarren Feb 11, 2026
97c56e0
Generate parquet filenames based on rank
IsaacWarren Feb 11, 2026
326a6ce
dbg msg
IsaacWarren Feb 12, 2026
f795a57
dbg msg
IsaacWarren Feb 12, 2026
b5cac34
Dbg pipeline
IsaacWarren Feb 12, 2026
03d5eda
Check probe output
IsaacWarren Feb 12, 2026
c9da5ae
Shuffle build
IsaacWarren Feb 12, 2026
35cbecd
If global completion, exit early
IsaacWarren Feb 12, 2026
182fc15
Revert "If global completion, exit early"
IsaacWarren Feb 12, 2026
0142e55
Return early if all_complete
IsaacWarren Feb 12, 2026
6dc95b6
Make sure there's not a shuffle coordination inflight before exiting
IsaacWarren Feb 12, 2026
fde278e
Reset message even on empty shuffle coordination
IsaacWarren Feb 12, 2026
4122a29
sync build stream in build
IsaacWarren Feb 12, 2026
fc930e1
dbg pipeline
IsaacWarren Feb 12, 2026
1cab505
Cancel inflight shuffle coordination when all complete
IsaacWarren Feb 12, 2026
d3c93c2
Add all_complete print
IsaacWarren Feb 12, 2026
9c2045f
Don't wait for completion request
IsaacWarren Feb 12, 2026
3dd3e42
Debug barrier
IsaacWarren Feb 12, 2026
44c3b98
Modify end signaling condition
IsaacWarren Feb 12, 2026
6655d39
Debug from pipeline
IsaacWarren Feb 12, 2026
d1ca8d2
Check why barrier isn't completing
IsaacWarren Feb 12, 2026
dbfc3eb
Print global_completion
IsaacWarren Feb 12, 2026
699d390
Remove cancel
IsaacWarren Feb 12, 2026
f62a47c
Cleanup dbg msg
IsaacWarren Feb 12, 2026
0bcc4ae
Print mpi rank
IsaacWarren Feb 12, 2026
de8f93a
Print in build batch
IsaacWarren Feb 12, 2026
0b9a402
Debug pipeline
IsaacWarren Feb 12, 2026
8369877
Print in finalize
IsaacWarren Feb 12, 2026
c8b52fa
Disable pipeline dbg
IsaacWarren Feb 12, 2026
bb36555
More dbg
IsaacWarren Feb 12, 2026
2f4ac4f
dbg msg
IsaacWarren Feb 12, 2026
080b057
More print
IsaacWarren Feb 12, 2026
c3dd975
Only build hash table on gpu ranks
IsaacWarren Feb 12, 2026
f7c7124
Build hash table if gpu assigned
IsaacWarren Feb 12, 2026
7d57a9d
More dbg msg
IsaacWarren Feb 12, 2026
20a0f3b
More dbg msg
IsaacWarren Feb 12, 2026
e068e05
Move arrow schema creation
IsaacWarren Feb 12, 2026
7709c8b
dbg msg
IsaacWarren Feb 12, 2026
bfea87d
Cleanup prints
IsaacWarren Feb 13, 2026
a08a9f9
Try inlining variable
IsaacWarren Feb 13, 2026
54bdd38
Merge remote-tracking branch 'origin/main' into isaac/gpu_shuffle_join
IsaacWarren Feb 16, 2026
8dbeabc
Fix GPU parquet reader join filter initialization
IsaacWarren Feb 16, 2026
adcf98f
Fix ranks > gpu
IsaacWarren Feb 16, 2026
d82f6ee
Add cudf-debug pixi task
IsaacWarren Feb 16, 2026
0afcdc7
Remove prints
IsaacWarren Feb 16, 2026
aea318e
[Run CI]
IsaacWarren Feb 16, 2026
adb1ea1
Cleanup and fix compiling without cudf [run ci]
IsaacWarren Feb 16, 2026
ddc0984
Move cuda import into use_cudf [run ci]
IsaacWarren Feb 16, 2026
0022895
Review comments
IsaacWarren Feb 16, 2026
68cea41
Review comments [run ci]
IsaacWarren Feb 16, 2026
c04742a
Get back, not src_rank [run ci]
IsaacWarren Feb 16, 2026
905db6e
Review cleanup
IsaacWarren Feb 17, 2026
1d27204
[Run CI]
IsaacWarren Feb 17, 2026
bf69bc0
Fix header mismatch [run ci]
IsaacWarren Feb 17, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 114 additions & 19 deletions bodo/libs/gpu_utils.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#include "gpu_utils.h"
#include <mpi_proto.h>
#include "vendored/simd-block-fixed-fpp.h"

#ifdef USE_CUDF
#include <thrust/execution_policy.h>
Expand Down Expand Up @@ -27,9 +29,6 @@ GpuShuffleManager::GpuShuffleManager()
return;
}

// There's probably a more robust way to handle this
CHECK_CUDA(cudaSetDevice(this->gpu_id.value()));

// Get rank and size
MPI_Comm_rank(mpi_comm, &this->rank);
MPI_Comm_size(mpi_comm, &this->n_ranks);
Expand Down Expand Up @@ -80,18 +79,46 @@ void GpuShuffleManager::shuffle_table(
if (mpi_comm == MPI_COMM_NULL) {
return;
}
// Hash partition the table
auto [partitioned_table, partition_start_rows] =
hash_partition_table(table, partition_indices, n_ranks);
if (table->num_rows() == 0) {
return;
}
this->tables_to_shuffle.emplace_back(std::move(table), partition_indices);
}

assert(partition_start_rows.size() == static_cast<size_t>(n_ranks));
// Contiguous splits requires the split indices excluding the first 0
// So we create a new vector from partition_start_rows[1..end]
std::vector<cudf::size_type> splits = std::vector<cudf::size_type>(
partition_start_rows.begin() + 1, partition_start_rows.end());
// Pack the tables for sending
std::vector<cudf::packed_table> packed_tables =
cudf::contiguous_split(partitioned_table->view(), splits, stream);
void GpuShuffleManager::do_shuffle() {
std::vector<cudf::packed_table> packed_tables;
if (!this->tables_to_shuffle.empty()) {
auto [table, partition_indices] =
std::move(this->tables_to_shuffle.back());
this->tables_to_shuffle.pop_back();

// Hash partition the table
auto [partitioned_table, partition_start_rows] =
hash_partition_table(table, partition_indices, n_ranks);

assert(partition_start_rows.size() == static_cast<size_t>(n_ranks));
// Contiguous splits requires the split indices excluding the first 0
// So we create a new vector from partition_start_rows[1..end]
std::vector<cudf::size_type> splits = std::vector<cudf::size_type>(
partition_start_rows.begin() + 1, partition_start_rows.end());
// Pack the tables for sending
packed_tables =
cudf::contiguous_split(partitioned_table->view(), splits, stream);
} else {
// If we have no data to shuffle, we still need to create empty packed
// tables for each rank so that the shuffle can proceed without special
// casing empty sends/receives
cudf::table empty_table(
cudf::table_view(std::vector<cudf::column_view>{}));

for (int i = 0; i < n_ranks; i++) {
cudf::packed_columns empty_packed_columns =
cudf::pack(empty_table.view(), stream);
cudf::packed_table empty_packed_table(
empty_table.view(), std::move(empty_packed_columns));
packed_tables.push_back(std::move(empty_packed_table));
}
}

assert(packed_tables.size() == static_cast<size_t>(n_ranks));

Expand All @@ -101,14 +128,57 @@ void GpuShuffleManager::shuffle_table(

// Each shuffle will use 3 tags for shuffling metadata/gpu data
// sizes and metadata buffers
if (inflight_shuffles.size() * 3 < static_cast<size_t>(MAX_TAG_VAL)) {
if (inflight_shuffles.size() * 3 > static_cast<size_t>(MAX_TAG_VAL)) {
throw std::runtime_error(
"Exceeded maxiumum number of inflight shuffles");
}
this->curr_tag = (this->curr_tag + 3) % MAX_TAG_VAL;
}

std::vector<std::unique_ptr<cudf::table>> GpuShuffleManager::progress() {
// If complete has been signaled and there are no inflight shuffles or
// tables to shuffle, we can start the global completion barrier. This needs
// to be called on all ranks even without GPUs assigned so they know when
// they can exit the pipeline.
if (this->complete_signaled && inflight_shuffles.empty() &&
tables_to_shuffle.empty() &&
global_completion_req == MPI_REQUEST_NULL && !global_completion) {
int mpi_rank;
MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
CHECK_MPI(MPI_Ibarrier(MPI_COMM_WORLD, &global_completion_req),
"GpuShuffleManager::complete: MPI_Ibarrier failed:");
}

if (mpi_comm == MPI_COMM_NULL || this->all_complete()) {
return {};
}

if (this->shuffle_coordination.req == MPI_REQUEST_NULL) {
// Coordinate when to shuffle by doing an allreduce, ranks with data
// send 1, ranks without data send 0, this way all ranks will know when
// a shuffle is needed and can call progress to start it
this->shuffle_coordination.has_data =
this->tables_to_shuffle.empty() ? 0 : 1;
CHECK_MPI(
MPI_Iallreduce(MPI_IN_PLACE, &this->shuffle_coordination.has_data,
1, MPI_INT, MPI_MAX, mpi_comm,
&this->shuffle_coordination.req),
"GpuShuffleManager::progress: MPI_Iallreduce failed:");
} else {
int coordination_finished;
CHECK_MPI(MPI_Test(&this->shuffle_coordination.req,
&coordination_finished, MPI_STATUS_IGNORE),
"GpuShuffleManager::progress: MPI_Test failed:");
if (coordination_finished) {
if (this->shuffle_coordination.has_data) {
// If a shuffle is needed, start it
this->do_shuffle();
}
// Reset coordination for next shuffle
this->shuffle_coordination.req = MPI_REQUEST_NULL;
}
}

std::vector<std::unique_ptr<cudf::table>> received_tables;
for (GpuShuffle& shuffle : this->inflight_shuffles) {
std::optional<std::unique_ptr<cudf::table>> progress_res =
Expand Down Expand Up @@ -312,14 +382,17 @@ GpuShuffle::progress_waiting_for_data() {

if (all_metadata_received && gpu_data_received) {
// Unpack received tables
std::vector<cudf::table_view> table_views(n_ranks);
std::vector<cudf::packed_columns> packed_recv_columns(n_ranks);
std::vector<cudf::table_view> table_views;
std::vector<cudf::packed_columns> packed_recv_columns;
for (size_t src_rank = 0; src_rank < packed_recv_buffers.size();
src_rank++) {
packed_recv_columns[src_rank] = cudf::packed_columns(
if (this->packed_recv_buffers[src_rank]->size() == 0) {
continue;
}
packed_recv_columns.emplace_back(
std::move(this->metadata_recv_buffers[src_rank]),
std::move(this->packed_recv_buffers[src_rank]));
table_views[src_rank] = cudf::unpack(packed_recv_columns[src_rank]);
table_views.push_back(cudf::unpack(packed_recv_columns[src_rank]));
}
// Deallocate all receive data
this->metadata_recv_buffers.clear();
Expand Down Expand Up @@ -382,6 +455,28 @@ void GpuShuffle::progress_sending_data() {
}
}

void GpuShuffleManager::complete() { this->complete_signaled = true; }

/**
 * @brief Check whether every rank has finished sending its shuffle data.
 *
 * Polls the nonblocking barrier started by progress() once this rank signaled
 * completion. When the barrier finishes, any still-pending shuffle
 * coordination allreduce is abandoned by resetting its handle.
 *
 * @return true once the global completion IBarrier has finished on this rank.
 */
bool GpuShuffleManager::all_complete() {
    if (global_completion_req != MPI_REQUEST_NULL) {
        CHECK_MPI(MPI_Test(&global_completion_req, &this->global_completion,
                           MPI_STATUS_IGNORE),
                  "GpuShuffleManager::all_complete: MPI_Test failed:");
        if (global_completion) {
            // Global completion reached: all data has been sent, so a pending
            // shuffle-coordination request can simply be dropped. We do not
            // MPI_Cancel it — the MPI standard does not permit cancelling
            // nonblocking collective requests; resetting the handle is enough
            // since every rank reaches this point.
            this->shuffle_coordination.req = MPI_REQUEST_NULL;
            this->global_completion_req = MPI_REQUEST_NULL;
        }
    }
    return this->global_completion;
}

std::pair<std::unique_ptr<cudf::table>, std::vector<cudf::size_type>>
hash_partition_table(std::shared_ptr<cudf::table> table,
const std::vector<cudf::size_type>& column_indices,
Expand Down
30 changes: 29 additions & 1 deletion bodo/libs/gpu_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,11 @@ struct GpuShuffle {
void progress_sending_data();
};

/**
 * @brief State for the nonblocking allreduce that coordinates shuffle starts.
 *
 * Ranks with pending tables contribute 1 and the rest contribute 0; the
 * MPI_MAX reduction tells every rank whether any rank needs a shuffle.
 */
struct DoShuffleCoordination {
    // Outstanding MPI_Iallreduce request; MPI_REQUEST_NULL when idle.
    MPI_Request req = MPI_REQUEST_NULL;
    // In/out buffer for the allreduce. Initialized so the value is never
    // indeterminate before the first coordination round.
    int has_data = 0;
};

/**
* @brief Class for managing async shuffle of cudf::tables using NCCL
*/
Expand Down Expand Up @@ -197,11 +202,29 @@ class GpuShuffleManager {

const int MAX_TAG_VAL;

// This is used to coordinate the start of shuffles across ranks
DoShuffleCoordination shuffle_coordination;

// IBarrier to know when all ranks are done sending data
MPI_Request global_completion_req = MPI_REQUEST_NULL;
int global_completion = false;
bool complete_signaled = false;

std::vector<
std::pair<std::shared_ptr<cudf::table>, std::vector<cudf::size_type>>>
tables_to_shuffle;

/**
* @brief Initialize NCCL communicator
*/
void initialize_nccl();

/**
* @brief Once we've determined we will shuffle, start the shuffle by
* partitioning the table and posting sends/receives
*/
void do_shuffle();

public:
GpuShuffleManager();
~GpuShuffleManager();
Expand Down Expand Up @@ -243,7 +266,12 @@ class GpuShuffleManager {
* @brief Check if there are any inflight shuffles
* @return true if there are inflight shuffles, false otherwise
*/
bool inflight_exists() const { return !inflight_shuffles.empty(); }
bool all_complete();

/**
* @brief Idempotent call to signify that this rank has no more data to send
*/
void complete();

// Whether GPU shuffle support is usable. Always true in this definition;
// presumably a stub variant exists for builds without cudf — TODO confirm
// against the non-USE_CUDF compilation path.
bool is_available() const { return true; }
};
Expand Down
Loading
Loading