
Multi-GPU Join#1024

Merged
IsaacWarren merged 87 commits into main from isaac/gpu_shuffle_join
Feb 17, 2026

Conversation


@IsaacWarren (Contributor) commented Feb 10, 2026

Changes included in this PR

Uses shuffle managers for multi-gpu join. Adds APIs to shuffle manager to synchronize completion across ranks. Various additional bugfixes.

Testing strategy

PR CI, shuffle tests, and the following script:

import bodo.pandas as pd

customer = pd.read_parquet("bodo/tests/data/tpch-test_data/parquet/customer.pq")
orders = pd.read_parquet("bodo/tests/data/tpch-test_data/parquet/orders.pq")
merged = customer.merge(orders, how='inner', left_on=["C_CUSTKEY"], right_on=["O_CUSTKEY"])
merged.to_parquet("merged.cudf")

Run on 1 GPU with 1 and 2 ranks, and on 4 GPUs with 1, 4, and 8 ranks.

User facing changes

Parallel GPU join

Checklist

  • Pipelines passed before requesting review. To run CI you must include [run CI] in your commit message.
  • I am familiar with the Contributing Guide
  • I have installed and run pre-commit hooks.

@IsaacWarren marked this pull request as ready for review February 16, 2026 18:05

codecov bot commented Feb 16, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 68.72%. Comparing base (c33fbb5) to head (bf69bc0).
⚠️ Report is 210 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1024      +/-   ##
==========================================
+ Coverage   66.68%   68.72%   +2.03%     
==========================================
  Files         186      195       +9     
  Lines       66795    67835    +1040     
  Branches     9507     9668     +161     
==========================================
+ Hits        44543    46618    +2075     
+ Misses      19572    18365    -1207     
- Partials     2680     2852     +172     

@DrTodd13 (Collaborator) left a comment


Approved, but let's double-check the waits, records, and syncs.

// Store the incoming build chunk for later finalization
_build_chunks.push_back(std::move(build_chunk));
// TODO: remove unused columns before shuffling to save network bandwidth
// and GPU memory Store the incoming build chunk for later finalization
Collaborator

Suggested change
// and GPU memory Store the incoming build chunk for later finalization
// and GPU memory. Store the incoming build chunk for later finalization


// Concatenate all incoming chunks into one contiguous table and join
// against it
std::vector<cudf::table_view> probe_views;
Collaborator

This snippet is pretty frequent. We should make a function that takes chunks and returns a concatenated table.

Contributor Author

I did, but we sometimes have unique pointers and sometimes shared ones, so either way you need a conversion loop outside the function some of the time.

// The hash map object (opaque handle to the GPU hash table)
std::unique_ptr<cudf::hash_join> _join_handle;

// The output schema of the join probe phase, which is needed for for
Collaborator

Suggested change
// The output schema of the join probe phase, which is needed for for
// The output schema of the join probe phase, which is needed for

// If global completion is reached, we can cancel any inflight
// shuffle coordination since we know all data has been sent
if (this->shuffle_coordination.req != MPI_REQUEST_NULL) {
// CHECK_MPI(
Collaborator

Dead code? Nothing else in the if?

OperatorResult ConsumeBatchGPU(
GPU_DATA input_batch, OperatorResult prev_op_result,
std::shared_ptr<StreamAndEvent> se) override {
se->event.wait(se->stream);
Collaborator

How is this working? How can a stream wait on its own completion signal? Did you mean to wait on the previous stage to finish? That is done automatically by the pipeline.

Contributor Author

I didn't know that was done by the pipeline. If we're waiting between every operator, what is the benefit of the async stream?

Contributor

I think the benefit is that we can overlap pipeline stages? For example, while processing the join consume for one batch we can start processing the next read-parquet batch. It's important to note that wait doesn't block the host; it just tells stream A that all future work submitted to it has to wait for an event in stream B:
https://www.cs.cmu.edu/afs/cs/academic/class/15668-s11/www/cuda-doc/html/group__CUDART__STREAM_gfe68d207dc965685d92d3f03d77b0876.html

}
cuda_join->BuildConsumeBatch(input_batch.table);
return prev_op_result == OperatorResult::FINISHED
se->event.record(se->stream);
Collaborator

The recording of this stage's completion is also done in the pipeline now.

std::shared_ptr<StreamAndEvent> se) override {
int mpi_rank;
MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
se->event.wait(se->stream);
Collaborator

Same comment here. This is an anti-pattern, I think.

se};
return {output_gpu_data, prev_op_result};

se->event.record(se->stream);
Collaborator

Same here.

std::shared_ptr<StreamAndEvent> se) override {
se->event.wait(se->stream);
if (cuda_join->build_shuffle_manager.all_complete()) {
cudaStreamSynchronize(
Collaborator

I guess there can be cases where the path you take depends on the data, so you have to sync, but in other cases not. Are we sure this is a sync and not a wait?

Contributor Author

I don't think there's a difference in this case since the shuffle manager has its own stream.

// Ensure we have a GPU context for this rank
// Note: In a real test runner, this might be handled by a fixture
rmm::cuda_device_id device_id = get_gpu_id();
std::cout << "Rank GPU ID: " << device_id.value() << std::endl;
Collaborator

Do we still want these couts in here?

Copilot AI (Contributor) left a comment

Pull request overview

This PR updates the GPU execution path to support multi-GPU joins by routing join build/probe through the GPU shuffle manager, and adds shuffle-manager APIs for coordinating completion across ranks.

Changes:

  • Extend GpuShuffleManager with rank-coordinated shuffle start and global completion signaling (complete() / all_complete()).
  • Update GPU join implementation to use shuffle managers for build/probe data movement.
  • Miscellaneous fixes: Parquet GPU write part naming, deferred GPU parquet filter initialization, Arrow LARGE_STRING mapping.

Reviewed changes

Copilot reviewed 12 out of 12 changed files in this pull request and generated 11 comments.

Summary per file:
pixi.toml Adds pixi tasks for building a CUDF-enabled debug configuration.
bodo/tests/test_streaming/test_gpu_shuffle.cpp Updates shuffle tests to use the new completion APIs and adds device selection logic.
bodo/pandas/physical/gpu_write_parquet.h Changes GPU parquet output naming to include rank-aware piece names.
bodo/pandas/physical/gpu_read_parquet.h Defers filter expression initialization to first produce call.
bodo/pandas/physical/gpu_join.h Switches GPU join to coordinate build/probe completion via shuffle manager.
bodo/pandas/_util.cpp Adds Arrow LARGE_STRING → cuDF string type mapping.
bodo/pandas/_plan.cpp Assigns CUDA devices per-rank during plan execution (CUDF builds).
bodo/pandas/_executor.cpp Adds an RMM include (context for GPU execution).
bodo/libs/streaming/cuda_join.h Expands join state to include output schema and shuffle managers.
bodo/libs/streaming/cuda_join.cpp Implements build/probe shuffling and empty-result handling for GPU join.
bodo/libs/gpu_utils.h Adds shuffle coordination and completion APIs to the GPU shuffle manager.
bodo/libs/gpu_utils.cpp Implements shuffle coordination, completion barrier, and related shuffle receive/unpack changes.
Comments suppressed due to low confidence (1)

bodo/libs/gpu_utils.cpp:406

  • progress_waiting_for_data() builds packed_recv_columns with emplace_back, but then indexes it using src_rank (packed_recv_columns[src_rank]) after skipping some ranks. This can go out-of-bounds and/or unpack the wrong buffer. Also, skipping entries when packed_recv_buffers[src_rank]->size() == 0 drops valid "empty partition" messages (metadata present but 0 GPU bytes), which can lead to table_views being empty and cudf::concatenate(table_views) failing, or schema mismatches. The unpack path should preserve ordering (or use a separate index) and handle 0-byte GPU buffers correctly.
    if (all_metadata_received && gpu_data_received) {
        // Unpack received tables
        std::vector<cudf::table_view> table_views;
        std::vector<cudf::packed_columns> packed_recv_columns;
        for (size_t src_rank = 0; src_rank < packed_recv_buffers.size();
             src_rank++) {
            if (this->packed_recv_buffers[src_rank]->size() == 0) {
                continue;
            }
            packed_recv_columns.emplace_back(
                std::move(this->metadata_recv_buffers[src_rank]),
                std::move(this->packed_recv_buffers[src_rank]));
            table_views.push_back(cudf::unpack(packed_recv_columns[src_rank]));
        }
        // Deallocate all receive data
        this->metadata_recv_buffers.clear();
        this->packed_recv_buffers.clear();
        this->metadata_recv_reqs->clear();
        // Move to completed state
        this->recv_state = GpuShuffleState::COMPLETED;

        std::unique_ptr<cudf::table> shuffle_res =
            cudf::concatenate(table_views);



@IsaacWarren
Contributor Author

Re-tested on 4 GPUs after addressing review comments.

@scott-routledge2 (Contributor) left a comment

LGTM, thanks @IsaacWarren!

probe_views.push_back(chunk->view());
}
std::unique_ptr<cudf::table> coalesced_probe =
cudf::concatenate(probe_views);
Contributor

Use stream for the cudf operations in this function? (Concat/gather)

Contributor Author

There's a separate task to use the async streams in join.

Collaborator

I was thinking that every cudf operation would operate on some stream we define. This all runs on the synchronous stream right now, right? Why does it need to be that way?

GPU_DATA input_batch, OperatorResult prev_op_result,
std::shared_ptr<StreamAndEvent> se) override {
if (cuda_join->probe_shuffle_manager.all_complete()) {
cudaStreamSynchronize(
Contributor

cudaStreamSynchronize is going to block the CPU until the work on that stream is complete; maybe we want to do a wait here instead so we can continue processing the next batch?

Collaborator

Agreed. We all need to be on the same page about our asynchrony strategy.

Contributor Author

I took another look at the synchronization and I don't think this is necessary; I was just being paranoid while debugging 😆. Removed.

@IsaacWarren IsaacWarren merged commit 2fa5b9a into main Feb 17, 2026
30 checks passed
@IsaacWarren IsaacWarren deleted the isaac/gpu_shuffle_join branch February 17, 2026 17:49