diff --git a/bodo/libs/gpu_utils.cpp b/bodo/libs/gpu_utils.cpp index 2f27af9413..127bc212ea 100644 --- a/bodo/libs/gpu_utils.cpp +++ b/bodo/libs/gpu_utils.cpp @@ -1,4 +1,6 @@ #include "gpu_utils.h" +#include +#include "vendored/simd-block-fixed-fpp.h" #ifdef USE_CUDF #include @@ -27,9 +29,6 @@ GpuShuffleManager::GpuShuffleManager() return; } - // There's probably a more robust way to handle this - CHECK_CUDA(cudaSetDevice(this->gpu_id.value())); - // Get rank and size MPI_Comm_rank(mpi_comm, &this->rank); MPI_Comm_size(mpi_comm, &this->n_ranks); @@ -80,18 +79,46 @@ void GpuShuffleManager::shuffle_table( if (mpi_comm == MPI_COMM_NULL) { return; } - // Hash partition the table - auto [partitioned_table, partition_start_rows] = - hash_partition_table(table, partition_indices, n_ranks); + if (table->num_rows() == 0) { + return; + } + this->tables_to_shuffle.emplace_back(std::move(table), partition_indices); +} - assert(partition_start_rows.size() == static_cast(n_ranks)); - // Contiguous splits requires the split indices excluding the first 0 - // So we create a new vector from partition_start_rows[1..end] - std::vector splits = std::vector( - partition_start_rows.begin() + 1, partition_start_rows.end()); - // Pack the tables for sending - std::vector packed_tables = - cudf::contiguous_split(partitioned_table->view(), splits, stream); +void GpuShuffleManager::do_shuffle() { + std::vector packed_tables; + if (!this->tables_to_shuffle.empty()) { + auto [table, partition_indices] = + std::move(this->tables_to_shuffle.back()); + this->tables_to_shuffle.pop_back(); + + // Hash partition the table + auto [partitioned_table, partition_start_rows] = hash_partition_table( + table, partition_indices, n_ranks, this->stream); + + assert(partition_start_rows.size() == static_cast(n_ranks)); + // Contiguous splits requires the split indices excluding the first 0 + // So we create a new vector from partition_start_rows[1..end] + std::vector splits = std::vector( + 
partition_start_rows.begin() + 1, partition_start_rows.end()); + // Pack the tables for sending + packed_tables = + cudf::contiguous_split(partitioned_table->view(), splits, stream); + } else { + // If we have no data to shuffle, we still need to create empty packed + // tables for each rank so that the shuffle can proceed without special + // casing empty sends/receives + cudf::table empty_table( + cudf::table_view(std::vector{})); + + for (int i = 0; i < n_ranks; i++) { + cudf::packed_columns empty_packed_columns = + cudf::pack(empty_table.view(), stream); + cudf::packed_table empty_packed_table( + empty_table.view(), std::move(empty_packed_columns)); + packed_tables.push_back(std::move(empty_packed_table)); + } + } assert(packed_tables.size() == static_cast(n_ranks)); @@ -101,14 +128,55 @@ void GpuShuffleManager::shuffle_table( // Each shuffle will use 3 tags for shuffling metadata/gpu data // sizes and metadata buffers - if (inflight_shuffles.size() * 3 < static_cast(MAX_TAG_VAL)) { + if (inflight_shuffles.size() * 3 > static_cast(MAX_TAG_VAL)) { throw std::runtime_error( - "Exceeded maxiumum number of inflight shuffles"); + "Exceeded maximum number of inflight shuffles"); } this->curr_tag = (this->curr_tag + 3) % MAX_TAG_VAL; } std::vector> GpuShuffleManager::progress() { + // If complete has been signaled and there are no inflight shuffles or + // tables to shuffle, we can start the global completion barrier. This needs + // to be called on all ranks even without GPUs assigned so they know when + // they can exit the pipeline. 
+ if (this->complete_signaled && inflight_shuffles.empty() && + tables_to_shuffle.empty() && + global_completion_req == MPI_REQUEST_NULL && !global_completion) { + CHECK_MPI(MPI_Ibarrier(MPI_COMM_WORLD, &global_completion_req), + "GpuShuffleManager::progress: MPI_Ibarrier failed:"); + } + + if (mpi_comm == MPI_COMM_NULL || this->all_complete()) { + return {}; + } + + if (this->shuffle_coordination.req == MPI_REQUEST_NULL) { + // Coordinate when to shuffle by doing an allreduce, ranks with data + // send 1, ranks without data send 0, this way all ranks will know when + // a shuffle is needed and can call progress to start it + this->shuffle_coordination.has_data = + this->tables_to_shuffle.empty() ? 0 : 1; + CHECK_MPI( + MPI_Iallreduce(MPI_IN_PLACE, &this->shuffle_coordination.has_data, + 1, MPI_INT, MPI_MAX, mpi_comm, + &this->shuffle_coordination.req), + "GpuShuffleManager::progress: MPI_Iallreduce failed:"); + } else { + int coordination_finished; + CHECK_MPI(MPI_Test(&this->shuffle_coordination.req, + &coordination_finished, MPI_STATUS_IGNORE), + "GpuShuffleManager::progress: MPI_Test failed:"); + if (coordination_finished) { + if (this->shuffle_coordination.has_data) { + // If a shuffle is needed, start it + this->do_shuffle(); + } + // Reset coordination for next shuffle + this->shuffle_coordination.req = MPI_REQUEST_NULL; + } + } + std::vector> received_tables; for (GpuShuffle& shuffle : this->inflight_shuffles) { std::optional> progress_res = @@ -312,14 +380,17 @@ GpuShuffle::progress_waiting_for_data() { if (all_metadata_received && gpu_data_received) { // Unpack received tables - std::vector table_views(n_ranks); - std::vector packed_recv_columns(n_ranks); + std::vector table_views; + std::vector packed_recv_columns; for (size_t src_rank = 0; src_rank < packed_recv_buffers.size(); src_rank++) { - packed_recv_columns[src_rank] = cudf::packed_columns( + if (this->packed_recv_buffers[src_rank]->size() == 0) { + continue; + } +
packed_recv_columns.emplace_back( std::move(this->metadata_recv_buffers[src_rank]), std::move(this->packed_recv_buffers[src_rank])); - table_views[src_rank] = cudf::unpack(packed_recv_columns[src_rank]); + table_views.push_back(cudf::unpack(packed_recv_columns.back())); } // Deallocate all receive data this->metadata_recv_buffers.clear(); @@ -382,10 +453,32 @@ void GpuShuffle::progress_sending_data() { } } +void GpuShuffleManager::complete() { this->complete_signaled = true; } + +bool GpuShuffleManager::all_complete() { + if (global_completion_req != MPI_REQUEST_NULL) { + CHECK_MPI(MPI_Test(&global_completion_req, &this->global_completion, + MPI_STATUS_IGNORE), + "GpuShuffleManager::all_complete: MPI_Test failed:"); + if (global_completion) { + // If global completion is reached, we can cancel any inflight + // shuffle coordination since we know all data has been sent + if (this->shuffle_coordination.req != MPI_REQUEST_NULL) { + // CHECK_MPI( + // MPI_Cancel(&this->shuffle_coordination.req), + // "GpuShuffleManager::all_complete: MPI_Cancel failed:"); + } + this->shuffle_coordination.req = MPI_REQUEST_NULL; + this->global_completion_req = MPI_REQUEST_NULL; + } + } + return this->global_completion; +} + std::pair, std::vector> hash_partition_table(std::shared_ptr table, const std::vector& column_indices, - cudf::size_type num_partitions) { + cudf::size_type num_partitions, cudaStream_t stream) { if (column_indices.empty()) { throw std::invalid_argument("Column indices cannot be empty"); } @@ -395,7 +488,9 @@ hash_partition_table(std::shared_ptr table, column_indices.end()); // Partition the table based on the hash of the selected columns - return cudf::hash_partition(table->view(), indices, num_partitions); + return cudf::hash_partition(table->view(), indices, num_partitions, + cudf::hash_id::HASH_MURMUR3, + cudf::DEFAULT_HASH_SEED, stream); } rmm::cuda_device_id get_gpu_id() { diff --git a/bodo/libs/gpu_utils.h b/bodo/libs/gpu_utils.h index c3a449c31e..f21f50f104 
100644 --- a/bodo/libs/gpu_utils.h +++ b/bodo/libs/gpu_utils.h @@ -165,6 +165,11 @@ struct GpuShuffle { void progress_sending_data(); }; +struct DoShuffleCoordination { + MPI_Request req = MPI_REQUEST_NULL; + int has_data; +}; + /** * @brief Class for managing async shuffle of cudf::tables using NCCL */ @@ -197,11 +202,29 @@ class GpuShuffleManager { const int MAX_TAG_VAL; + // This is used to coordinate the start of shuffles across ranks + DoShuffleCoordination shuffle_coordination; + + // IBarrier to know when all ranks are done sending data + MPI_Request global_completion_req = MPI_REQUEST_NULL; + int global_completion = false; + bool complete_signaled = false; + + std::vector< + std::pair, std::vector>> + tables_to_shuffle; + /** * @brief Initialize NCCL communicator */ void initialize_nccl(); + /** + * @brief Once we've determined we will shuffle, start the shuffle by + * partitioning the table and posting sends/receives + */ + void do_shuffle(); + public: GpuShuffleManager(); ~GpuShuffleManager(); @@ -243,7 +266,12 @@ class GpuShuffleManager { * @brief Check if there are any inflight shuffles * @return true if there are inflight shuffles, false otherwise */ - bool inflight_exists() const { return !inflight_shuffles.empty(); } + bool all_complete(); + + /** + * @brief Idempotent call to signify that this rank has no more data to send + */ + void complete(); bool is_available() const { return true; } }; @@ -253,12 +281,14 @@ class GpuShuffleManager { * @param table Input table * @param column_indices Column indices to hash * @param num_partitions Number of partitions + * @param stream CUDA stream to use for partitioning * @return Pair of partitioned table and partition indices */ std::pair, std::vector> hash_partition_table(std::shared_ptr table, const std::vector& column_indices, - cudf::size_type num_partitions); + cudf::size_type num_partitions, + cudaStream_t stream = cudf::get_default_stream()); /** * @brief Get the GPU device ID for the current process. 
All ranks must call diff --git a/bodo/libs/streaming/cuda_join.cpp b/bodo/libs/streaming/cuda_join.cpp index 0bf3506948..8f69a7cff0 100644 --- a/bodo/libs/streaming/cuda_join.cpp +++ b/bodo/libs/streaming/cuda_join.cpp @@ -5,8 +5,10 @@ #include #include #include +#include #include "../../pandas/physical/operator.h" #include "../_utils.h" +#include "_util.h" std::shared_ptr SyncAndReduceGlobalStats( std::shared_ptr local_stats) { @@ -95,31 +97,53 @@ void CudaHashJoin::build_hash_table( } void CudaHashJoin::FinalizeBuild() { - this->build_hash_table(this->_build_chunks); + // Build the hash table if we have a gpu assigned to us + if (this->build_shuffle_manager.get_mpi_comm() != MPI_COMM_NULL) { + this->build_hash_table(this->_build_chunks); + } std::shared_ptr build_table_arrow_schema = this->build_table_schema->ToArrowSchema(); + for (const auto& col_idx : this->build_key_indices) { - auto [min, max] = - cudf::minmax(this->_build_table->get_column(col_idx).view()); - std::vector> columns; - columns.emplace_back(cudf::make_column_from_scalar(*min, 1)); - columns.emplace_back(cudf::make_column_from_scalar(*max, 1)); - std::shared_ptr stats_table = - std::make_shared(std::move(columns)); - - std::vector> fields = { - arrow::field("min", - build_table_arrow_schema->field(col_idx)->type()), - arrow::field("max", - build_table_arrow_schema->field(col_idx)->type())}; - GPU_DATA stats_gpu_data = { - stats_table, std::make_shared(std::move(fields)), - make_stream_and_event(false)}; - std::shared_ptr local_stats = - convertGPUToArrow(stats_gpu_data); + std::shared_ptr local_stats; + if (this->build_shuffle_manager.get_mpi_comm() != MPI_COMM_NULL) { + auto [min, max] = + cudf::minmax(this->_build_table->get_column(col_idx).view()); + std::vector> columns; + columns.emplace_back(cudf::make_column_from_scalar(*min, 1)); + columns.emplace_back(cudf::make_column_from_scalar(*max, 1)); + std::shared_ptr stats_table = + std::make_shared(std::move(columns)); + + std::vector> fields 
= { + arrow::field("min", + build_table_arrow_schema->field(col_idx)->type()), + arrow::field("max", + build_table_arrow_schema->field(col_idx)->type())}; + GPU_DATA stats_gpu_data = { + stats_table, std::make_shared(std::move(fields)), + make_stream_and_event(false)}; + local_stats = convertGPUToArrow(stats_gpu_data); + } else { + // If we don't have a GPU, we still need to participate in the + // global stats reduction, so we create a table with null values + std::vector> fields = { + arrow::field("min", + build_table_arrow_schema->field(col_idx)->type()), + arrow::field("max", + build_table_arrow_schema->field(col_idx)->type())}; + local_stats = arrow::Table::Make( + std::make_shared(std::move(fields)), + {arrow::MakeArrayOfNull( + build_table_arrow_schema->field(col_idx)->type(), 1) + .ValueOrDie(), + arrow::MakeArrayOfNull( + build_table_arrow_schema->field(col_idx)->type(), 1) + .ValueOrDie()}); + } std::shared_ptr global_stats = - SyncAndReduceGlobalStats(local_stats); + SyncAndReduceGlobalStats(std::move(local_stats)); this->min_max_stats.push_back(global_stats); } @@ -128,88 +152,84 @@ void CudaHashJoin::FinalizeBuild() { } void CudaHashJoin::BuildConsumeBatch(std::shared_ptr build_chunk) { + // TODO: remove unused columns before shuffling to save network bandwidth + // and GPU memory. // Store the incoming build chunk for later finalization - _build_chunks.push_back(std::move(build_chunk)); + this->build_shuffle_manager.shuffle_table(build_chunk, + this->build_key_indices); + std::vector> shuffled_build_chunks = + build_shuffle_manager.progress(); + for (auto& chunk : shuffled_build_chunks) { + this->_build_chunks.emplace_back(std::move(chunk)); + } } - std::unique_ptr CudaHashJoin::ProbeProcessBatch( const std::shared_ptr& probe_chunk) { - if (!_join_handle) { + if (!_join_handle && + this->probe_shuffle_manager.get_mpi_comm() != MPI_COMM_NULL) { throw std::runtime_error( "Hash table not built.
Call FinalizeBuild first."); } - // Perform the join using the pre-built hash table - cudf::table_view probe_view = probe_chunk->view(); + // TODO: remove unused columns before shuffling to save network bandwidth + // and GPU memory. Send local data to appropriate ranks. + probe_shuffle_manager.shuffle_table(probe_chunk, this->probe_key_indices); + + // Receive data destined for this rank + std::vector> shuffled_probe_chunks = + probe_shuffle_manager.progress(); + if (shuffled_probe_chunks.empty() || + this->probe_shuffle_manager.get_mpi_comm() == MPI_COMM_NULL) { + return empty_table_from_arrow_schema( + this->output_schema->ToArrowSchema()); + } + + // Concatenate all incoming chunks into one contiguous table and join + // against it + std::vector probe_views; + probe_views.reserve(shuffled_probe_chunks.size()); + for (const auto& chunk : shuffled_probe_chunks) { + probe_views.push_back(chunk->view()); + } + std::unique_ptr coalesced_probe = + cudf::concatenate(probe_views); + + auto [probe_indices, build_indices] = _join_handle->inner_join( + coalesced_probe->select(this->probe_key_indices)); - auto [probe_indices_ptr, build_indices_ptr] = - _join_handle->inner_join(probe_view.select(this->probe_key_indices)); + if (probe_indices->size() == 0) { + return empty_table_from_arrow_schema( + this->output_schema->ToArrowSchema()); + } - cudf::table_view selected_probe_view = probe_chunk->select( + // Create views for the columns we want to keep + cudf::table_view probe_kept_view = coalesced_probe->select( this->probe_kept_cols.begin(), this->probe_kept_cols.end()); - cudf::table_view selected_build_view = _build_table->select( + cudf::table_view build_kept_view = _build_table->select( this->build_kept_cols.begin(), this->build_kept_cols.end()); - // Check for empty result to avoid errors - if (probe_indices_ptr->size() == 0) { - std::vector> final_columns; - for (auto& col : selected_probe_view) { - std::unique_ptr empty_col = cudf::empty_like(col); -
final_columns.push_back(std::move(empty_col)); - } + // Create column views of the indices from the indices buffers + cudf::column_view probe_idx_view(cudf::data_type{cudf::type_id::INT32}, + probe_indices->size(), + probe_indices->data(), nullptr, 0); - // Move columns from build side - for (auto& col : selected_build_view) { - std::unique_ptr empty_col = cudf::empty_like(col); - final_columns.push_back(std::move(empty_col)); - } - - // Return empty table - std::unique_ptr empty_out = - std::make_unique(std::move(final_columns)); - return empty_out; - } + cudf::column_view build_idx_view(cudf::data_type{cudf::type_id::INT32}, + build_indices->size(), + build_indices->data(), nullptr, 0); - // 2. Create column_views from the raw indices - // We wrap the raw device memory in a column_view. - // NOTE: The underlying uvectors must outlive these views (they do here). - cudf::column_view probe_idx_view( - cudf::data_type{ - cudf::type_id::INT32}, // indices are always size_type (int32) - probe_indices_ptr->size(), probe_indices_ptr->data(), - nullptr, // no null mask - 0 // null count - ); + // Materialize the selected rows + auto gathered_probe = cudf::gather(probe_kept_view, probe_idx_view); + auto gathered_build = cudf::gather(build_kept_view, build_idx_view); - cudf::column_view build_idx_view(cudf::data_type{cudf::type_id::INT32}, - build_indices_ptr->size(), - build_indices_ptr->data(), nullptr, 0); - - // Gather the actual data - // This creates new tables containing only the matching rows - std::unique_ptr gathered_probe = - cudf::gather(selected_probe_view, probe_idx_view); - std::unique_ptr gathered_build = - cudf::gather(selected_build_view, build_idx_view); - - // Assemble the final result - // We extract the columns from the gathered tables and combine them into one - // vector. 
+ // Assemble Final Result std::vector> final_columns; - // Move columns from probe side for (auto& col : gathered_probe->release()) { final_columns.push_back(std::move(col)); } - - // Move columns from build side for (auto& col : gathered_build->release()) { final_columns.push_back(std::move(col)); } - // Construct the final joined table - std::unique_ptr result_table = - std::make_unique(std::move(final_columns)); - - return result_table; + return std::make_unique(std::move(final_columns)); } diff --git a/bodo/libs/streaming/cuda_join.h b/bodo/libs/streaming/cuda_join.h index bb7da1688d..1f3bc30965 100644 --- a/bodo/libs/streaming/cuda_join.h +++ b/bodo/libs/streaming/cuda_join.h @@ -1,6 +1,7 @@ #pragma once -#include <../../bodo/libs/_bodo_common.h> #include +#include "../_bodo_common.h" +#include "../gpu_utils.h" #ifdef USE_CUDF #include #include @@ -16,6 +17,10 @@ struct CudaHashJoin { // The hash map object (opaque handle to the GPU hash table) std::unique_ptr _join_handle; + // The output schema of the join probe phase, which is needed for + // constructing empty result tables when there are no matches + std::shared_ptr output_schema; + /** * @brief Build the hash table from the accumulated build chunks * @param build_chunks Vector of build table chunks @@ -45,8 +50,10 @@ struct CudaHashJoin { std::shared_ptr probe_schema, std::vector build_kept_cols, std::vector probe_kept_cols, + std::shared_ptr output_schema, cudf::null_equality null_eq = cudf::null_equality::EQUAL) - : build_key_indices(std::move(build_keys)), + : output_schema(std::move(output_schema)), + build_key_indices(std::move(build_keys)), probe_key_indices(std::move(probe_keys)), build_kept_cols(std::move(build_kept_cols)), probe_kept_cols(std::move(probe_kept_cols)), @@ -81,6 +88,11 @@ struct CudaHashJoin { std::vector> get_min_max_stats() { return min_max_stats; } + + // Public so PhysicalGPUJoin can access to determine if there are pending + // shuffles + GpuShuffleManager 
build_shuffle_manager; + GpuShuffleManager probe_shuffle_manager; }; #else struct CudaHashJoin {}; diff --git a/bodo/pandas/_executor.cpp b/bodo/pandas/_executor.cpp index a27b61f586..ab3d90bfda 100644 --- a/bodo/pandas/_executor.cpp +++ b/bodo/pandas/_executor.cpp @@ -14,6 +14,7 @@ #include #include #include +#include #include #include diff --git a/bodo/pandas/_plan.cpp b/bodo/pandas/_plan.cpp index fa40717057..d5abc1b16a 100644 --- a/bodo/pandas/_plan.cpp +++ b/bodo/pandas/_plan.cpp @@ -44,6 +44,12 @@ #include "duckdb/planner/operator/logical_sample.hpp" #include "optimizer/runtime_join_filter.h" +#ifdef USE_CUDF +#include +#include "../libs/gpu_utils.h" +#include "cuda_runtime_api.h" +#endif + // if status of arrow::Result is not ok, form an err msg and raise a // runtime_error with it #undef CHECK_ARROW @@ -1230,11 +1236,28 @@ duckdb::unique_ptr make_set_operation( std::pair execute_plan( std::unique_ptr plan, PyObject *out_schema_py) { +#ifdef USE_CUDF + // Assign ranks to cuda devices + int prev_device = -1; + rmm::cuda_device_id gpu_id = get_gpu_id(); + if (gpu_id.value() != -1) { + cudaGetDevice(&prev_device); + cudaSetDevice(gpu_id.value()); + } +#endif + std::shared_ptr out_schema = unwrap_schema(out_schema_py); Executor executor(std::move(plan), out_schema); std::variant, PyObject *> output = executor.ExecutePipelines(); +#ifdef USE_CUDF + // Reset to previous cuda device + if (gpu_id.value() != -1) { + cudaSetDevice(prev_device); + } +#endif + // Iceberg write returns a PyObject* with file information if (std::holds_alternative(output)) { PyObject *file_infos = std::get(output); diff --git a/bodo/pandas/_util.cpp b/bodo/pandas/_util.cpp index 07f7f3a765..5c34f63fe8 100644 --- a/bodo/pandas/_util.cpp +++ b/bodo/pandas/_util.cpp @@ -1412,6 +1412,8 @@ cudf::data_type arrow_to_cudf_type(const std::shared_ptr &t) { return cudf::data_type{type_id::FLOAT64}; case Type::STRING: return cudf::data_type{type_id::STRING}; + case Type::LARGE_STRING: + return 
cudf::data_type{type_id::STRING}; case Type::TIMESTAMP: { auto unit = diff --git a/bodo/pandas/physical/gpu_join.h b/bodo/pandas/physical/gpu_join.h index 2018ef9f0a..1e0bc0a339 100644 --- a/bodo/pandas/physical/gpu_join.h +++ b/bodo/pandas/physical/gpu_join.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -157,10 +158,6 @@ class PhysicalGPUJoin : public PhysicalGPUProcessBatch, public PhysicalGPUSink { probe_kept_cols.push_back(idx); } - this->cuda_join = std::make_unique( - build_keys, probe_keys, build_table_schema, probe_table_schema, - build_kept_cols, probe_kept_cols, cudf::null_equality::EQUAL); - this->output_schema = std::make_shared(); for (const auto& kept_col : probe_kept_cols) { this->output_schema->column_types.push_back( @@ -182,6 +179,11 @@ class PhysicalGPUJoin : public PhysicalGPUProcessBatch, public PhysicalGPUSink { std::vector({}), std::vector({})); this->arrow_schema = this->output_schema->ToArrowSchema(); + this->cuda_join = std::make_unique( + build_keys, probe_keys, build_table_schema, probe_table_schema, + build_kept_cols, probe_kept_cols, output_schema, + cudf::null_equality::UNEQUAL); + assert(this->output_schema->ncols() == logical_join.GetColumnBindings().size()); } @@ -202,7 +204,14 @@ class PhysicalGPUJoin : public PhysicalGPUProcessBatch, public PhysicalGPUSink { GPU_DATA input_batch, OperatorResult prev_op_result, std::shared_ptr se) override { cuda_join->BuildConsumeBatch(input_batch.table); - return prev_op_result == OperatorResult::FINISHED + if (prev_op_result == OperatorResult::FINISHED) { + // If we are finished consuming input but the shuffle is not + // complete, we need to wait for the shuffle to complete before we + // can be finished + cuda_join->build_shuffle_manager.complete(); + } + return prev_op_result == OperatorResult::FINISHED && + cuda_join->build_shuffle_manager.all_complete() ? 
OperatorResult::FINISHED : OperatorResult::NEED_MORE_INPUT; } @@ -220,7 +229,20 @@ class PhysicalGPUJoin : public PhysicalGPUProcessBatch, public PhysicalGPUSink { cuda_join->ProbeProcessBatch(input_batch.table); GPU_DATA output_gpu_data = {std::move(output_table), this->arrow_schema, se}; - return {output_gpu_data, prev_op_result}; + + bool local_finished = prev_op_result == OperatorResult::FINISHED; + if (local_finished) { + // If we are finished consuming input but the shuffle is not + // complete, we need to wait for the shuffle to complete before we + // can be finished + cuda_join->probe_shuffle_manager.complete(); + } + + return { + output_gpu_data, + local_finished && cuda_join->probe_shuffle_manager.all_complete() + ? OperatorResult::FINISHED + : OperatorResult::NEED_MORE_INPUT}; } /** diff --git a/bodo/pandas/physical/gpu_read_parquet.h b/bodo/pandas/physical/gpu_read_parquet.h index a1008f4b20..77e52f744d 100644 --- a/bodo/pandas/physical/gpu_read_parquet.h +++ b/bodo/pandas/physical/gpu_read_parquet.h @@ -379,20 +379,6 @@ class PhysicalGPUReadParquet : public PhysicalGPUSource { filter_exprs(filter_exprs.Copy()) { time_pt start_init = start_timer(); - std::map old_to_new_column_map; - // Generate map of original column indices to selected column indices. 
- for (size_t i = 0; i < selected_columns.size(); ++i) { - old_to_new_column_map.insert({selected_columns[i], i}); - } - - this->filter_exprs = join_filter_col_stats.insert_filters( - std::move(this->filter_exprs), this->selected_columns); - - if (filter_exprs.filters.size() != 0) { - cudfExprTree = - tableFilterSetToCudf(filter_exprs, old_to_new_column_map); - } - if (py_path && PyUnicode_Check(py_path)) { path = PyUnicode_AsUTF8(py_path); } else { @@ -488,6 +474,9 @@ class PhysicalGPUReadParquet : public PhysicalGPUSource { init_batch_gen(); this->metrics.init_time += end_timer(start_init); } + if (!filters_initialized) { + init_filter_exprs(); + } time_pt start_produce = start_timer(); @@ -500,7 +489,6 @@ class PhysicalGPUReadParquet : public PhysicalGPUSource { auto result = next_batch_tup.second ? OperatorResult::FINISHED : OperatorResult::HAVE_MORE_OUTPUT; - std::pair ret = std::make_pair( GPU_DATA(std::move(next_batch_tup.first), arrow_schema, se), result); @@ -528,6 +516,8 @@ class PhysicalGPUReadParquet : public PhysicalGPUSource { std::shared_ptr batch_gen; std::shared_ptr arrow_schema; + bool filters_initialized = false; + // Communicator for GPU ranks (for part assignments) MPI_Comm comm; @@ -542,4 +532,19 @@ class PhysicalGPUReadParquet : public PhysicalGPUSource { batch_gen = std::make_shared( path, batch_size, output_schema->column_names, arrow_schema, comm); } + void init_filter_exprs() { + std::map old_to_new_column_map; + // Generate map of original column indices to selected column indices. 
+ for (size_t i = 0; i < selected_columns.size(); ++i) { + old_to_new_column_map.insert({selected_columns[i], i}); + } + this->filter_exprs = join_filter_col_stats.insert_filters( + std::move(this->filter_exprs), this->selected_columns); + + if (this->filter_exprs->filters.size() != 0) { + cudfExprTree = + tableFilterSetToCudf(*filter_exprs, old_to_new_column_map); + } + this->filters_initialized = true; + } }; diff --git a/bodo/pandas/physical/gpu_write_parquet.h b/bodo/pandas/physical/gpu_write_parquet.h index cec0c31ddb..a3c7a77cc1 100644 --- a/bodo/pandas/physical/gpu_write_parquet.h +++ b/bodo/pandas/physical/gpu_write_parquet.h @@ -214,8 +214,12 @@ class PhysicalGPUWriteParquet : public PhysicalGPUSink { if (should_flush && buffer_table && buffer_rows > 0) { std::string fname_prefix = get_fname_prefix(iter); - std::string out_path = - (fs::path(path) / (fname_prefix + ".parquet")).string(); + int myrank, num_ranks; + MPI_Comm_rank(MPI_COMM_WORLD, &myrank); + MPI_Comm_size(MPI_COMM_WORLD, &num_ranks); + std::string fname = gen_pieces_file_name(myrank, num_ranks, + fname_prefix, ".parquet"); + std::string out_path = (fs::path(path) / fname).string(); cudf::table_view bttv = buffer_table->view(); cudf::io::table_input_metadata meta{bttv}; diff --git a/bodo/tests/test_streaming/test_gpu_shuffle.cpp b/bodo/tests/test_streaming/test_gpu_shuffle.cpp index 4fb9313fb7..d390ea0461 100644 --- a/bodo/tests/test_streaming/test_gpu_shuffle.cpp +++ b/bodo/tests/test_streaming/test_gpu_shuffle.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -35,6 +36,9 @@ static bodo::tests::suite tests([] { // Ensure we have a GPU context for this rank // Note: In a real test runner, this might be handled by a fixture rmm::cuda_device_id device_id = get_gpu_id(); + if (device_id.value() >= 0) { + cudaSetDevice(device_id.value()); + } try { GpuShuffleManager manager; @@ -46,7 +50,7 @@ static bodo::tests::suite tests([] { bodo::tests::check(manager.get_mpi_comm() 
== MPI_COMM_NULL); } else { // Should be empty on init - bodo::tests::check(manager.inflight_exists() == false); + bodo::tests::check(manager.all_complete() == false); // Check communicators exist bodo::tests::check(manager.get_nccl_comm() != nullptr); @@ -105,7 +109,7 @@ static bodo::tests::suite tests([] { MPI_Comm_size(MPI_COMM_WORLD, &n_ranks); rmm::cuda_device_id device_id = get_gpu_id(); - if (device_id.value() > 0) { + if (device_id.value() >= 0) { cudaSetDevice(device_id.value()); } @@ -123,25 +127,18 @@ static bodo::tests::suite tests([] { // Shuffle based on column 0 manager.shuffle_table(input_ptr, {0}); - - bodo::tests::check(manager.inflight_exists() == - (device_id.value() >= 0)); + manager.complete(); std::vector> received_tables; // Pump the progress loop - bool done = false; - while (!done) { + while (!manager.all_complete()) { auto out_batch = manager.progress(); // Move received tables into our accumulator for (auto& t : out_batch) { - if (t) + if (t) { received_tables.push_back(std::move(t)); - } - - // Check if queue is drained - if (!manager.inflight_exists()) { - done = true; + } } } @@ -192,26 +189,18 @@ static bodo::tests::suite tests([] { // Shuffle based on column 0 manager.shuffle_table(input_ptr, {0}); - - // Verify inflight status matches GPU presence - bodo::tests::check(manager.inflight_exists() == - (device_id.value() >= 0)); + manager.complete(); std::vector> received_tables; // Pump the progress loop - bool done = false; - while (!done) { + while (!manager.all_complete()) { auto out_batch = manager.progress(); // Move received tables into our accumulator for (auto& t : out_batch) { - if (t) + if (t) { received_tables.push_back(std::move(t)); - } - - // Check if queue is drained - if (!manager.inflight_exists()) { - done = true; + } } } diff --git a/pixi.toml b/pixi.toml index 293c5ab863..7122eff929 100644 --- a/pixi.toml +++ b/pixi.toml @@ -71,6 +71,35 @@ depends-on = [ { task = "build-bodo-cudf-in-default-cuda", environment =
"default-cuda" } ] +[tasks.configure-bodo-cudf-debug-in-default-cuda] +cmd = "pip install --no-deps --no-build-isolation -ve . -Cbuild-dir=build/cudf-debug -Ccmake.define.USE_CUDF=ON -Ccmake.build-type='Debug' -Cinstall.strip=false" +inputs = ["CMakeLists.txt", "pyproject.toml"] +outputs = ["build/cudf-debug/CMakeCache.txt"] +[tasks.configure-bodo-cudf-debug-in-default-cuda.env] +SCCACHE_BUCKET = "engine-codebuild-cache" +SCCACHE_REGION = "us-east-2" +SCCACHE_S3_USE_SSL = "true" +SCCACHE_S3_SERVER_SIDE_ENCRYPTION = "true" + +[tasks.build-bodo-cudf-debug-in-default-cuda] +cmd = """ +cmake --build build/cudf-debug -j && \ +cmake --install build/cudf-debug --prefix "$(python -c "import sysconfig; print(sysconfig.get_path('purelib'))")" && \ +cmake -E copy build/cudf-debug/compile_commands.json compile_commands.json +""" +inputs = ["bodo/**/*.cpp", "bodo/**/*.h", "bodo/**/*.hpp", "bodo/**/*.pyx", "bodo/**/*.pxd"] +depends-on = ["configure-bodo-cudf-debug-in-default-cuda"] +[tasks.build-bodo-cudf-debug-in-default-cuda.env] +SCCACHE_BUCKET = "engine-codebuild-cache" +SCCACHE_REGION = "us-east-2" +SCCACHE_S3_USE_SSL = "true" +SCCACHE_S3_SERVER_SIDE_ENCRYPTION = "true" + +[tasks.build-bodo-cudf-debug] +depends-on = [ + { task = "build-bodo-cudf-debug-in-default-cuda", environment = "default-cuda" } +] + [tasks.configure-bodo-cudf-san-in-default-cuda] cmd = "pip install --no-deps --no-build-isolation -ve . -Cbuild-dir=build/cudf-san -Ccmake.define.USE_CUDF=ON -Ccmake.build-type='DebugSanitize' -Cinstall.strip=false"