Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1075,9 +1075,15 @@ namespace detail
* @brief Begin or end an ADIOS step.
*
* @param mode Whether to begin or end a step.
* @param calledExplicitly True if called due to a public API call.
* False if called from requireActiveStep.
* Some engines (BP5) require that every interaction happens within
* an active step, meaning that we need to call advance()
* implicitly at times. When doing that, do not tag the dataset
* with __openPMD_internal/useSteps (yet).
* @return AdvanceStatus
*/
AdvanceStatus advance(AdvanceMode mode);
AdvanceStatus advance(AdvanceMode mode, bool calledExplicitly);

/*
* Delete all buffered actions without running them.
Expand Down Expand Up @@ -1201,7 +1207,6 @@ namespace detail
Undecided
};
StreamStatus streamStatus = StreamStatus::OutsideOfStep;
adios2::StepStatus m_lastStepStatus = adios2::StepStatus::OK;

/**
* See documentation for StreamStatus::Parsing.
Expand Down
89 changes: 67 additions & 22 deletions src/IO/ADIOS/ADIOS2IOHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,7 @@ void ADIOS2IOHandlerImpl::writeAttribute(
auto prefix = filePositionToString(pos);

auto &filedata = getFileData(file, IfFileNotOpen::ThrowError);
filedata.requireActiveStep();
filedata.invalidateAttributesMap();
m_dirty.emplace(std::move(file));

Expand Down Expand Up @@ -703,7 +704,13 @@ void ADIOS2IOHandlerImpl::getBufferView(
Writable *writable, Parameter<Operation::GET_BUFFER_VIEW> &parameters)
{
// @todo check access mode
if (m_engineType != "bp4")
std::string optInEngines[] = {"bp4", "bp5", "file", "filestream"};
if (std::none_of(
begin(optInEngines),
end(optInEngines),
[this](std::string const &engine) {
return engine == this->m_engineType;
}))
{
parameters.out->backendManagedBuffer = false;
return;
Expand Down Expand Up @@ -764,6 +771,7 @@ void ADIOS2IOHandlerImpl::readAttribute(
auto file = refreshFileFromParent(writable, /* preferParentFile = */ false);
auto pos = setAndGetFilePosition(writable);
detail::BufferedActions &ba = getFileData(file, IfFileNotOpen::ThrowError);
ba.requireActiveStep();
switch (attributeLayout())
{
using AL = AttributeLayout;
Expand Down Expand Up @@ -1006,7 +1014,8 @@ void ADIOS2IOHandlerImpl::advance(
{
auto file = m_files[writable];
auto &ba = getFileData(file, IfFileNotOpen::ThrowError);
*parameters.status = ba.advance(parameters.mode);
*parameters.status =
ba.advance(parameters.mode, /* calledExplicitly = */ true);
}

void ADIOS2IOHandlerImpl::closePath(
Expand Down Expand Up @@ -1470,6 +1479,7 @@ namespace detail

auto &filedata = impl->getFileData(
file, ADIOS2IOHandlerImpl::IfFileNotOpen::ThrowError);
filedata.requireActiveStep();
filedata.invalidateAttributesMap();
adios2::IO IO = filedata.m_IO;
impl->m_dirty.emplace(std::move(file));
Expand Down Expand Up @@ -2161,9 +2171,18 @@ namespace detail
{
(void)impl;
static std::set<std::string> streamingEngines = {
"sst", "insitumpi", "inline", "staging", "nullcore", "ssc"};
"sst",
"insitumpi",
"inline",
"staging",
"nullcore",
"ssc",
"filestream",
"bp5"};
// diskStreamingEngines is a subset of streamingEngines
static std::set<std::string> diskStreamingEngines{"bp5", "filestream"};
static std::set<std::string> fileEngines = {
"bp5", "bp4", "bp3", "hdf5", "file"};
"bp4", "bp3", "hdf5", "file"};

// step/variable-based iteration encoding requires the new schema
if (m_impl->m_iterationEncoding == IterationEncoding::variableBased)
Expand All @@ -2189,7 +2208,14 @@ namespace detail
{
isStreaming = true;
optimizeAttributesStreaming =
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can think about removing this optimization, it becomes irrelevant in variable-based iteration encoding and there is no harm in using that encoding for streaming.
Not relevant for this PR though.

schema() == SupportedSchema::s_0000_00_00;
// Optimizing attributes in streaming mode is not needed in
// the variable-based ADIOS2 schema
schema() == SupportedSchema::s_0000_00_00 &&
// Also, it should only be done when truly streaming, not
// when using a disk-based engine that behaves like a
// streaming engine (otherwise attributes might vanish)
diskStreamingEngines.find(m_engineType) ==
diskStreamingEngines.end();
streamStatus = StreamStatus::OutsideOfStep;
}
else
Expand All @@ -2206,7 +2232,6 @@ namespace detail
* file being read.
*/
streamStatus = StreamStatus::Undecided;
// @todo no?? should be default in both modes
delayOpeningTheFirstStep = true;
break;
case adios2::Mode::Write:
Expand Down Expand Up @@ -2511,9 +2536,22 @@ namespace detail
adios2::Engine &BufferedActions::requireActiveStep()
{
adios2::Engine &eng = getEngine();
/*
* If streamStatus is Parsing, do NOT open the step.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should have added this comment earlier already, just took the opportunity now

*/
if (streamStatus == StreamStatus::OutsideOfStep)
{
m_lastStepStatus = eng.BeginStep();
switch (
advance(AdvanceMode::BEGINSTEP, /* calledExplicitly = */ false))
{
case AdvanceStatus::OVER:
throw std::runtime_error(
"[ADIOS2] Operation requires active step but no step is "
"left.");
case AdvanceStatus::OK:
// pass
break;
}
if (m_mode == adios2::Mode::Read &&
attributeLayout() == AttributeLayout::ByAdiosVariables)
{
Expand Down Expand Up @@ -2665,7 +2703,8 @@ namespace detail
/* flushUnconditionally = */ false);
}

AdvanceStatus BufferedActions::advance(AdvanceMode mode)
AdvanceStatus
BufferedActions::advance(AdvanceMode mode, bool calledExplicitly)
{
if (streamStatus == StreamStatus::Undecided)
{
Expand All @@ -2681,8 +2720,20 @@ namespace detail
return AdvanceStatus::OK;
}

m_IO.DefineAttribute<bool_representation>(
ADIOS2Defaults::str_usesstepsAttribute, 1);
/*
* If advance() is called implicitly (by requireActiveStep()), the
* Series is not necessarily using steps (logically).
* But in some ADIOS2 engines, at least one step must be opened
* (physically) to do anything.
* The usessteps tag should only be set when the Series is *logically*
* using steps.
*/
if (calledExplicitly)
{
m_IO.DefineAttribute<bool_representation>(
ADIOS2Defaults::str_usesstepsAttribute, 1);
}

switch (mode)
{
case AdvanceMode::ENDSTEP: {
Expand Down Expand Up @@ -2715,28 +2766,22 @@ namespace detail
return AdvanceStatus::OK;
}
case AdvanceMode::BEGINSTEP: {
adios2::StepStatus adiosStatus = m_lastStepStatus;
adios2::StepStatus adiosStatus{};

// Step might have been opened implicitly already
// by requireActiveStep()
// In that case, streamStatus is DuringStep and Adios
// return status is stored in m_lastStepStatus
if (streamStatus != StreamStatus::DuringStep)
{
flush(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flushing here was wrong but harmless before. This dormant bug has woken up with this PR, so change it.

FlushLevel::UserFlush,
[&adiosStatus](BufferedActions &, adios2::Engine &engine) {
adiosStatus = engine.BeginStep();
},
/* writeAttributes = */ false,
/* flushUnconditionally = */ true);
adiosStatus = getEngine().BeginStep();
if (adiosStatus == adios2::StepStatus::OK &&
m_mode == adios2::Mode::Read &&
attributeLayout() == AttributeLayout::ByAdiosVariables)
{
preloadAttributes.preloadAttributes(m_IO, m_engine.value());
}
}
else
{
adiosStatus = adios2::StepStatus::OK;
}
AdvanceStatus res = AdvanceStatus::OK;
switch (adiosStatus)
{
Expand Down
119 changes: 119 additions & 0 deletions test/SerialIOTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5145,6 +5145,26 @@ TEST_CASE("iterate_nonstreaming_series", "[serial][adios2]")
backend.extension,
false,
backend.jsonBaseConfig());
#if openPMD_HAVE_ADIOS2 && \
ADIOS2_VERSION_MAJOR * 1000000000 + ADIOS2_VERSION_MINOR * 100000000 + \
ADIOS2_VERSION_PATCH * 1000000 + ADIOS2_VERSION_TWEAK >= \
2701001223
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that our CI does not yet have any ADIOS2 versions that satisfy these conditions

if (backend.extension == "bp")
{
iterate_nonstreaming_series(
"../samples/iterate_nonstreaming_series_filebased_bp5_%T." +
backend.extension,
false,
json::merge(
backend.jsonBaseConfig(), "adios2.engine = \"bp5\""));
iterate_nonstreaming_series(
"../samples/iterate_nonstreaming_series_groupbased_bp5." +
backend.extension,
false,
json::merge(
backend.jsonBaseConfig(), "adios2.engine = \"bp5\""));
}
#endif
}
#if openPMD_HAVE_ADIOS2
iterate_nonstreaming_series(
Expand All @@ -5154,6 +5174,105 @@ TEST_CASE("iterate_nonstreaming_series", "[serial][adios2]")
#endif
}

#if openPMD_HAVE_ADIOS2 && \
ADIOS2_VERSION_MAJOR * 1000000000 + ADIOS2_VERSION_MINOR * 100000000 + \
ADIOS2_VERSION_PATCH * 1000000 + ADIOS2_VERSION_TWEAK >= \
2701001223
void adios2_bp5_no_steps(bool usesteps)
{
std::string const config = R"END(
{
"adios2":
{
"new_attribute_layout": true,
"schema": 20210209
}
})END";
{
adios2::ADIOS adios;
auto IO = adios.DeclareIO("IO");
IO.SetEngine("bp5");
auto engine =
IO.Open("../samples/bp5_no_steps.bp", adios2::Mode::Write);
if (usesteps)
{
engine.BeginStep();
}

// write default openPMD attributes
auto writeAttribute = [&IO](std::string const &name, auto value) {
IO.DefineAttribute(name, value);
};
IO.DefineAttribute("/basePath", std::string("/data/%T/"));
IO.DefineAttribute("/date", std::string("2021-02-22 11:14:00 +0000"));
IO.DefineAttribute("/iterationEncoding", std::string("groupBased"));
IO.DefineAttribute("/iterationFormat", std::string("/data/%T/"));
IO.DefineAttribute("/meshesPath", std::string("meshes/"));
IO.DefineAttribute("/openPMD", std::string("1.1.0"));
IO.DefineAttribute("/openPMDextension", uint32_t(0));
IO.DefineAttribute("/software", std::string("openPMD-api"));
IO.DefineAttribute("/softwareVersion", std::string("0.15.0-dev"));

IO.DefineAttribute("/data/0/dt", double(1));
IO.DefineAttribute(
"/data/0/meshes/theta/axisLabels",
std::vector<std::string>{"x"}.data(),
1);
IO.DefineAttribute("/data/0/meshes/theta/dataOrder", std::string("C"));
IO.DefineAttribute(
"/data/0/meshes/theta/geometry", std::string("cartesian"));
IO.DefineAttribute("/data/0/meshes/theta/gridGlobalOffset", double(0));
IO.DefineAttribute("/data/0/meshes/theta/gridSpacing", double(1));
IO.DefineAttribute("/data/0/meshes/theta/gridUnitSI", double(1));
IO.DefineAttribute("/data/0/meshes/theta/position", double(0));
IO.DefineAttribute("/data/0/meshes/theta/timeOffset", double(0));
IO.DefineAttribute(
"/data/0/meshes/theta/unitDimension",
std::vector<double>(7, 0).data(),
7);
IO.DefineAttribute("/data/0/meshes/theta/unitSI", double(1));
IO.DefineAttribute("/data/0/time", double(0));
IO.DefineAttribute("/data/0/timeUnitSI", double(1));

IO.DefineAttribute<uint64_t>(
"__openPMD_internal/openPMD2_adios2_schema", 0);
IO.DefineAttribute<unsigned char>("__openPMD_internal/useSteps", 0);

std::vector<int> data{0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
engine.Put(
IO.DefineVariable<int>("/data/0/meshes/theta", {10}, {0}, {10}),
data.data());

if (usesteps)
{
engine.EndStep();
}
engine.Close();
}

{
Series read(
"../samples/bp5_no_steps.bp",
Access::READ_ONLY,
"adios2.engine.type = \"bp5\"");
auto data = read.iterations[0]
.meshes["theta"][RecordComponent::SCALAR]
.loadChunk<int>({0}, {10});
read.flush();
for (size_t i = 0; i < 10; ++i)
{
REQUIRE(data.get()[i] == int(i));
}
}
}

TEST_CASE("adios2_bp5_no_steps", "[serial][adios2]")
{
adios2_bp5_no_steps(/* usesteps = */ false);
adios2_bp5_no_steps(/* usesteps = */ true);
}
#endif

void extendDataset(std::string const &ext, std::string const &jsonConfig)
{
std::string filename = "../samples/extendDataset." + ext;
Expand Down