1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -3,6 +3,7 @@
## Current develop

### Added (new features/APIs/variables/...)
- [[PR 1271]](https://github.com/parthenon-hpc-lab/parthenon/pull/1271) Add option of specifying number of teams used to fill each boundary buffer
- [[PR 1258]](https://github.com/parthenon-hpc-lab/parthenon/pull/1258) Add "corehdf" version of hdf5 output that dumps everything
- [[PR 1162]](https://github.com/parthenon-hpc-lab/parthenon/pull/1162) Add dev container (e.g., GitHub Codespaces or VSCode)

159 changes: 101 additions & 58 deletions src/bvals/comms/boundary_communication.cpp
@@ -38,6 +38,7 @@

#include "tasks/tasks.hpp"
#include "utils/error_checking.hpp"
#include "utils/indexer.hpp"
#include "utils/loop_utils.hpp"

namespace parthenon {
@@ -53,8 +54,7 @@ TaskStatus SendBoundBufs(std::shared_ptr<MeshData<Real>> &md) {
auto &cache = md->GetBvarsCache().GetSubCache(bound_type, true);

if (cache.buf_vec.size() == 0)
-    InitializeBufferCache<bound_type>(md, &(pmesh->boundary_comm_map), &cache, SendKey,
-                                      true);
InitializeBufferCache<bound_type>(md, &(pmesh->boundary_comm_map), &cache, SendKey);

auto [rebuild, nbound, other_communication_unfinished] =
CheckSendBufferCacheForRebuild<bound_type, true>(md);
@@ -89,18 +89,28 @@ TaskStatus SendBoundBufs(std::shared_ptr<MeshData<Real>> &md) {
// Load buffer data
auto &bnd_info = cache.bnd_info;
PARTHENON_DEBUG_REQUIRE(bnd_info.size() == nbound, "Need same size for boundary info");
const int nteams_per_buffer = pmesh->nteams_per_boundary_buffer;
const int work_chunk_size = pmesh->boundary_buffer_work_chunk_size;
auto &sending_nonzero_flags = cache.sending_non_zero_flags;
auto &sending_nonzero_flags_h = cache.sending_non_zero_flags_h;
if (sending_nonzero_flags.size() != (nbound * nteams_per_buffer)) {
sending_nonzero_flags =
ParArray1D<bool>("sending_nonzero_flags", nbound * nteams_per_buffer);
sending_nonzero_flags_h = Kokkos::create_mirror_view(sending_nonzero_flags);
}

Kokkos::parallel_for(
PARTHENON_AUTO_LABEL,
-        Kokkos::TeamPolicy<>(parthenon::DevExecSpace(), nbound, Kokkos::AUTO),
Kokkos::TeamPolicy<>(parthenon::DevExecSpace(), nbound * nteams_per_buffer,
Kokkos::AUTO),
KOKKOS_LAMBDA(parthenon::team_mbr_t team_member) {
-        const int b = team_member.league_rank();
const int b = team_member.league_rank() / nteams_per_buffer;
const int bteam = team_member.league_rank() % nteams_per_buffer;
const int iflag = team_member.league_rank();

if (!bnd_info(b).allocated || bnd_info(b).same_to_same) {
Kokkos::single(Kokkos::PerTeam(team_member),
-                       [&]() { sending_nonzero_flags(b) = false; });
[&]() { sending_nonzero_flags(iflag) = false; });
return;
}
Real threshold = bnd_info(b).var.allocation_threshold;
@@ -110,8 +120,17 @@ TaskStatus SendBoundBufs(std::shared_ptr<MeshData<Real>> &md) {
auto &idxer = bnd_info(b).idxer[it];
const int iel = static_cast<int>(bnd_info(b).topo_idx[it]) % 3;
const int Ni = idxer.template EndIdx<5>() - idxer.template StartIdx<5>() + 1;
const int n_units = idxer.size() / Ni;
SplitFlatIndexRangeAmongTeams split(nteams_per_buffer, work_chunk_size,
n_units);
const auto [start, end] = split.GetIdxRange(bteam);
if (start >= end) {
idx_offset += idxer.size();
continue;
}
// TODO(LFR): Finish threading index splitting through reductions
Kokkos::parallel_reduce(
-              Kokkos::TeamThreadRange<>(team_member, idxer.size() / Ni),
Kokkos::TeamThreadRange<>(team_member, start, end),
[&](const int idx, bool &lnon_zero) {
const auto [t, u, v, k, j, i] = idxer(idx * Ni);
Real *var = &bnd_info(b).var(iel, t, u, v, k, j, i);
@@ -135,7 +154,7 @@ TaskStatus SendBoundBufs(std::shared_ptr<MeshData<Real>> &md) {
idx_offset += idxer.size();
}
Kokkos::single(Kokkos::PerTeam(team_member), [&]() {
-          sending_nonzero_flags(b) = non_zero[0] || non_zero[1] || non_zero[2];
sending_nonzero_flags(iflag) = non_zero[0] || non_zero[1] || non_zero[2];
});
});

@@ -149,10 +168,20 @@ TaskStatus SendBoundBufs(std::shared_ptr<MeshData<Real>> &md) {

for (int ibuf = 0; ibuf < cache.buf_vec.size(); ++ibuf) {
auto &buf = *cache.buf_vec[ibuf];
-    if (sending_nonzero_flags_h(ibuf) || !Globals::sparse_config.enabled)
if (!Globals::sparse_config.enabled) {
buf.Send();
-    else
-      buf.SendNull();
} else {
// Reduce flags over all of the teams that contributed to filling a given buffer
bool sending_nonz{false};
for (int i = ibuf * nteams_per_buffer; i < (ibuf + 1) * nteams_per_buffer; ++i)
sending_nonz = sending_nonz || sending_nonzero_flags_h(i);

if (sending_nonz) {
buf.Send();
} else {
buf.SendNull();
}
}
}

return TaskStatus::complete;
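To make the new flag layout concrete, here is a minimal host-only sketch (illustrative sizes, not parthenon code) of the one-flag-per-(buffer, team) indexing and the per-buffer OR-reduction performed in the loop above:

```cpp
#include <cassert>
#include <vector>

int main() {
  const int nbound = 3, nteams_per_buffer = 4;  // illustrative sizes
  // One flag per (buffer, team) pair, indexed as in the kernel above:
  // iflag = b * nteams_per_buffer + bteam (== the team's league rank).
  std::vector<bool> sending_nonzero_flags_h(nbound * nteams_per_buffer, false);
  sending_nonzero_flags_h[1 * nteams_per_buffer + 2] = true;  // buffer 1, team 2

  // Host-side OR-reduction mirroring the loop above: a buffer sends real data
  // if any of the teams that filled it saw a non-zero value.
  for (int ibuf = 0; ibuf < nbound; ++ibuf) {
    bool sending_nonz = false;
    for (int i = ibuf * nteams_per_buffer; i < (ibuf + 1) * nteams_per_buffer; ++i)
      sending_nonz = sending_nonz || sending_nonzero_flags_h[i];
    assert(sending_nonz == (ibuf == 1));
  }
  return 0;
}
```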
@@ -175,8 +204,8 @@ TaskStatus StartReceiveBoundBufs(std::shared_ptr<MeshData<Real>> &md) {
Mesh *pmesh = md->GetMeshPointer();
auto &cache = md->GetBvarsCache().GetSubCache(bound_type, false);
if (cache.buf_vec.size() == 0)
-    InitializeBufferCache<bound_type>(md, &(pmesh->boundary_comm_map), &cache, ReceiveKey,
-                                      false);
InitializeBufferCache<bound_type>(md, &(pmesh->boundary_comm_map), &cache,
ReceiveKey);

std::for_each(std::begin(cache.buf_vec), std::end(cache.buf_vec),
[](auto pbuf) { pbuf->TryStartReceive(); });
Expand Down Expand Up @@ -204,8 +233,8 @@ TaskStatus ReceiveBoundBufs(std::shared_ptr<MeshData<Real>> &md) {
Mesh *pmesh = md->GetMeshPointer();
auto &cache = md->GetBvarsCache().GetSubCache(bound_type, false);
if (cache.buf_vec.size() == 0)
-    InitializeBufferCache<bound_type>(md, &(pmesh->boundary_comm_map), &cache, ReceiveKey,
-                                      false);
InitializeBufferCache<bound_type>(md, &(pmesh->boundary_comm_map), &cache,
ReceiveKey);

bool all_received = true;
std::for_each(
@@ -270,11 +299,16 @@ TaskStatus SetBounds(std::shared_ptr<MeshData<Real>> &md) {
}
// const Real threshold = Globals::sparse_config.allocation_threshold;
auto &bnd_info = cache.bnd_info;
const int nteams_per_buffer = pmesh->nteams_per_boundary_buffer;
const int work_chunk_size = pmesh->boundary_buffer_work_chunk_size;

Kokkos::parallel_for(
PARTHENON_AUTO_LABEL,
-        Kokkos::TeamPolicy<>(parthenon::DevExecSpace(), nbound, Kokkos::AUTO),
Kokkos::TeamPolicy<>(parthenon::DevExecSpace(), nbound * nteams_per_buffer,
Kokkos::AUTO),
KOKKOS_LAMBDA(parthenon::team_mbr_t team_member) {
-          const int b = team_member.league_rank();
const int b = team_member.league_rank() / nteams_per_buffer;
const int bteam = team_member.league_rank() % nteams_per_buffer;
if (bnd_info(b).same_to_same) return;
int idx_offset = 0;
for (int it = 0; it < bnd_info(b).ntopological_elements; ++it) {
@@ -286,48 +320,57 @@ TaskStatus SetBounds(std::shared_ptr<MeshData<Real>> &md) {
Real fac = ftemp; // Can't capture structured bindings
const int iel = static_cast<int>(tel) % 3;
const int Ni = idxer.template EndIdx<5>() - idxer.template StartIdx<5>() + 1;
-          if (bnd_info(b).buf_allocated && bnd_info(b).allocated) {
-            Kokkos::parallel_for(
-                Kokkos::TeamThreadRange<>(team_member, idxer.size() / Ni),
-                [&](const int idx) {
-                  Real *buf = &bnd_info(b).buf(idx * Ni + idx_offset);
-                  const auto [t, u, v, k, j, i] = idxer(idx * Ni);
-                  // Have to do this because of some weird issue about structure bindings
-                  // being captured
-                  const int tt = t;
-                  const int uu = u;
-                  const int vv = v;
-                  const int kk = k;
-                  const int jj = j;
-                  const int ii = i;
-                  Kokkos::parallel_for(
-                      Kokkos::ThreadVectorRange<>(team_member, Ni), [&](int m) {
-                        const auto [il, jl, kl] =
-                            lcoord_trans.InverseTransform({ii + m, jj, kk});
-                        if (idxer.IsActive(kl, jl, il))
-                          var(iel, tt, uu, vv, kl, jl, il) = fac * buf[m];
-                      });
-                });
-          } else if (bnd_info(b).allocated && bound_type != BoundaryType::flxcor_recv) {
-            const Real default_val = bnd_info(b).var.sparse_default_val;
-            Kokkos::parallel_for(
-                Kokkos::TeamThreadRange<>(team_member, idxer.size() / Ni),
-                [&](const int idx) {
-                  const auto [t, u, v, k, j, i] = idxer(idx * Ni);
-                  const int tt = t;
-                  const int uu = u;
-                  const int vv = v;
-                  const int kk = k;
-                  const int jj = j;
-                  const int ii = i;
-                  Kokkos::parallel_for(
-                      Kokkos::ThreadVectorRange<>(team_member, Ni), [&](int m) {
-                        const auto [il, jl, kl] =
-                            lcoord_trans.InverseTransform({ii + m, jj, kk});
-                        if (idxer.IsActive(kl, jl, il))
-                          var(iel, tt, uu, vv, kl, jl, il) = default_val;
-                      });
-                });
if (bnd_info(b).allocated) {
const int n_units = idxer.size() / Ni;
SplitFlatIndexRangeAmongTeams split(nteams_per_buffer, work_chunk_size,
n_units);
const auto [start, end] = split.GetIdxRange(bteam);
if (start >= end) {
idx_offset += idxer.size();
continue;
}

if (bnd_info(b).buf_allocated) {
Kokkos::parallel_for(
Kokkos::TeamThreadRange<>(team_member, start, end), [&](const int idx) {
Real *buf = &bnd_info(b).buf(idx * Ni + idx_offset);
const auto [t, u, v, k, j, i] = idxer(idx * Ni);
                      // Copy structured bindings into locals: lambdas cannot
                      // capture structured bindings directly (pre-C++20)
const int tt = t;
const int uu = u;
const int vv = v;
const int kk = k;
const int jj = j;
const int ii = i;
Kokkos::parallel_for(
Kokkos::ThreadVectorRange<>(team_member, Ni), [&](int m) {
const auto [il, jl, kl] =
lcoord_trans.InverseTransform({ii + m, jj, kk});
if (idxer.IsActive(kl, jl, il))
var(iel, tt, uu, vv, kl, jl, il) = fac * buf[m];
});
});
} else if (bound_type != BoundaryType::flxcor_recv) {
const Real default_val = bnd_info(b).var.sparse_default_val;
Kokkos::parallel_for(
Kokkos::TeamThreadRange<>(team_member, start, end), [&](const int idx) {
const auto [t, u, v, k, j, i] = idxer(idx * Ni);
const int tt = t;
const int uu = u;
const int vv = v;
const int kk = k;
const int jj = j;
const int ii = i;
Kokkos::parallel_for(
Kokkos::ThreadVectorRange<>(team_member, Ni), [&](int m) {
const auto [il, jl, kl] =
lcoord_trans.InverseTransform({ii + m, jj, kk});
if (idxer.IsActive(kl, jl, il))
var(iel, tt, uu, vv, kl, jl, il) = default_val;
});
});
}
}
idx_offset += idxer.size();
}
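For reference, a small standalone sketch (illustrative sizes) of the league-rank decomposition both kernels in this file now use: the team policy launches `nbound * nteams_per_buffer` teams, and each team recovers which buffer it serves and its slot within that buffer from its league rank:

```cpp
#include <cstdio>

int main() {
  const int nbound = 2, nteams_per_buffer = 3;  // illustrative sizes
  for (int league_rank = 0; league_rank < nbound * nteams_per_buffer; ++league_rank) {
    const int b = league_rank / nteams_per_buffer;      // which boundary buffer
    const int bteam = league_rank % nteams_per_buffer;  // this team's slot in it
    // league_rank doubles as the flag index: b * nteams_per_buffer + bteam
    std::printf("league_rank %d -> buffer %d, team %d\n", league_rank, b, bteam);
  }
  return 0;
}
```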
11 changes: 1 addition & 10 deletions src/bvals/comms/bvals_utils.hpp
@@ -83,7 +83,7 @@ inline Mesh::channel_key_t ReceiveKey(const MeshBlock *pmb, const NeighborBlock
// it (LFR).
template <BoundaryType bound_type, class COMM_MAP, class F>
void InitializeBufferCache(std::shared_ptr<MeshData<Real>> &md, COMM_MAP *comm_map,
-                           BvarsSubCache_t *pcache, F KeyFunc, bool initialize_flags) {
BvarsSubCache_t *pcache, F KeyFunc) {
using namespace loops;
using namespace loops::shorthands;
Mesh *pmesh = md->GetMeshPointer();
@@ -126,15 +126,6 @@ void InitializeBufferCache(std::shared_ptr<MeshData<Real>> &md, COMM_MAP *comm_map,
pcache->buf_vec.push_back(&((*comm_map)[std::get<2>(t)]));
(pcache->idx_vec)[std::get<1>(t)] = buff_idx++;
});

-  const int nbound = pcache->buf_vec.size();
-  if (initialize_flags && nbound > 0) {
-    if (nbound != pcache->sending_non_zero_flags.size()) {
-      pcache->sending_non_zero_flags = ParArray1D<bool>("sending_nonzero_flags", nbound);
-      pcache->sending_non_zero_flags_h =
-          Kokkos::create_mirror_view(pcache->sending_non_zero_flags);
-    }
-  }
}

template <BoundaryType BOUND_TYPE, bool SENDER>
4 changes: 4 additions & 0 deletions src/mesh/mesh.cpp
@@ -80,6 +80,10 @@ Mesh::Mesh(ParameterInput *pin, ApplicationInput *app_in, Packages_t &packages,
nbnew(), nbdel(), step_since_lb(), gflag(), packages(packages),
resolved_packages(ResolvePackages(packages)),
default_pack_size_(pin->GetOrAddInteger("parthenon/mesh", "pack_size", -1)),
nteams_per_boundary_buffer(
pin->GetOrAddInteger("parthenon/mesh", "nteams_per_boundary_buffer", 1)),
boundary_buffer_work_chunk_size(
pin->GetOrAddInteger("parthenon/mesh", "boundary_buffer_work_chunk_size", 1)),
// private members:
num_mesh_threads_(pin->GetOrAddInteger("parthenon/mesh", "num_threads", 1)),
use_uniform_meshgen_fn_{true, true, true, true}, lb_flag_(true), lb_automatic_(),
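A minimal sketch of an input block exercising the two new options; the parameter names and their defaults of 1 come from the `GetOrAddInteger` calls above, while the values shown here are purely illustrative:

```
<parthenon/mesh>
nteams_per_boundary_buffer = 4       # default: 1 team fills each boundary buffer
boundary_buffer_work_chunk_size = 8  # default: 1; granularity of work handed to teams
```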
2 changes: 2 additions & 0 deletions src/mesh/mesh.hpp
@@ -246,6 +246,8 @@ class Mesh {
using comm_buf_map_t = std::unordered_map<channel_key_t, comm_buf_t>;
comm_buf_map_t boundary_comm_map;
TagMap tag_map;
int nteams_per_boundary_buffer;
int boundary_buffer_work_chunk_size;

#ifdef MPI_PARALLEL
MPI_Comm GetMPIComm(const std::string &label) const { return mpi_comm_map_.at(label); }
29 changes: 29 additions & 0 deletions src/utils/indexer.hpp
@@ -13,6 +13,7 @@
#ifndef UTILS_INDEXER_HPP_
#define UTILS_INDEXER_HPP_

#include <algorithm>
#include <array>
#include <string>
#include <tuple>
@@ -207,5 +208,33 @@ using Indexer8D = Indexer<int, int, int, int, int, int, int, int>;

using SpatiallyMaskedIndexer6D = SpatiallyMaskedIndexer<int, int, int, int, int, int>;

class SplitFlatIndexRangeAmongTeams {
public:
SplitFlatIndexRangeAmongTeams(int nteams, int work_chunk_size, int total_work)
: nteams(nteams), work_chunk_size(work_chunk_size), total_work(total_work) {
n_work_units_tot =
total_work / work_chunk_size + ((total_work % work_chunk_size) > 0);
n_work_per_team = n_work_units_tot / nteams;
n_extra_work_tot = n_work_units_tot % nteams;
}

auto GetIdxRange(int team) {
int start =
(team * n_work_per_team + std::min(team, n_extra_work_tot)) * work_chunk_size;
int end = ((team + 1) * n_work_per_team + std::min(team + 1, n_extra_work_tot)) *
work_chunk_size;

return std::make_pair(std::min(start, total_work), std::min(end, total_work));
}

private:
int nteams;
int work_chunk_size;
int total_work;
int n_work_units_tot;
int n_work_per_team;
int n_extra_work_tot;
};

} // namespace parthenon
#endif // UTILS_INDEXER_HPP_
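As a worked example of the splitting arithmetic (a sketch that assumes a parthenon build so `utils/indexer.hpp` is on the include path): 10 work items in chunks of 3 give ceil(10/3) = 4 chunks; with 4 teams, each team takes one chunk and the final range is clamped to `total_work`:

```cpp
#include <cassert>
#include <utility>

#include "utils/indexer.hpp"

int main() {
  // 4 teams share 10 flat indices, handed out in chunks of 3.
  parthenon::SplitFlatIndexRangeAmongTeams split(4, 3, 10);
  assert(split.GetIdxRange(0) == std::make_pair(0, 3));
  assert(split.GetIdxRange(1) == std::make_pair(3, 6));
  assert(split.GetIdxRange(2) == std::make_pair(6, 9));
  assert(split.GetIdxRange(3) == std::make_pair(9, 10));  // clamped to total_work
  return 0;
}
```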