Skip to content

Commit 9f8d446

Browse files
committed
Have save_parameters_numpy merge all dataframes before writing
This commit makes save_parameters_numpy merge all Polars DataFrame parameters before writing them to SCALARS.parquet once, instead of reading the file, merging into it, and writing it back for every single parameter.
1 parent 53dc19f commit 9f8d446

1 file changed

Lines changed: 21 additions & 1 deletion

File tree

src/ert/storage/local_ensemble.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
import asyncio
4+
import contextlib
45
import logging
56
import os
67
import time
@@ -660,10 +661,29 @@ def save_parameters_numpy(
660661
iens_active_index: npt.NDArray[np.int_],
661662
) -> None:
662663
config_node = self.experiment.parameter_configuration[param_group]
664+
complete_df: pl.DataFrame | None = None
665+
with contextlib.suppress(KeyError):
666+
complete_df = self._load_parameters_lazy(SCALAR_FILENAME).collect(
667+
engine="streaming"
668+
)
663669
for real, ds in config_node.create_storage_datasets(
664670
parameters, iens_active_index
665671
):
666-
self.save_parameters(ds, config_node.name, real)
672+
if isinstance(ds, pl.DataFrame):
673+
if complete_df is None:
674+
complete_df = ds
675+
else:
676+
complete_df = (
677+
complete_df.join(ds, on="realization", how="left")
678+
.unique(subset=["realization"], keep="first")
679+
.sort("realization")
680+
)
681+
else:
682+
self.save_parameters(ds, config_node.name, real)
683+
684+
group_path = self.mount_point / f"{_escape_filename(SCALAR_FILENAME)}.parquet"
685+
if complete_df is not None:
686+
self._storage._to_parquet_transaction(group_path, complete_df)
667687

668688
def load_scalars(
669689
self, group: str | None = None, realizations: npt.NDArray[np.int_] | None = None

0 commit comments

Comments
 (0)