Skip to content

Commit 29c587b

Browse files
Fixes for BP5 engine (#1215)
* Enable Span-based API in BP5 engine * Bp5 fixes * Add Test for BP5 dataset without steps
1 parent cbb076f commit 29c587b

File tree

3 files changed

+193
-24
lines changed

3 files changed

+193
-24
lines changed

include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1075,9 +1075,15 @@ namespace detail
10751075
* @brief Begin or end an ADIOS step.
10761076
*
10771077
* @param mode Whether to begin or end a step.
1078+
* @param calledExplicitly True if called due to a public API call.
1079+
* False if called from requireActiveStep.
1080+
* Some engines (BP5) require that every interaction happens within
1081+
* an active step, meaning that we need to call advance()
1082+
* implicitly at times. When doing that, do not tag the dataset
1083+
* with __openPMD_internal/useSteps (yet).
10781084
* @return AdvanceStatus
10791085
*/
1080-
AdvanceStatus advance(AdvanceMode mode);
1086+
AdvanceStatus advance(AdvanceMode mode, bool calledExplicitly);
10811087

10821088
/*
10831089
* Delete all buffered actions without running them.
@@ -1201,7 +1207,6 @@ namespace detail
12011207
Undecided
12021208
};
12031209
StreamStatus streamStatus = StreamStatus::OutsideOfStep;
1204-
adios2::StepStatus m_lastStepStatus = adios2::StepStatus::OK;
12051210

12061211
/**
12071212
* See documentation for StreamStatus::Parsing.

src/IO/ADIOS/ADIOS2IOHandler.cpp

Lines changed: 67 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -605,6 +605,7 @@ void ADIOS2IOHandlerImpl::writeAttribute(
605605
auto prefix = filePositionToString(pos);
606606

607607
auto &filedata = getFileData(file, IfFileNotOpen::ThrowError);
608+
filedata.requireActiveStep();
608609
filedata.invalidateAttributesMap();
609610
m_dirty.emplace(std::move(file));
610611

@@ -703,7 +704,13 @@ void ADIOS2IOHandlerImpl::getBufferView(
703704
Writable *writable, Parameter<Operation::GET_BUFFER_VIEW> &parameters)
704705
{
705706
// @todo check access mode
706-
if (m_engineType != "bp4")
707+
std::string optInEngines[] = {"bp4", "bp5", "file", "filestream"};
708+
if (std::none_of(
709+
begin(optInEngines),
710+
end(optInEngines),
711+
[this](std::string const &engine) {
712+
return engine == this->m_engineType;
713+
}))
707714
{
708715
parameters.out->backendManagedBuffer = false;
709716
return;
@@ -764,6 +771,7 @@ void ADIOS2IOHandlerImpl::readAttribute(
764771
auto file = refreshFileFromParent(writable, /* preferParentFile = */ false);
765772
auto pos = setAndGetFilePosition(writable);
766773
detail::BufferedActions &ba = getFileData(file, IfFileNotOpen::ThrowError);
774+
ba.requireActiveStep();
767775
switch (attributeLayout())
768776
{
769777
using AL = AttributeLayout;
@@ -1006,7 +1014,8 @@ void ADIOS2IOHandlerImpl::advance(
10061014
{
10071015
auto file = m_files[writable];
10081016
auto &ba = getFileData(file, IfFileNotOpen::ThrowError);
1009-
*parameters.status = ba.advance(parameters.mode);
1017+
*parameters.status =
1018+
ba.advance(parameters.mode, /* calledExplicitly = */ true);
10101019
}
10111020

10121021
void ADIOS2IOHandlerImpl::closePath(
@@ -1470,6 +1479,7 @@ namespace detail
14701479

14711480
auto &filedata = impl->getFileData(
14721481
file, ADIOS2IOHandlerImpl::IfFileNotOpen::ThrowError);
1482+
filedata.requireActiveStep();
14731483
filedata.invalidateAttributesMap();
14741484
adios2::IO IO = filedata.m_IO;
14751485
impl->m_dirty.emplace(std::move(file));
@@ -2161,9 +2171,18 @@ namespace detail
21612171
{
21622172
(void)impl;
21632173
static std::set<std::string> streamingEngines = {
2164-
"sst", "insitumpi", "inline", "staging", "nullcore", "ssc"};
2174+
"sst",
2175+
"insitumpi",
2176+
"inline",
2177+
"staging",
2178+
"nullcore",
2179+
"ssc",
2180+
"filestream",
2181+
"bp5"};
2182+
// diskStreamingEngines is a subset of streamingEngines
2183+
static std::set<std::string> diskStreamingEngines{"bp5", "filestream"};
21652184
static std::set<std::string> fileEngines = {
2166-
"bp5", "bp4", "bp3", "hdf5", "file"};
2185+
"bp4", "bp3", "hdf5", "file"};
21672186

21682187
// step/variable-based iteration encoding requires the new schema
21692188
if (m_impl->m_iterationEncoding == IterationEncoding::variableBased)
@@ -2189,7 +2208,14 @@ namespace detail
21892208
{
21902209
isStreaming = true;
21912210
optimizeAttributesStreaming =
2192-
schema() == SupportedSchema::s_0000_00_00;
2211+
// Optimizing attributes in streaming mode is not needed in
2212+
// the variable-based ADIOS2 schema
2213+
schema() == SupportedSchema::s_0000_00_00 &&
2214+
// Also, it should only be done when truly streaming, not
2215+
// when using a disk-based engine that behaves like a
2216+
// streaming engine (otherwise attributes might vanish)
2217+
diskStreamingEngines.find(m_engineType) ==
2218+
diskStreamingEngines.end();
21932219
streamStatus = StreamStatus::OutsideOfStep;
21942220
}
21952221
else
@@ -2206,7 +2232,6 @@ namespace detail
22062232
* file being read.
22072233
*/
22082234
streamStatus = StreamStatus::Undecided;
2209-
// @todo no?? should be default in both modes
22102235
delayOpeningTheFirstStep = true;
22112236
break;
22122237
case adios2::Mode::Write:
@@ -2511,9 +2536,22 @@ namespace detail
25112536
adios2::Engine &BufferedActions::requireActiveStep()
25122537
{
25132538
adios2::Engine &eng = getEngine();
2539+
/*
2540+
* If streamStatus is Parsing, do NOT open the step.
2541+
*/
25142542
if (streamStatus == StreamStatus::OutsideOfStep)
25152543
{
2516-
m_lastStepStatus = eng.BeginStep();
2544+
switch (
2545+
advance(AdvanceMode::BEGINSTEP, /* calledExplicitly = */ false))
2546+
{
2547+
case AdvanceStatus::OVER:
2548+
throw std::runtime_error(
2549+
"[ADIOS2] Operation requires active step but no step is "
2550+
"left.");
2551+
case AdvanceStatus::OK:
2552+
// pass
2553+
break;
2554+
}
25172555
if (m_mode == adios2::Mode::Read &&
25182556
attributeLayout() == AttributeLayout::ByAdiosVariables)
25192557
{
@@ -2665,7 +2703,8 @@ namespace detail
26652703
/* flushUnconditionally = */ false);
26662704
}
26672705

2668-
AdvanceStatus BufferedActions::advance(AdvanceMode mode)
2706+
AdvanceStatus
2707+
BufferedActions::advance(AdvanceMode mode, bool calledExplicitly)
26692708
{
26702709
if (streamStatus == StreamStatus::Undecided)
26712710
{
@@ -2681,8 +2720,20 @@ namespace detail
26812720
return AdvanceStatus::OK;
26822721
}
26832722

2684-
m_IO.DefineAttribute<bool_representation>(
2685-
ADIOS2Defaults::str_usesstepsAttribute, 1);
2723+
/*
2724+
* If advance() is called implicitly (by requireActiveStep()), the
2725+
* Series is not necessarily using steps (logically).
2726+
* But in some ADIOS2 engines, at least one step must be opened
2727+
* (physically) to do anything.
2728+
* The usessteps tag should only be set when the Series is *logically*
2729+
* using steps.
2730+
*/
2731+
if (calledExplicitly)
2732+
{
2733+
m_IO.DefineAttribute<bool_representation>(
2734+
ADIOS2Defaults::str_usesstepsAttribute, 1);
2735+
}
2736+
26862737
switch (mode)
26872738
{
26882739
case AdvanceMode::ENDSTEP: {
@@ -2715,28 +2766,22 @@ namespace detail
27152766
return AdvanceStatus::OK;
27162767
}
27172768
case AdvanceMode::BEGINSTEP: {
2718-
adios2::StepStatus adiosStatus = m_lastStepStatus;
2769+
adios2::StepStatus adiosStatus{};
27192770

2720-
// Step might have been opened implicitly already
2721-
// by requireActiveStep()
2722-
// In that case, streamStatus is DuringStep and Adios
2723-
// return status is stored in m_lastStepStatus
27242771
if (streamStatus != StreamStatus::DuringStep)
27252772
{
2726-
flush(
2727-
FlushLevel::UserFlush,
2728-
[&adiosStatus](BufferedActions &, adios2::Engine &engine) {
2729-
adiosStatus = engine.BeginStep();
2730-
},
2731-
/* writeAttributes = */ false,
2732-
/* flushUnconditionally = */ true);
2773+
adiosStatus = getEngine().BeginStep();
27332774
if (adiosStatus == adios2::StepStatus::OK &&
27342775
m_mode == adios2::Mode::Read &&
27352776
attributeLayout() == AttributeLayout::ByAdiosVariables)
27362777
{
27372778
preloadAttributes.preloadAttributes(m_IO, m_engine.value());
27382779
}
27392780
}
2781+
else
2782+
{
2783+
adiosStatus = adios2::StepStatus::OK;
2784+
}
27402785
AdvanceStatus res = AdvanceStatus::OK;
27412786
switch (adiosStatus)
27422787
{

test/SerialIOTest.cpp

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5145,6 +5145,26 @@ TEST_CASE("iterate_nonstreaming_series", "[serial][adios2]")
51455145
backend.extension,
51465146
false,
51475147
backend.jsonBaseConfig());
5148+
#if openPMD_HAVE_ADIOS2 && \
5149+
ADIOS2_VERSION_MAJOR * 1000000000 + ADIOS2_VERSION_MINOR * 100000000 + \
5150+
ADIOS2_VERSION_PATCH * 1000000 + ADIOS2_VERSION_TWEAK >= \
5151+
2701001223
5152+
if (backend.extension == "bp")
5153+
{
5154+
iterate_nonstreaming_series(
5155+
"../samples/iterate_nonstreaming_series_filebased_bp5_%T." +
5156+
backend.extension,
5157+
false,
5158+
json::merge(
5159+
backend.jsonBaseConfig(), "adios2.engine = \"bp5\""));
5160+
iterate_nonstreaming_series(
5161+
"../samples/iterate_nonstreaming_series_groupbased_bp5." +
5162+
backend.extension,
5163+
false,
5164+
json::merge(
5165+
backend.jsonBaseConfig(), "adios2.engine = \"bp5\""));
5166+
}
5167+
#endif
51485168
}
51495169
#if openPMD_HAVE_ADIOS2
51505170
iterate_nonstreaming_series(
@@ -5154,6 +5174,105 @@ TEST_CASE("iterate_nonstreaming_series", "[serial][adios2]")
51545174
#endif
51555175
}
51565176

5177+
#if openPMD_HAVE_ADIOS2 && \
5178+
ADIOS2_VERSION_MAJOR * 1000000000 + ADIOS2_VERSION_MINOR * 100000000 + \
5179+
ADIOS2_VERSION_PATCH * 1000000 + ADIOS2_VERSION_TWEAK >= \
5180+
2701001223
5181+
void adios2_bp5_no_steps(bool usesteps)
5182+
{
5183+
std::string const config = R"END(
5184+
{
5185+
"adios2":
5186+
{
5187+
"new_attribute_layout": true,
5188+
"schema": 20210209
5189+
}
5190+
})END";
5191+
{
5192+
adios2::ADIOS adios;
5193+
auto IO = adios.DeclareIO("IO");
5194+
IO.SetEngine("bp5");
5195+
auto engine =
5196+
IO.Open("../samples/bp5_no_steps.bp", adios2::Mode::Write);
5197+
if (usesteps)
5198+
{
5199+
engine.BeginStep();
5200+
}
5201+
5202+
// write default openPMD attributes
5203+
auto writeAttribute = [&IO](std::string const &name, auto value) {
5204+
IO.DefineAttribute(name, value);
5205+
};
5206+
IO.DefineAttribute("/basePath", std::string("/data/%T/"));
5207+
IO.DefineAttribute("/date", std::string("2021-02-22 11:14:00 +0000"));
5208+
IO.DefineAttribute("/iterationEncoding", std::string("groupBased"));
5209+
IO.DefineAttribute("/iterationFormat", std::string("/data/%T/"));
5210+
IO.DefineAttribute("/meshesPath", std::string("meshes/"));
5211+
IO.DefineAttribute("/openPMD", std::string("1.1.0"));
5212+
IO.DefineAttribute("/openPMDextension", uint32_t(0));
5213+
IO.DefineAttribute("/software", std::string("openPMD-api"));
5214+
IO.DefineAttribute("/softwareVersion", std::string("0.15.0-dev"));
5215+
5216+
IO.DefineAttribute("/data/0/dt", double(1));
5217+
IO.DefineAttribute(
5218+
"/data/0/meshes/theta/axisLabels",
5219+
std::vector<std::string>{"x"}.data(),
5220+
1);
5221+
IO.DefineAttribute("/data/0/meshes/theta/dataOrder", std::string("C"));
5222+
IO.DefineAttribute(
5223+
"/data/0/meshes/theta/geometry", std::string("cartesian"));
5224+
IO.DefineAttribute("/data/0/meshes/theta/gridGlobalOffset", double(0));
5225+
IO.DefineAttribute("/data/0/meshes/theta/gridSpacing", double(1));
5226+
IO.DefineAttribute("/data/0/meshes/theta/gridUnitSI", double(1));
5227+
IO.DefineAttribute("/data/0/meshes/theta/position", double(0));
5228+
IO.DefineAttribute("/data/0/meshes/theta/timeOffset", double(0));
5229+
IO.DefineAttribute(
5230+
"/data/0/meshes/theta/unitDimension",
5231+
std::vector<double>(7, 0).data(),
5232+
7);
5233+
IO.DefineAttribute("/data/0/meshes/theta/unitSI", double(1));
5234+
IO.DefineAttribute("/data/0/time", double(0));
5235+
IO.DefineAttribute("/data/0/timeUnitSI", double(1));
5236+
5237+
IO.DefineAttribute<uint64_t>(
5238+
"__openPMD_internal/openPMD2_adios2_schema", 0);
5239+
IO.DefineAttribute<unsigned char>("__openPMD_internal/useSteps", 0);
5240+
5241+
std::vector<int> data{0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
5242+
engine.Put(
5243+
IO.DefineVariable<int>("/data/0/meshes/theta", {10}, {0}, {10}),
5244+
data.data());
5245+
5246+
if (usesteps)
5247+
{
5248+
engine.EndStep();
5249+
}
5250+
engine.Close();
5251+
}
5252+
5253+
{
5254+
Series read(
5255+
"../samples/bp5_no_steps.bp",
5256+
Access::READ_ONLY,
5257+
"adios2.engine.type = \"bp5\"");
5258+
auto data = read.iterations[0]
5259+
.meshes["theta"][RecordComponent::SCALAR]
5260+
.loadChunk<int>({0}, {10});
5261+
read.flush();
5262+
for (size_t i = 0; i < 10; ++i)
5263+
{
5264+
REQUIRE(data.get()[i] == int(i));
5265+
}
5266+
}
5267+
}
5268+
5269+
TEST_CASE("adios2_bp5_no_steps", "[serial][adios2]")
5270+
{
5271+
adios2_bp5_no_steps(/* usesteps = */ false);
5272+
adios2_bp5_no_steps(/* usesteps = */ true);
5273+
}
5274+
#endif
5275+
51575276
void extendDataset(std::string const &ext, std::string const &jsonConfig)
51585277
{
51595278
std::string filename = "../samples/extendDataset." + ext;

0 commit comments

Comments
 (0)