Skip to content

Commit 3ea78ea

Browse files
Build with OpenMPI on Mac (#1010)
* Use OpenMPI for Mac builds to improve initialization times.
* Upgrade the mpi4py dependency and remove the vendored mpi4py for pip.
* Use MPI_Scatterv/MPI_Gatherv for OpenMPI builds (the large-count `_c` APIs aren't supported yet).
1 parent 76cbdbd commit 3ea78ea

File tree

15 files changed

+610
-465
lines changed

15 files changed

+610
-465
lines changed

.github/workflows/_build_bodo_pip.yml

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,6 @@ jobs:
140140
SCCACHE_REGION=us-east-2
141141
SCCACHE_S3_USE_SSL=true
142142
SCCACHE_S3_SERVER_SIDE_ENCRYPTION=true
143-
BODO_VENDOR_MPI4PY=1
144143
BODO_WINDOWS_BUILD_TYPE="Release"
145144
SETUPTOOLS_SCM_PRETEND_VERSION=${{ inputs.bodo_version }}
146145
CIBW_ENVIRONMENT_MACOS: >
@@ -150,8 +149,6 @@ jobs:
150149
SCCACHE_S3_USE_SSL=true
151150
SCCACHE_S3_SERVER_SIDE_ENCRYPTION=true
152151
MACOSX_DEPLOYMENT_TARGET=${{ inputs.name == 'macos-arm' && '12.0' || '11.0' }}
153-
BODO_VENDOR_MPICH=1
154-
BODO_VENDOR_MPI4PY=1
155152
PATH=$HOME/.pixi/bin:$PATH
156153
CONDA_PREFIX=$(pwd)/.pixi/envs/pip-cpp-macos-win
157154
SETUPTOOLS_SCM_PRETEND_VERSION=${{ inputs.bodo_version }}
@@ -164,7 +161,6 @@ jobs:
164161
SCCACHE_S3_USE_SSL=true
165162
SCCACHE_S3_SERVER_SIDE_ENCRYPTION=true
166163
BODO_VENDOR_MPICH=1
167-
BODO_VENDOR_MPI4PY=1
168164
CONDA_PREFIX=""
169165
SETUPTOOLS_SCM_PRETEND_VERSION=${{ inputs.bodo_version }}
170166
NO_HDF5=1
@@ -177,18 +173,15 @@ jobs:
177173
sccache --show-stats &&
178174
delvewheel repair --exclude impi.dll --exclude msmpi.dll
179175
--exclude arrow.dll --exclude arrow_acero.dll --exclude arrow_dataset.dll --exclude arrow_python.dll
180-
--exclude parquet.dll -v -w {dest_dir} {wheel} &&
181-
python buildscripts/bodo/pip/windows/patch_bodo_for_pip.py -p {dest_dir}
176+
--exclude parquet.dll -v -w {dest_dir} {wheel}
182177
CIBW_REPAIR_WHEEL_COMMAND_MACOS: >
183178
sccache --show-stats &&
184179
delocate-wheel --ignore-missing-dependencies --sanitize-rpaths
185180
-e libmpi -e libpmpi
186181
-e libarrow -e libarrow_acero -e libarrow_dataset -e libarrow_flight
187182
-e libarrow_python -e libarrow_python_flight -e libarrow_python_parquet_encryption
188183
-e libarrow_substrait -e libparquet
189-
--require-archs {delocate_archs} -v {wheel} &&
190-
python buildscripts/bodo/pip/macos/patch_libs_for_pip.py -p {wheel} &&
191-
mv {wheel} {dest_dir}
184+
--require-archs {delocate_archs} -v {wheel} && mv {wheel} {dest_dir}
192185
CIBW_REPAIR_WHEEL_COMMAND_LINUX: >
193186
sccache --show-stats &&
194187
auditwheel -v repair

.github/workflows/pr_ci.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ on:
99
options:
1010
- ubuntu-latest
1111
- windows-latest
12+
- macos-latest
1213

1314
# Limit CI to cancel previous runs in the same PR
1415
concurrency:

CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -398,7 +398,7 @@ python_add_library(csv_json_reader
398398
)
399399
target_include_directories(csv_json_reader PRIVATE ${BASE_INCLUDE_DIRS} "${CMAKE_CURRENT_SOURCE_DIR}/bodo/io/" "${CMAKE_CURRENT_BINARY_DIR}/bodo/io/")
400400
target_link_directories(csv_json_reader PRIVATE ${PYARROW_LIBRARY_DIRS} ${CONDA_LIB_DIR} ${MPI_LIB_DIR})
401-
target_link_libraries(csv_json_reader PRIVATE arrow arrow_python ${MPI_LIBRARIES})
401+
target_link_libraries(csv_json_reader PRIVATE fmt::fmt arrow arrow_python ${MPI_LIBRARIES})
402402
install(TARGETS csv_json_reader DESTINATION "bodo/io/")
403403

404404
# ------------------------ Cython Target - bodo.io.pyarrow_wrappers ---------------

bodo/io/_fs_io.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -532,8 +532,8 @@ void parallel_in_order_write(
532532
int64_t buff_size = count * elem_size;
533533
MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
534534
MPI_Comm_size(MPI_COMM_WORLD, &num_ranks);
535-
CHECK_MPI(MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN),
536-
"parallel_in_order_write: MPI error on MPI_Errhandler_set:");
535+
CHECK_MPI(MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_RETURN),
536+
"parallel_in_order_write: MPI error on MPI_Comm_set_errhandler:");
537537

538538
if (fs_option == Bodo_Fs::s3) {
539539
int size_tag = 1;

bodo/io/_io.cpp

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -246,9 +246,10 @@ void file_read_parallel(const char* file_name, char* buff, int64_t start,
246246
delete f_reader;
247247
} else {
248248
// posix
249-
CHECK_MPI(MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN),
250-
"_io.cpp::file_read_parallel: MPI error on "
251-
"MPI_Errhandler_set:");
249+
CHECK_MPI(
250+
MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_RETURN),
251+
"_io.cpp::file_read_parallel: MPI error on "
252+
"MPI_Comm_set_errhandler:");
252253

253254
MPI_File fh;
254255
CHECK_MPI(
@@ -345,9 +346,9 @@ void file_write_parallel(const char* file_name, char* buff, int64_t start,
345346
char err_string[MPI_MAX_ERROR_STRING];
346347
err_string[MPI_MAX_ERROR_STRING - 1] = '\0';
347348
int err_len, err_class;
348-
CHECK_MPI(
349-
MPI_Errhandler_set(MPI_COMM_WORLD, MPI_ERRORS_RETURN),
350-
"_io.cpp::file_write_parallel: MPI error on MPI_Errhandler_set:");
349+
CHECK_MPI(MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_RETURN),
350+
"_io.cpp::file_write_parallel: MPI error on "
351+
"MPI_Comm_set_errhandler:");
351352

352353
int ierr;
353354
bool throw_error = false;

bodo/libs/_distributed.cpp

Lines changed: 51 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -143,24 +143,6 @@ int MPI_Gengather(void *sendbuf, int sendcount, MPI_Datatype sendtype,
143143
}
144144
}
145145

146-
int MPI_Gengatherv(const void *sendbuf, int64_t sendcount,
147-
MPI_Datatype sendtype, void *recvbuf,
148-
const int64_t *recvcounts, const int64_t *displs,
149-
MPI_Datatype recvtype, int root_pe, MPI_Comm comm,
150-
bool all_gather) {
151-
const MPI_Aint *mpi_displs = reinterpret_cast<const MPI_Aint *>(displs);
152-
const MPI_Count *mpi_recvcounts =
153-
reinterpret_cast<const MPI_Count *>(recvcounts);
154-
if (all_gather) {
155-
return MPI_Allgatherv_c(sendbuf, sendcount, sendtype, recvbuf,
156-
mpi_recvcounts, mpi_displs, recvtype, comm);
157-
} else {
158-
return MPI_Gatherv_c(sendbuf, sendcount, sendtype, recvbuf,
159-
mpi_recvcounts, mpi_displs, recvtype, root_pe,
160-
comm);
161-
}
162-
}
163-
164146
std::shared_ptr<array_info> gather_array(std::shared_ptr<array_info> in_arr,
165147
bool all_gather, bool is_parallel,
166148
int mpi_root, int n_pes, int myrank,
@@ -575,14 +557,15 @@ std::shared_ptr<array_info> scatter_array(
575557
is_sender = (mpi_root == MPI_ROOT);
576558
if (is_sender) {
577559
CHECK_MPI(MPI_Comm_remote_size(*comm_ptr, &n_pes),
578-
"scatter_array: MPI error on MPI_Comm_remote_size:");
560+
"_distributed.cpp::scatter_array: MPI error on "
561+
"MPI_Comm_remote_size:");
579562
}
580563
}
581564

582565
// Broadcast length
583566
int64_t n_rows = in_arr->length;
584567
CHECK_MPI(MPI_Bcast(&n_rows, 1, MPI_INT64_T, mpi_root, comm),
585-
"_distributed.h::c_bcast: MPI error on MPI_Bcast:");
568+
"_distributed.cpp::scatter_array: MPI error on MPI_Bcast:");
586569

587570
Bodo_CTypes::CTypeEnum dtype = in_arr->dtype;
588571
bodo_array_type::arr_type_enum arr_type = in_arr->arr_type;
@@ -635,23 +618,22 @@ std::shared_ptr<array_info> scatter_array(
635618
out_arr = alloc_array_top_level(n_loc, -1, -1, arr_type, dtype, -1,
636619
0, num_categories);
637620

638-
CHECK_MPI(
639-
MPI_Scatterv_c(send_ptr, send_count_bytes.data(),
640-
send_disp_bytes.data(), mpi_typ,
641-
out_arr->data1(), n_recv_bytes, mpi_typ,
642-
mpi_root, comm),
643-
"_distributed.cpp::c_scatterv: MPI error on MPI_Scatterv:");
621+
CHECK_MPI(MPI_Genscatterv(send_ptr, send_count_bytes.data(),
622+
send_disp_bytes.data(), out_arr->data1(),
623+
n_recv_bytes, mpi_typ, mpi_root, comm),
624+
"_distributed.cpp::scatter_array: MPI error on "
625+
"MPI_Genscatterv:");
644626

645627
} else {
646628
MPI_Datatype mpi_typ = get_MPI_typ(dtype);
647629
out_arr = alloc_array_top_level(n_loc, -1, -1, arr_type, dtype, -1,
648630
0, num_categories);
649631
char *data1_ptr = out_arr->data1();
650-
CHECK_MPI(
651-
MPI_Scatterv_c(in_arr->data1(), send_counts.data(),
652-
rows_disps.data(), mpi_typ, data1_ptr, n_loc,
653-
mpi_typ, mpi_root, comm),
654-
"_distributed.cpp::c_scatterv: MPI error on MPI_Scatterv:");
632+
CHECK_MPI(MPI_Genscatterv(in_arr->data1(), send_counts.data(),
633+
rows_disps.data(), data1_ptr, n_loc,
634+
mpi_typ, mpi_root, comm),
635+
"_distributed.cpp::scatter_array: MPI error on "
636+
"MPI_Genscatterv:");
655637
}
656638
// Set scale and precision for decimal type
657639
if (dtype == Bodo_CTypes::DECIMAL) {
@@ -664,14 +646,16 @@ std::shared_ptr<array_info> scatter_array(
664646
num_categories);
665647
char *data1_ptr = out_arr->data1();
666648
char *data2_ptr = out_arr->data2();
667-
CHECK_MPI(MPI_Scatterv_c(in_arr->data1(), send_counts.data(),
668-
rows_disps.data(), mpi_typ, data1_ptr, n_loc,
669-
mpi_typ, mpi_root, comm),
670-
"_distributed.cpp::c_scatterv: MPI error on MPI_Scatterv:");
671-
CHECK_MPI(MPI_Scatterv_c(in_arr->data2(), send_counts.data(),
672-
rows_disps.data(), mpi_typ, data2_ptr, n_loc,
673-
mpi_typ, mpi_root, comm),
674-
"_distributed.cpp::c_scatterv: MPI error on MPI_Scatterv:");
649+
CHECK_MPI(
650+
MPI_Genscatterv(in_arr->data1(), send_counts.data(),
651+
rows_disps.data(), data1_ptr, n_loc, mpi_typ,
652+
mpi_root, comm),
653+
"_distributed.cpp::scatter_array: MPI error on MPI_Genscatterv:");
654+
CHECK_MPI(
655+
MPI_Genscatterv(in_arr->data2(), send_counts.data(),
656+
rows_disps.data(), data2_ptr, n_loc, mpi_typ,
657+
mpi_root, comm),
658+
"_distributed.cpp::scatter_array: MPI error on MPI_Genscatterv:");
675659

676660
} else if (arr_type == bodo_array_type::STRING) {
677661
MPI_Datatype mpi_typ32 = get_MPI_typ(Bodo_CTypes::UINT32);
@@ -702,7 +686,7 @@ std::shared_ptr<array_info> scatter_array(
702686
}
703687
CHECK_MPI(MPI_Bcast(send_counts_chars.data(), n_pes, MPI_INT64_T,
704688
mpi_root, comm),
705-
"_distributed.h::c_bcast: MPI error on MPI_Bcast:");
689+
"_distributed.cpp::scatter_array: MPI error on MPI_Bcast:");
706690
calc_disp(rows_disps_chars, send_counts_chars);
707691
int64_t n_loc_chars =
708692
(is_intercomm && is_sender) ? 0 : send_counts_chars[myrank];
@@ -713,21 +697,20 @@ std::shared_ptr<array_info> scatter_array(
713697
// Scatter string lengths
714698
std::vector<uint32_t> recv_arr_lens(n_loc);
715699
CHECK_MPI(
716-
MPI_Scatterv_c(send_arr_lens.data(), send_counts.data(),
717-
rows_disps.data(), mpi_typ32, recv_arr_lens.data(),
718-
n_loc, mpi_typ32, mpi_root, comm),
719-
"_distributed.cpp::c_scatterv: MPI error on MPI_Scatterv:");
700+
MPI_Genscatterv(send_arr_lens.data(), send_counts.data(),
701+
rows_disps.data(), recv_arr_lens.data(), n_loc,
702+
mpi_typ32, mpi_root, comm),
703+
"_distributed.cpp::scatter_array: MPI error on MPI_Genscatterv:");
720704
convert_len_arr_to_offset(recv_arr_lens.data(),
721705
(offset_t *)out_arr->data2(),
722706
(size_t)out_arr->length);
723707
recv_arr_lens.clear();
724708

725709
// Scatter string characters
726-
CHECK_MPI(
727-
MPI_Scatterv_c(in_arr->data1(), send_counts_chars.data(),
728-
rows_disps_chars.data(), mpi_typ8, out_arr->data1(),
729-
n_loc_chars, mpi_typ8, mpi_root, comm),
730-
"_distributed.cpp::c_scatterv: MPI error on MPI_Scatterv:");
710+
CHECK_MPI(MPI_Genscatterv(in_arr->data1(), send_counts_chars.data(),
711+
rows_disps_chars.data(), out_arr->data1(),
712+
n_loc_chars, mpi_typ8, mpi_root, comm),
713+
"scatter_array: MPI error on MPI_Genscatterv:");
731714
} else if (arr_type == bodo_array_type::DICT) {
732715
// broadcast the dictionary data (string array)
733716
std::shared_ptr<array_info> dict_arr = in_arr->child_arrays[0];
@@ -748,14 +731,16 @@ std::shared_ptr<array_info> scatter_array(
748731
num_categories);
749732
char *data1_ptr = out_arr->data1();
750733
char *data2_ptr = out_arr->data2();
751-
CHECK_MPI(MPI_Scatterv_c(in_arr->data1(), send_counts.data(),
752-
rows_disps.data(), utc_mpi_typ, data1_ptr,
753-
n_loc, utc_mpi_typ, mpi_root, comm),
754-
"_distributed.cpp::c_scatterv: MPI error on MPI_Scatterv:");
755-
CHECK_MPI(MPI_Scatterv_c(in_arr->data2(), send_counts.data(),
756-
rows_disps.data(), offset_mpi_typ, data2_ptr,
757-
n_loc, offset_mpi_typ, mpi_root, comm),
758-
"_distributed.cpp::c_scatterv: MPI error on MPI_Scatterv:");
734+
CHECK_MPI(
735+
MPI_Genscatterv(in_arr->data1(), send_counts.data(),
736+
rows_disps.data(), data1_ptr, n_loc, utc_mpi_typ,
737+
mpi_root, comm),
738+
"_distributed.cpp::scatter_array: MPI error on MPI_Genscatterv:");
739+
CHECK_MPI(
740+
MPI_Genscatterv(in_arr->data2(), send_counts.data(),
741+
rows_disps.data(), data2_ptr, n_loc, offset_mpi_typ,
742+
mpi_root, comm),
743+
"_distributed.cpp::scatter_array: MPI error on MPI_Genscatterv:");
759744
} else if (arr_type == bodo_array_type::ARRAY_ITEM) {
760745
MPI_Datatype mpi_typ32 = get_MPI_typ(Bodo_CTypes::UINT32);
761746

@@ -784,7 +769,7 @@ std::shared_ptr<array_info> scatter_array(
784769
}
785770
CHECK_MPI(MPI_Bcast(send_counts_items.data(), n_pes, MPI_INT64_T,
786771
mpi_root, comm),
787-
"_distributed.h::c_bcast: MPI error on MPI_Bcast:");
772+
"_distributed.cpp::scatter_array: MPI error on MPI_Bcast:");
788773
calc_disp(rows_disps_items, send_counts_items);
789774

790775
std::shared_ptr<array_info> out_inner =
@@ -796,10 +781,10 @@ std::shared_ptr<array_info> scatter_array(
796781
// Scatter string lengths
797782
std::vector<uint32_t> recv_arr_lens(n_loc);
798783
CHECK_MPI(
799-
MPI_Scatterv_c(send_arr_lens.data(), send_counts.data(),
800-
rows_disps.data(), mpi_typ32, recv_arr_lens.data(),
801-
n_loc, mpi_typ32, mpi_root, comm),
802-
"_distributed.cpp::c_scatterv: MPI error on MPI_Scatterv:");
784+
MPI_Genscatterv(send_arr_lens.data(), send_counts.data(),
785+
rows_disps.data(), recv_arr_lens.data(), n_loc,
786+
mpi_typ32, mpi_root, comm),
787+
"_distributed.cpp::scatter_array: MPI error on MPI_Genscatterv:");
803788
convert_len_arr_to_offset(recv_arr_lens.data(),
804789
(offset_t *)out_arr->data1(),
805790
(size_t)out_arr->length);
@@ -852,10 +837,10 @@ std::shared_ptr<array_info> scatter_array(
852837
}
853838

854839
CHECK_MPI(
855-
MPI_Scatterv_c(send_ptr, send_count_bytes.data(),
856-
send_disp_bytes.data(), mpi_typ, null_bitmask_o,
857-
n_recv_bytes, mpi_typ, mpi_root, comm),
858-
"_distributed.cpp::c_scatterv: MPI error on MPI_Scatterv:");
840+
MPI_Genscatterv(send_ptr, send_count_bytes.data(),
841+
send_disp_bytes.data(), null_bitmask_o,
842+
n_recv_bytes, mpi_typ, mpi_root, comm),
843+
"_distributed.cpp::scatter_array: MPI error on MPI_Genscatterv:");
859844
}
860845

861846
return out_arr;

0 commit comments

Comments (0)