From 02e43eb4cdf3a4ff484999b7193b28ac4116541b Mon Sep 17 00:00:00 2001 From: "Yngve S. Kristiansen" Date: Mon, 5 Jan 2026 14:01:06 +0100 Subject: [PATCH 1/8] Move _Proc and ErtServerConnectionInfo to ert_server.py --- src/ert/gui/tools/plot/plot_window.py | 2 +- src/ert/services/__init__.py | 4 +- src/ert/services/_base_service.py | 169 +----------------- src/ert/services/_storage_main.py | 2 +- src/ert/services/ert_server.py | 166 ++++++++++++++++- src/everest/detached/everserver.py | 2 +- .../unit_tests/services/test_base_service.py | 6 +- 7 files changed, 176 insertions(+), 175 deletions(-) diff --git a/src/ert/gui/tools/plot/plot_window.py b/src/ert/gui/tools/plot/plot_window.py index 2a5f6780efe..8b582f5620b 100644 --- a/src/ert/gui/tools/plot/plot_window.py +++ b/src/ert/gui/tools/plot/plot_window.py @@ -25,7 +25,7 @@ from ert.config.field import Field from ert.dark_storage.common import get_storage_api_version from ert.gui.ertwidgets import CopyButton, showWaitCursorWhileWaiting -from ert.services._base_service import ServerBootFail +from ert.services import ServerBootFail from ert.utils import log_duration from .customize import PlotCustomizer diff --git a/src/ert/services/__init__.py b/src/ert/services/__init__.py index b7572d81ad2..e37b7d6c765 100644 --- a/src/ert/services/__init__.py +++ b/src/ert/services/__init__.py @@ -1,4 +1,4 @@ -from .ert_server import ErtServer +from .ert_server import BaseServiceExit, ErtServer, ServerBootFail from .webviz_ert_service import WebvizErt -__all__ = ["ErtServer", "WebvizErt"] +__all__ = ["BaseServiceExit", "ErtServer", "ServerBootFail", "WebvizErt"] diff --git a/src/ert/services/_base_service.py b/src/ert/services/_base_service.py index 08a100768e4..48c146b1596 100644 --- a/src/ert/services/_base_service.py +++ b/src/ert/services/_base_service.py @@ -5,23 +5,19 @@ from __future__ import annotations -import contextlib -import io import json import os -import signal import sys import threading import types -from collections.abc import Callable, Mapping, Sequence +from collections.abc import Mapping, Sequence from logging import Logger, getLogger from pathlib import Path -from select import PIPE_BUF, select -from subprocess import Popen, TimeoutExpired from tempfile import NamedTemporaryFile from time import sleep -from types import FrameType -from typing import TYPE_CHECKING, Any, Generic, Self, TypedDict, TypeVar +from typing import TYPE_CHECKING, Any, Generic, Self, TypeVar + +from ert.services.ert_server import ErtServerConnectionInfo, _Proc if TYPE_CHECKING: pass @@ -29,35 +25,6 @@ T = TypeVar("T", bound="BaseService") -class ErtServerConnectionInfo(TypedDict): - urls: list[str] - authtoken: str - host: str - port: str - cert: str - auth: str - - -SERVICE_CONF_PATHS: set[str] = set() - - -class BaseServiceExit(OSError): - pass - - -def cleanup_service_files(signum: int, frame: FrameType | None) -> None: - for file_path in SERVICE_CONF_PATHS: - file = Path(file_path) - if file.exists(): - file.unlink() - raise BaseServiceExit(f"Signal {signum} received.") - - -if threading.current_thread() is threading.main_thread(): - signal.signal(signal.SIGTERM, cleanup_service_files) - signal.signal(signal.SIGINT, cleanup_service_files) - - def local_exec_args(script_args: str | list[str]) -> list[str]: """ Convenience function that returns the exec_args for executing a Python @@ -96,134 +63,6 @@ def __exit__( return exc_type is None -class _Proc(threading.Thread): - def __init__( - self, - service_name: str, - exec_args: Sequence[str], - timeout: int, - on_connection_info_received: Callable[ - [ErtServerConnectionInfo | Exception | None], None - ], - project: Path, - ) -> None: - super().__init__() - - self._shutdown = threading.Event() - - self._service_name = service_name - self._exec_args = exec_args - self._timeout = timeout - self._propagate_connection_info_from_childproc = on_connection_info_received - self._service_config_path = project / f"{self._service_name}_server.json" - - fd_read, fd_write = os.pipe() - self._comm_pipe = os.fdopen(fd_read) - - env = os.environ.copy() - env["ERT_COMM_FD"] = str(fd_write) - - SERVICE_CONF_PATHS.add(str(self._service_config_path)) - - # The process is waited for in _do_shutdown() - self._childproc = Popen( - self._exec_args, - pass_fds=(fd_write,), - env=env, - close_fds=True, - ) - os.close(fd_write) - - def run(self) -> None: - comm = self._read_connection_info_from_process(self._childproc) - - if comm is None: - self._propagate_connection_info_from_childproc(TimeoutError()) - return # _read_conn_info() has already cleaned up in this case - - conn_info: ErtServerConnectionInfo | Exception | None = None - try: - conn_info = json.loads(comm) - except json.JSONDecodeError: - conn_info = ServerBootFail() - except Exception as exc: - conn_info = exc - - try: - self._propagate_connection_info_from_childproc(conn_info) - - while True: - if self._childproc.poll() is not None: - break - if self._shutdown.wait(1): - self._do_shutdown() - break - - except Exception as e: - print(str(e)) - self.logger.exception(e) - - finally: - self._ensure_connection_info_file_is_deleted() - - def shutdown(self) -> int: - """Shutdown the server.""" - self._shutdown.set() - self.join() - - return self._childproc.returncode - - def _read_connection_info_from_process(self, proc: Popen[bytes]) -> str | None: - comm_buf = io.StringIO() - first_iter = True - while first_iter or proc.poll() is None: - first_iter = False - ready = select([self._comm_pipe], [], [], self._timeout) - - # Timeout reached, exit with a failure - if ready == ([], [], []): - self._do_shutdown() - self._ensure_connection_info_file_is_deleted() - return None - - x = self._comm_pipe.read(PIPE_BUF) - if not x: # EOF - break - comm_buf.write(x) - return comm_buf.getvalue() - - def _do_shutdown(self) -> None: - if self._childproc is None: - return - try: - self._childproc.terminate() - self._childproc.wait(10) # Give it 10s to shut down cleanly.. - except TimeoutExpired: - try: - self._childproc.kill() # ... then kick it harder... - self._childproc.wait(self._timeout) # ... and wait again - except TimeoutExpired: - self.logger.error( - f"waiting for child-process exceeded timeout {self._timeout}s" - ) - - def _ensure_connection_info_file_is_deleted(self) -> None: - """ - Ensure that the JSON connection information file is deleted - """ - with contextlib.suppress(OSError): - if self._service_config_path.exists(): - self._service_config_path.unlink() - - @property - def logger(self) -> Logger: - return getLogger(f"ert.shared.{self._service_name}") - - -class ServerBootFail(RuntimeError): - pass - - class BaseService: """ BaseService provides a block-only-when-needed mechanism for starting and diff --git a/src/ert/services/_storage_main.py b/src/ert/services/_storage_main.py index 690f5785f69..058f2404e3b 100644 --- a/src/ert/services/_storage_main.py +++ b/src/ert/services/_storage_main.py @@ -30,7 +30,7 @@ from ert.logging import STORAGE_LOG_CONFIG from ert.plugins import setup_site_logging -from ert.services._base_service import BaseServiceExit +from ert.services import BaseServiceExit from ert.shared import __file__ as ert_shared_path from ert.shared import find_available_socket, get_machine_name from ert.trace import tracer diff --git a/src/ert/services/ert_server.py b/src/ert/services/ert_server.py index d06bbc29a80..ea1303d0227 100644 --- a/src/ert/services/ert_server.py +++ b/src/ert/services/ert_server.py @@ -1,23 +1,183 @@ from __future__ import annotations +import contextlib +import io import json import logging import os +import signal import sys import threading import types -from collections.abc import Mapping +from collections.abc import Callable, Mapping, Sequence from pathlib import Path +from select import PIPE_BUF, select +from subprocess import Popen, TimeoutExpired from tempfile import NamedTemporaryFile from time import sleep -from typing import Any, cast +from typing import Any, TypedDict, cast import requests from ert.dark_storage.client import Client, ErtClientConnectionInfo -from ert.services._base_service import ErtServerConnectionInfo, _Proc from ert.trace import get_traceparent +SERVICE_CONF_PATHS: set[str] = set() + + +class ErtServerConnectionInfo(TypedDict): + urls: list[str] + authtoken: str + host: str + port: str + cert: str + auth: str + + +class BaseServiceExit(OSError): + pass + + +def cleanup_service_files(signum: int, frame: types.FrameType | None) -> None: + for file_path in SERVICE_CONF_PATHS: + file = Path(file_path) + if file.exists(): + file.unlink() + raise BaseServiceExit(f"Signal {signum} received.") + + +if threading.current_thread() is threading.main_thread(): + signal.signal(signal.SIGTERM, cleanup_service_files) + signal.signal(signal.SIGINT, cleanup_service_files) + + +class ServerBootFail(RuntimeError): + pass + + +class _Proc(threading.Thread): + def __init__( + self, + service_name: str, + exec_args: Sequence[str], + timeout: int, + on_connection_info_received: Callable[ + [ErtServerConnectionInfo | Exception | None], None + ], + project: Path, + ) -> None: + super().__init__() + + self._shutdown = threading.Event() + + self._service_name = service_name + self._exec_args = exec_args + self._timeout = timeout + self._propagate_connection_info_from_childproc = on_connection_info_received + self._service_config_path = project / f"{self._service_name}_server.json" + + fd_read, fd_write = os.pipe() + self._comm_pipe = os.fdopen(fd_read) + + env = os.environ.copy() + env["ERT_COMM_FD"] = str(fd_write) + + SERVICE_CONF_PATHS.add(str(self._service_config_path)) + + # The process is waited for in _do_shutdown() + self._childproc = Popen( + self._exec_args, + pass_fds=(fd_write,), + env=env, + close_fds=True, + ) + os.close(fd_write) + + def run(self) -> None: + comm = self._read_connection_info_from_process(self._childproc) + + if comm is None: + self._propagate_connection_info_from_childproc(TimeoutError()) + return # _read_conn_info() has already cleaned up in this case + + conn_info: ErtServerConnectionInfo | Exception | None = None + try: + conn_info = json.loads(comm) + except json.JSONDecodeError: + conn_info = ServerBootFail() + except Exception as exc: + conn_info = exc + + try: + self._propagate_connection_info_from_childproc(conn_info) + + while True: + if self._childproc.poll() is not None: + break + if self._shutdown.wait(1): + self._do_shutdown() + break + + except Exception as e: + print(str(e)) + self.logger.exception(e) + + finally: + self._ensure_connection_info_file_is_deleted() + + def shutdown(self) -> int: + """Shutdown the server.""" + self._shutdown.set() + self.join() + + return self._childproc.returncode + + def _read_connection_info_from_process(self, proc: Popen[bytes]) -> str | None: + comm_buf = io.StringIO() + first_iter = True + while first_iter or proc.poll() is None: + first_iter = False + ready = select([self._comm_pipe], [], [], self._timeout) + + # Timeout reached, exit with a failure + if ready == ([], [], []): + self._do_shutdown() + self._ensure_connection_info_file_is_deleted() + return None + + x = self._comm_pipe.read(PIPE_BUF) + if not x: # EOF + break + comm_buf.write(x) + return comm_buf.getvalue() + + def _do_shutdown(self) -> None: + if self._childproc is None: + return + try: + self._childproc.terminate() + self._childproc.wait(10) # Give it 10s to shut down cleanly.. + except TimeoutExpired: + try: + self._childproc.kill() # ... then kick it harder... + self._childproc.wait(self._timeout) # ... and wait again + except TimeoutExpired: + self.logger.error( + f"waiting for child-process exceeded timeout {self._timeout}s" + ) + + def _ensure_connection_info_file_is_deleted(self) -> None: + """ + Ensure that the JSON connection information file is deleted + """ + with contextlib.suppress(OSError): + if self._service_config_path.exists(): + self._service_config_path.unlink() + + @property + def logger(self) -> logging.Logger: + return logging.getLogger("ert.shared.storage") + class ErtServerContext: def __init__(self, service: ErtServer) -> None: diff --git a/src/everest/detached/everserver.py b/src/everest/detached/everserver.py index a28d5fb4196..eee6dda6982 100644 --- a/src/everest/detached/everserver.py +++ b/src/everest/detached/everserver.py @@ -13,7 +13,7 @@ from ert.plugins.plugin_manager import ErtPluginManager from ert.services import ErtServer -from ert.services._base_service import BaseServiceExit +from ert.services.ert_server import BaseServiceExit from ert.storage import ExperimentStatus from ert.storage.local_experiment import ExperimentState from ert.trace import tracer diff --git a/tests/ert/unit_tests/services/test_base_service.py b/tests/ert/unit_tests/services/test_base_service.py index cc7c18b4666..b7473585842 100644 --- a/tests/ert/unit_tests/services/test_base_service.py +++ b/tests/ert/unit_tests/services/test_base_service.py @@ -9,11 +9,13 @@ import pytest from ert.services._base_service import ( - SERVICE_CONF_PATHS, BaseService, + local_exec_args, +) +from ert.services.ert_server import ( + SERVICE_CONF_PATHS, ServerBootFail, cleanup_service_files, - local_exec_args, ) From cc25539b4efcd0c2d7dce434ed50f652311ecc96 Mon Sep 17 00:00:00 2001 From: "Yngve S. Kristiansen" Date: Tue, 6 Jan 2026 09:27:46 +0100 Subject: [PATCH 2/8] Pass through timeout to _Proc Should not be hardcoded to 120 --- src/ert/services/ert_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ert/services/ert_server.py b/src/ert/services/ert_server.py index ea1303d0227..551e828d6d8 100644 --- a/src/ert/services/ert_server.py +++ b/src/ert/services/ert_server.py @@ -251,7 +251,7 @@ def __init__( self._thread_that_starts_server_process = _Proc( service_name="storage", exec_args=run_storage_main_cmd, - timeout=120, + timeout=timeout, on_connection_info_received=self.on_connection_info_received_from_server_process, project=Path(self._storage_path), ) From ddfe0c6fe603c32ff6f92c251837e9a194d57124 Mon Sep 17 00:00:00 2001 From: "Yngve S. Kristiansen" Date: Tue, 6 Jan 2026 09:42:42 +0100 Subject: [PATCH 3/8] Specify some file paths as globals For clarity and easier testing, prefer singleton global string var over in-place hardcoding of strings. --- src/ert/services/ert_server.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/ert/services/ert_server.py b/src/ert/services/ert_server.py index 551e828d6d8..28f67fd1c7c 100644 --- a/src/ert/services/ert_server.py +++ b/src/ert/services/ert_server.py @@ -179,6 +179,10 @@ def logger(self) -> logging.Logger: return logging.getLogger("ert.shared.storage") +_ERT_SERVER_CONNECTION_INFO_FILE = "storage_server.json" +_ERT_SERVER_EXECUTABLE_FILE = str(Path(__file__).parent / "_storage_main.py") + + class ErtServerContext: def __init__(self, service: ErtServer) -> None: self._service = service @@ -230,7 +234,7 @@ def __init__( run_storage_main_cmd = [ sys.executable, - str(Path(__file__).parent / "_storage_main.py"), + _ERT_SERVER_EXECUTABLE_FILE, "--project", storage_path, ] @@ -378,12 +382,12 @@ def connect( timeout = 240 t = -1 while t < timeout: - storage_server_path = path / "storage_server.json" + storage_server_path = path / _ERT_SERVER_CONNECTION_INFO_FILE if ( storage_server_path.exists() and storage_server_path.stat().st_size > 0 ): - with (path / "storage_server.json").open() as f: + with (path / _ERT_SERVER_CONNECTION_INFO_FILE).open() as f: storage_server_content = json.load(f) return ErtServer( @@ -437,9 +441,9 @@ def on_connection_info_received_from_server_process( if self._storage_path is not None: if not Path(self._storage_path).exists(): raise RuntimeError(f"No storage exists at : {self._storage_path}") - path = f"{self._storage_path}/storage_server.json" + path = f"{self._storage_path}/{_ERT_SERVER_CONNECTION_INFO_FILE}" else: - path = "storage_server.json" + path = _ERT_SERVER_CONNECTION_INFO_FILE if isinstance(info, Mapping): with NamedTemporaryFile(dir=f"{self._storage_path}", delete=False) as f: From 52b9c0eeeb5660e6715ecd35ac828d495d2f41ba Mon Sep 17 00:00:00 2001 From: "Yngve S. Kristiansen" Date: Tue, 6 Jan 2026 09:48:14 +0100 Subject: [PATCH 4/8] Adapt base service tests to cover ErtServer To not lose test coverage when removing BaseService --- .../unit_tests/services/test_base_service.py | 64 ++++++++++--------- 1 file changed, 35 insertions(+), 29 deletions(-) diff --git a/tests/ert/unit_tests/services/test_base_service.py b/tests/ert/unit_tests/services/test_base_service.py index b7473585842..1439137716f 100644 --- a/tests/ert/unit_tests/services/test_base_service.py +++ b/tests/ert/unit_tests/services/test_base_service.py @@ -8,27 +8,29 @@ import pytest +from ert.services import ert_server from ert.services._base_service import ( - BaseService, local_exec_args, ) from ert.services.ert_server import ( + _ERT_SERVER_CONNECTION_INFO_FILE, SERVICE_CONF_PATHS, + ErtServer, ServerBootFail, cleanup_service_files, ) -class _DummyService(BaseService): +class _DummyService(ErtServer): service_name = "dummy" - def __init__(self, exec_args, *args, **kwargs) -> None: - super().__init__(exec_args=exec_args, timeout=10, *args, **kwargs) # noqa: B026 + def __init__(self, *args, **kwargs) -> None: + super().__init__(*args, **({"storage_path": ".", "timeout": 10} | kwargs)) def start(self): """Helper function for non-singleton testing""" - assert self._proc is not None - self._proc.start() + assert self._thread_that_starts_server_process is not None + self._thread_that_starts_server_process.start() def join(self): """Helper function for non-singleton testing""" @@ -64,8 +66,9 @@ def server_script(monkeypatch, tmp_path: Path, request): @pytest.fixture -def server(server_script): - proc = _DummyService([str(server_script)]) +def server(monkeypatch, server_script): + monkeypatch.setattr(ert_server, "_ERT_SERVER_EXECUTABLE_FILE", server_script) + proc = _DummyService() proc.start() yield proc proc.shutdown() @@ -95,8 +98,8 @@ def test_shutdown_after_finish(server): """ ) def test_info_slow(server): - # fetch_conn_info() should block until this value is available - assert server.fetch_conn_info() == {"authtoken": "test123", "urls": ["url"]} + # fetch_connection_info() should block until this value is available + assert server.fetch_connection_info() == {"authtoken": "test123", "urls": ["url"]} @pytest.mark.script( @@ -106,7 +109,7 @@ def test_info_slow(server): ) def test_authtoken_wrong_json(server): with pytest.raises(ServerBootFail): - server.fetch_conn_info() + server.fetch_connection_info() @pytest.mark.script( @@ -118,9 +121,9 @@ def test_authtoken_wrong_json(server): """ ) def test_long_lived(server, tmp_path): - assert server.fetch_conn_info() == {"authtoken": "test123", "urls": ["url"]} + assert server.fetch_connection_info() == {"authtoken": "test123", "urls": ["url"]} assert server.shutdown() == -signal.SIGTERM - assert not (tmp_path / "dummy_server.json").exists() + assert not (tmp_path / _ERT_SERVER_CONNECTION_INFO_FILE).exists() @pytest.mark.integration_test @@ -133,7 +136,7 @@ def test_long_lived(server, tmp_path): def test_not_respond(server): server._timeout = 1 with pytest.raises(TimeoutError): - server.fetch_conn_info() + server.fetch_connection_info() assert server.shutdown() == -signal.SIGTERM @@ -144,7 +147,7 @@ def test_not_respond(server): ) def test_authtoken_fail(server): with pytest.raises(ServerBootFail): - server.fetch_conn_info() + server.fetch_connection_info() @pytest.mark.script( @@ -155,9 +158,9 @@ def test_authtoken_fail(server): """ ) def test_json_created(server): - server.fetch_conn_info() # wait for it to start + server.fetch_connection_info() # wait for it to start - assert Path("dummy_server.json").read_text(encoding="utf-8") + assert Path(_ERT_SERVER_CONNECTION_INFO_FILE).read_text(encoding="utf-8") @pytest.mark.integration_test @@ -172,10 +175,10 @@ def test_json_deleted(server): _BaseService is responsible for deleting the JSON file after the subprocess is finished running. """ - server.fetch_conn_info() # wait for it to start + server.fetch_connection_info() # wait for it to start time.sleep(2) # ensure subprocess is done before calling shutdown() - assert not os.path.exists("dummy_server.json") + assert not os.path.exists(_ERT_SERVER_CONNECTION_INFO_FILE) @pytest.mark.script( @@ -185,12 +188,13 @@ def test_json_deleted(server): time.sleep(10) # ensure "server" doesn't exit before test """ ) -def test_singleton_start(server_script, tmp_path): - with _DummyService.start_server(exec_args=[str(server_script)]) as service: +def test_singleton_start(monkeypatch, server_script, tmp_path): + monkeypatch.setattr(ert_server, "_ERT_SERVER_EXECUTABLE_FILE", server_script) + with _DummyService.start_server(".", timeout=10) as service: assert service.wait_until_ready() - assert (tmp_path / "dummy_server.json").exists() + assert (tmp_path / _ERT_SERVER_CONNECTION_INFO_FILE).exists() - assert not (tmp_path / "dummy_server.json").exists() + assert not (tmp_path / _ERT_SERVER_CONNECTION_INFO_FILE).exists() @pytest.mark.integration_test @@ -201,8 +205,9 @@ def test_singleton_start(server_script, tmp_path): os.close(fd) """ ) -def test_singleton_connect(tmp_path, server_script): - with _DummyService.start_server(exec_args=[str(server_script)]) as server: +def test_singleton_connect(monkeypatch, tmp_path, server_script): + monkeypatch.setattr(ert_server, "_ERT_SERVER_EXECUTABLE_FILE", server_script) + with _DummyService().start_server(".", timeout=10) as server: client = _DummyService.connect(project=tmp_path, timeout=30) assert server is client @@ -215,7 +220,7 @@ def test_singleton_connect(tmp_path, server_script): time.sleep(10) # ensure "server" doesn't exit before test """ ) -def test_singleton_connect_early(server_script, tmp_path): +def test_singleton_connect_early(server_script, tmp_path, monkeypatch): """ Tests that a connection can be attempted even if it's started _before_ the server exists @@ -238,16 +243,17 @@ def run(self): client_thread.start() start_event.wait() # Client thread has started - with _DummyService.start_server(exec_args=[str(server_script)]) as server: + monkeypatch.setattr(ert_server, "_ERT_SERVER_EXECUTABLE_FILE", server_script) + with _DummyService.start_server(".") as server: ready_event.wait() # Client thread has connected to server assert not getattr(client_thread, "exception", None), ( f"Exception from connect: {client_thread.exception}" ) client = client_thread.client assert client is not server - assert client.fetch_conn_info() == server.fetch_conn_info() + assert client.fetch_connection_info() == server.fetch_connection_info() - assert not (tmp_path / "dummy_server.json").exists() + assert not (tmp_path / _ERT_SERVER_CONNECTION_INFO_FILE).exists() @pytest.mark.parametrize( From ea5ff4c8f270e59df97d472d533d0a96b48882a2 Mon Sep 17 00:00:00 2001 From: "Yngve S. Kristiansen" Date: Tue, 6 Jan 2026 09:53:49 +0100 Subject: [PATCH 5/8] Rename test_base_service.py to test_ert_server.py --- .../services/{test_base_service.py => test_ert_server.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename tests/ert/unit_tests/services/{test_base_service.py => test_ert_server.py} (100%) diff --git a/tests/ert/unit_tests/services/test_base_service.py b/tests/ert/unit_tests/services/test_ert_server.py similarity index 100% rename from tests/ert/unit_tests/services/test_base_service.py rename to tests/ert/unit_tests/services/test_ert_server.py From f96b3526dd1edd7120782fba4d87e42657c09281 Mon Sep 17 00:00:00 2001 From: "Yngve S. Kristiansen" Date: Tue, 6 Jan 2026 10:07:30 +0100 Subject: [PATCH 6/8] Move local_exec_args to test file --- src/ert/services/_base_service.py | 22 ------------- .../unit_tests/services/test_ert_server.py | 33 +++++++++++++++++-- 2 files changed, 30 insertions(+), 25 deletions(-) diff --git a/src/ert/services/_base_service.py b/src/ert/services/_base_service.py index 48c146b1596..66f83eaa716 100644 --- a/src/ert/services/_base_service.py +++ b/src/ert/services/_base_service.py @@ -7,7 +7,6 @@ import json import os -import sys import threading import types from collections.abc import Mapping, Sequence @@ -25,27 +24,6 @@ T = TypeVar("T", bound="BaseService") -def local_exec_args(script_args: str | list[str]) -> list[str]: - """ - Convenience function that returns the exec_args for executing a Python - script in the directory of '_base_service.py'. - - This is done instead of using 'python -m [module path]' due to the '-m' flag - adding the user's current working directory to sys.path. Executing a Python - script by itself will add the directory of the script rather than the - current working directory, thus we avoid accidentally importing user's - directories that just happen to have the same names as the ones we use. - """ - if isinstance(script_args, str): - script = script_args - rest: list[str] = [] - else: - script = script_args[0] - rest = script_args[1:] - script = f"_{script}_main.py" - return [sys.executable, str(Path(__file__).parent / script), *rest] - - class _Context(Generic[T]): def __init__(self, service: T) -> None: self._service = service diff --git a/tests/ert/unit_tests/services/test_ert_server.py b/tests/ert/unit_tests/services/test_ert_server.py index 1439137716f..5d179b1886a 100644 --- a/tests/ert/unit_tests/services/test_ert_server.py +++ b/tests/ert/unit_tests/services/test_ert_server.py @@ -1,3 +1,4 @@ +import importlib import os import signal import sys @@ -9,9 +10,6 @@ import pytest from ert.services import ert_server -from ert.services._base_service import ( - local_exec_args, -) from ert.services.ert_server import ( _ERT_SERVER_CONNECTION_INFO_FILE, SERVICE_CONF_PATHS, @@ -21,6 +19,35 @@ ) +def local_exec_args(script_args: str | list[str]) -> list[str]: + """ + Convenience function that returns the exec_args for executing a Python + script in the directory of '_base_service.py'. + + This is done instead of using 'python -m [module path]' due to the '-m' flag + adding the user's current working directory to sys.path. Executing a Python + script by itself will add the directory of the script rather than the + current working directory, thus we avoid accidentally importing user's + directories that just happen to have the same names as the ones we use. + """ + if isinstance(script_args, str): + script = script_args + rest: list[str] = [] + else: + script = script_args[0] + rest = script_args[1:] + script = f"_{script}_main.py" + + services_spec = importlib.util.find_spec("ert.services") + if services_spec is None or services_spec.origin is None: + raise RuntimeError("Cannot find module ert.services") + + # PS: origin points to the __init__.py file + services_folder = Path(services_spec.origin).parent + + return [sys.executable, str(services_folder / script), *rest] + + class _DummyService(ErtServer): service_name = "dummy" From fcfeace06b8c5440fcf2503fd78280cec2057313 Mon Sep 17 00:00:00 2001 From: "Yngve S. Kristiansen" Date: Wed, 7 Jan 2026 10:45:00 +0100 Subject: [PATCH 7/8] Remove WebvizErt and BaseService --- src/ert/__main__.py | 68 +----- src/ert/dark_storage/compute/__init__.py | 0 src/ert/dark_storage/compute/misfits.py | 42 ---- src/ert/dark_storage/endpoints/__init__.py | 2 - .../endpoints/compute/__init__.py | 0 .../dark_storage/endpoints/compute/misfits.py | 95 -------- src/ert/dark_storage/endpoints/experiments.py | 3 - .../dark_storage/json_schema/experiment.py | 1 - src/ert/services/__init__.py | 3 +- src/ert/services/_base_service.py | 204 ------------------ src/ert/services/_storage_main.py | 2 +- src/ert/services/webviz_ert_service.py | 20 -- src/ert/shared/storage/extraction.py | 42 ---- .../dark_storage/test_http_endpoints.py | 23 -- .../unit_tests/services/test_ert_server.py | 2 +- tests/ert/unit_tests/storage/conftest.py | 3 - 16 files changed, 9 insertions(+), 501 deletions(-) delete mode 100644 src/ert/dark_storage/compute/__init__.py delete mode 100644 src/ert/dark_storage/endpoints/compute/__init__.py delete mode 100644 src/ert/dark_storage/endpoints/compute/misfits.py delete mode 100644 src/ert/services/_base_service.py delete mode 100644 src/ert/services/webviz_ert_service.py delete mode 100644 src/ert/shared/storage/extraction.py diff --git a/src/ert/__main__.py b/src/ert/__main__.py index 6523a3ec2d8..24c6644f862 100644 --- a/src/ert/__main__.py +++ b/src/ert/__main__.py @@ -36,7 +36,7 @@ from ert.namespace import Namespace from ert.plugins import ErtRuntimePlugins, get_site_plugins, setup_site_logging from ert.run_models.multiple_data_assimilation import MultipleDataAssimilationConfig -from ert.services import ErtServer, WebvizErt +from ert.services import ErtServer from ert.services._storage_main import add_parser_options as ert_api_add_parser_options from ert.shared.status.utils import get_ert_memory_usage from ert.storage import ErtStorageException, ErtStoragePermissionError @@ -63,67 +63,24 @@ def run_ert_storage(args: Namespace, _: ErtRuntimePlugins | None = None) -> None def run_webviz_ert(args: Namespace, _: ErtRuntimePlugins | None = None) -> None: - try: - import webviz_ert # type: ignore # noqa - except ImportError as err: - raise ValueError( - "Running `ert vis` requires that webviz_ert is installed" - ) from err - - kwargs: dict[str, Any] = {"verbose": args.verbose} - ert_config = ErtConfig.with_plugins(get_site_plugins()).from_file(args.config) - - os.chdir(ert_config.config_path) - ens_path = ert_config.ens_path - - # Changing current working directory means we need to - # only use the base name of the config file path - kwargs["ert_config"] = os.path.basename(args.config) - kwargs["project"] = os.path.abspath(ens_path) - yellow = "\x1b[33m" green = "\x1b[32m" bold = "\x1b[1m" reset = "\x1b[0m" - try: - with ErtServer.init_service(project=Path(ens_path).absolute()) as storage: - storage.wait_until_ready() - print( - f""" + print( + f""" --------------------------------------------------------------- - {yellow}{bold}Webviz-ERT is deprecated and will be removed in the near future{reset} + {yellow}{bold}Webviz-ERT is removed. {green}{bold}Plotting capabilities provided by Webviz-ERT are now available using the ERT plotter{reset} - --------------------------------------------------------------- - - Starting up Webviz-ERT. This might take more than a minute. - --------------------------------------------------------------- """ - ) - logger.info("Show Webviz-ert deprecation warning") - webviz_kwargs = { - "experimental_mode": args.experimental_mode, - "verbose": args.verbose, - "title": kwargs.get("ert_config", "ERT - Visualization tool"), - "project": kwargs.get("project", os.getcwd()), - } - with WebvizErt.start_server(**webviz_kwargs) as webviz_ert_server: - webviz_ert_server.wait() - except PermissionError as pe: - print(f"Error: {pe}", file=sys.stderr) - print( - "Cannot start or connect to storage service due to permission issues.", - file=sys.stderr, - ) - print( - "This is most likely due to another user starting ERT using this storage", - file=sys.stderr, - ) + ) + logger.info("Show Webviz-ert removal warning") def strip_error_message_and_raise_exception(validated: ValidationStatus) -> None: @@ -317,19 +274,6 @@ def get_ert_parser(parser: ArgumentParser | None = None) -> ArgumentParser: ert_api_parser.set_defaults(func=run_ert_storage) ert_api_add_parser_options(ert_api_parser) - ert_vis_parser = subparsers.add_parser( - "vis", - description="Launch webviz-driven visualization tool.", - ) - ert_vis_parser.set_defaults(func=run_webviz_ert) - ert_vis_parser.add_argument("--name", "-n", type=str, default="Webviz-ERT") - ert_vis_parser.add_argument( - "--experimental-mode", - action="store_true", - help="Feature flag for enabling experimental plugins", - ) - ert_api_add_parser_options(ert_vis_parser) # ert vis shares args with ert api - # test_run_parser test_run_description = f"Run '{TEST_RUN_MODE}' in cli" test_run_parser = subparsers.add_parser( diff --git a/src/ert/dark_storage/compute/__init__.py b/src/ert/dark_storage/compute/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/src/ert/dark_storage/compute/misfits.py b/src/ert/dark_storage/compute/misfits.py index ad09229cfa0..e69de29bb2d 100644 --- a/src/ert/dark_storage/compute/misfits.py +++ b/src/ert/dark_storage/compute/misfits.py @@ -1,42 +0,0 @@ -from collections.abc import Mapping - -import numpy as np -import numpy.typing as npt -import pandas as pd - - -def _calculate_signed_chi_squared_misfit( - obs_value: npt.NDArray[np.float64], - response_value: npt.NDArray[np.float64], - obs_std: npt.NDArray[np.float64], -) -> list[float]: - """The signed version is intended for visualization. For data assimiliation one - would normally use the normal chi-square""" - residual = response_value - obs_value - return (np.sign(residual) * residual * residual / (obs_std * obs_std)).tolist() - - -def calculate_signed_chi_squared_misfits( - reponses_dict: Mapping[int, pd.DataFrame], - observation: pd.DataFrame, - summary_misfits: bool = False, -) -> pd.DataFrame: - """ - Compute misfits from reponses_dict (real_id, values in dataframe) - and observation - """ - misfits_dict = {} - for realization_index in reponses_dict: - misfits_dict[realization_index] = _calculate_signed_chi_squared_misfit( - observation["values"], - reponses_dict[realization_index] - .loc[:, observation.index] - .to_numpy() - .flatten(), - observation["errors"], - ) - - df = pd.DataFrame(data=misfits_dict, index=observation.index) - if summary_misfits: - df = pd.DataFrame([df.abs().sum(axis=0)], columns=df.columns, index=[0]) - return df.T diff --git a/src/ert/dark_storage/endpoints/__init__.py b/src/ert/dark_storage/endpoints/__init__.py index 7c031abef5f..19b2fd07070 100644 --- a/src/ert/dark_storage/endpoints/__init__.py +++ b/src/ert/dark_storage/endpoints/__init__.py @@ -1,6 +1,5 @@ from fastapi import APIRouter -from .compute.misfits import router as misfits_router from .ensembles import router as ensembles_router from .experiment_server import router as experiment_server_router from .experiments import router as experiments_router @@ -15,7 +14,6 @@ router.include_router(ensembles_router) router.include_router(observations_router) router.include_router(updates_router) -router.include_router(misfits_router) router.include_router(parameters_router) router.include_router(responses_router) router.include_router(experiment_server_router) diff --git a/src/ert/dark_storage/endpoints/compute/__init__.py b/src/ert/dark_storage/endpoints/compute/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/src/ert/dark_storage/endpoints/compute/misfits.py b/src/ert/dark_storage/endpoints/compute/misfits.py deleted file mode 100644 index 1ba384240fb..00000000000 --- a/src/ert/dark_storage/endpoints/compute/misfits.py +++ /dev/null @@ -1,95 +0,0 @@ -import json -from datetime import datetime -from typing import Annotated, Any -from uuid import UUID - -import pandas as pd -from dateutil.parser import parse -from fastapi import APIRouter, Depends, Query, status -from fastapi.responses import Response - -from ert.dark_storage import exceptions as exc -from ert.dark_storage.common import get_storage -from ert.dark_storage.compute.misfits import calculate_signed_chi_squared_misfits -from ert.dark_storage.endpoints.observations import ( - _get_observations, -) -from ert.dark_storage.endpoints.responses import data_for_response -from ert.storage import Storage - -router = APIRouter(tags=["misfits"]) -DEFAULT_STORAGEREADER = Depends(get_storage) - - -@router.get( - "/compute/misfits", - responses={ - status.HTTP_200_OK: { - "content": {"text/csv": {}}, - } - }, -) -async def get_response_misfits( - *, - storage: Storage = DEFAULT_STORAGEREADER, - ensemble_id: UUID, - response_name: str, - realization_index: int | None = None, - summary_misfits: bool = False, - filter_on: Annotated[ - str | None, Query(description="JSON string with filters") - ] = None, -) -> Response: - ensemble = storage.get_ensemble(ensemble_id) - dataframe = data_for_response( - ensemble, - response_name, - json.loads(filter_on) if filter_on is not None else None, - ) - if realization_index is not None: - dataframe = pd.DataFrame(dataframe.loc[realization_index]).T - - response_dict = {} - for index, data in dataframe.iterrows(): - data_df = pd.DataFrame(data).T - response_dict[index] = data_df - - experiment = ensemble.experiment - response_type = experiment.response_key_to_response_type[response_name] - obs_keys = experiment.response_key_to_observation_key[response_type].get( - response_name, [] - ) - obs = _get_observations( - ensemble.experiment, - obs_keys, - json.loads(filter_on) if filter_on is not None else None, - ) - - if not obs_keys: - raise ValueError(f"No observations for key {response_name}") - if not obs: - raise ValueError(f"Cant fetch observations for key {response_name}") - o = obs[0] - - def parse_index(x: Any) -> int | datetime: - try: - return int(x) - except ValueError: - return parse(x) - - observation_df = pd.DataFrame( - data={"values": o["values"], "errors": o["errors"]}, - index=[parse_index(x) for x in o["x_axis"]], - ) - try: - result_df = calculate_signed_chi_squared_misfits( - response_dict, observation_df, summary_misfits - ) - except Exception as misfits_exc: - raise exc.UnprocessableError( - f"Unable to compute misfits: {misfits_exc}" - ) from misfits_exc - return Response( - content=result_df.to_csv().encode(), - media_type="text/csv", - ) diff --git a/src/ert/dark_storage/endpoints/experiments.py b/src/ert/dark_storage/endpoints/experiments.py index 21ce7f4adf3..ea119cc54a5 100644 --- a/src/ert/dark_storage/endpoints/experiments.py +++ b/src/ert/dark_storage/endpoints/experiments.py @@ -6,7 +6,6 @@ from ert.config import SurfaceConfig from ert.dark_storage import json_schema as js from ert.dark_storage.common import get_storage -from ert.shared.storage.extraction import create_priors from ert.storage import Storage router = APIRouter(tags=["experiment"]) @@ -26,7 +25,6 @@ def get_experiments( id=experiment.id, name=experiment.name, ensemble_ids=[ens.id for ens in experiment.ensembles], - priors=create_priors(experiment), userdata={}, parameters={ group: config.model_dump() @@ -62,7 +60,6 @@ def get_experiment_by_id( name=experiment.name, id=experiment.id, ensemble_ids=[ens.id for ens in experiment.ensembles], - priors=create_priors(experiment), userdata={}, parameters={ group: config.model_dump() diff --git a/src/ert/dark_storage/json_schema/experiment.py b/src/ert/dark_storage/json_schema/experiment.py index ccd6a5a2073..cbab9cfa04d 100644 --- a/src/ert/dark_storage/json_schema/experiment.py +++ b/src/ert/dark_storage/json_schema/experiment.py @@ -22,7 +22,6 @@ class ExperimentIn(_Experiment): class ExperimentOut(_Experiment): id: UUID ensemble_ids: list[UUID] - priors: Mapping[str, dict[str, Any]] userdata: Mapping[str, Any] parameters: Mapping[str, dict[str, Any]] responses: Mapping[str, list[dict[str, Any]]] diff --git a/src/ert/services/__init__.py b/src/ert/services/__init__.py index e37b7d6c765..7126d4e3210 100644 --- a/src/ert/services/__init__.py +++ b/src/ert/services/__init__.py @@ -1,4 +1,3 @@ from .ert_server import BaseServiceExit, ErtServer, ServerBootFail -from .webviz_ert_service import WebvizErt -__all__ = ["BaseServiceExit", "ErtServer", "ServerBootFail", "WebvizErt"] +__all__ = ["BaseServiceExit", "ErtServer", "ServerBootFail"] diff --git a/src/ert/services/_base_service.py b/src/ert/services/_base_service.py deleted file mode 100644 index 66f83eaa716..00000000000 --- a/src/ert/services/_base_service.py +++ /dev/null @@ -1,204 +0,0 @@ -""" -This file contains a more generic version of "ert services", and -is scheduled for removal when WebvizErt is removed. -""" - -from __future__ import annotations - -import json -import os -import threading -import types -from collections.abc import Mapping, Sequence -from logging import Logger, getLogger -from pathlib import Path -from tempfile import NamedTemporaryFile -from time import sleep -from typing import TYPE_CHECKING, Any, Generic, Self, TypeVar - -from ert.services.ert_server import ErtServerConnectionInfo, _Proc - -if TYPE_CHECKING: - pass - -T = TypeVar("T", bound="BaseService") - - -class _Context(Generic[T]): - def __init__(self, service: T) -> None: - self._service = service - - def __enter__(self) -> T: - return self._service - - def __exit__( - self, - exc_type: type[BaseException] | None, - exc_value: BaseException | None, - traceback: types.TracebackType | None, - ) -> bool: - self._service.shutdown() - return exc_type is None - - -class BaseService: - """ - BaseService provides a block-only-when-needed mechanism for starting and - maintaining services as subprocesses. - - This is achieved by using a POSIX communication pipe, over which the service - can communicate that it has started. The contents of the communication is - also written to a file inside of the ERT storage directory. - - The service itself can implement the other side of the pipe as such:: - - import os - - # ... perform initialisation ... - - # BaseService provides this environment variable with the pipe's FD - comm_fd = os.environ["ERT_COMM_FD"] - - # Open the pipe with Python's IO classes for ease of use - with os.fdopen(comm_fd, "wb") as comm: - # Write JSON over the pipe, which will be interpreted by a subclass - # of BaseService on ERT's side - comm.write('{"some": "json"}') - - # The pipe is flushed and closed here. This tells BaseService that - # initialisation is finished and it will try to read the JSON data. - """ - - _instance: BaseService | None = None - - def __init__( - self, - exec_args: Sequence[str] = (), - timeout: int = 120, - conn_info: ErtServerConnectionInfo | Exception | None = None, - project: str | None = None, - ) -> None: - self._exec_args = exec_args - self._timeout = timeout - - self._proc: _Proc | None = None - self._conn_info: ErtServerConnectionInfo | Exception | None = conn_info - self._conn_info_event = threading.Event() - self._project = Path(project) if project is not None else Path.cwd() - - # Flag that we have connection information - if self._conn_info: - self._conn_info_event.set() - else: - self._proc = _Proc( - self.service_name, exec_args, timeout, self.set_conn_info, self._project - ) - - @classmethod - def start_server(cls, *args: Any, **kwargs: Any) -> _Context[Self]: - if cls._instance is not None: - raise RuntimeError("Server already running") - cls._instance = obj = cls(*args, **kwargs) - if obj._proc is not None: - obj._proc.start() - return _Context(obj) - - @classmethod - def connect( - cls, - *, - project: os.PathLike[str], - timeout: int | None = None, - ) -> Self: - if cls._instance is not None: - cls._instance.wait_until_ready() - assert isinstance(cls._instance, cls) - return cls._instance - - path = Path(project) - name = f"{cls.service_name}_server.json" - # Note: If the caller actually pass None, we override that here... - if timeout is None: - timeout = 240 - t = -1 - while t < timeout: - if (path / name).exists(): - with (path / name).open() as f: - return cls((), conn_info=json.load(f), project=str(path)) - - sleep(1) - t += 1 - - raise TimeoutError("Server not started") - - def wait_until_ready(self, timeout: int | None = None) -> bool: - if timeout is None: - timeout = self._timeout - - if self._conn_info_event.wait(timeout): - return not ( - self._conn_info is None or isinstance(self._conn_info, Exception) - ) - if isinstance(self._conn_info, TimeoutError): - self.logger.critical(f"startup exceeded defined timeout {timeout}s") - return False # Timeout reached - - def wait(self) -> None: - if self._proc is not None: - self._proc.join() - - def set_conn_info(self, info: ErtServerConnectionInfo | Exception | None) -> None: - if self._conn_info is not None: - raise ValueError("Connection information already set") - if info is None: - raise ValueError - self._conn_info = info - - if self._project is not None: - if not Path(self._project).exists(): - raise RuntimeError(f"No storage exists at : {self._project}") - path = f"{self._project}/{self.service_name}_server.json" - else: - path = f"{self.service_name}_server.json" - - if isinstance(info, Mapping): - with NamedTemporaryFile(dir=f"{self._project}", delete=False) as f: - f.write(json.dumps(info, indent=4).encode("utf-8")) - f.flush() - os.rename(f.name, path) - - self._conn_info_event.set() - - def fetch_conn_info(self) -> Mapping[str, Any]: - is_ready = self.wait_until_ready(self._timeout) - if isinstance(self._conn_info, Exception): - raise self._conn_info - if not is_ready: - raise TimeoutError - if self._conn_info is None: - raise ValueError("conn_info is None") - return self._conn_info - - def shutdown(self) -> int: - """Shutdown the server.""" - if self._proc is None: - return -1 - self.__class__._instance = None - proc, self._proc = self._proc, None - return proc.shutdown() - - @property - def service_name(self) -> str: - """ - Subclass should return the name of the service, eg 'storage' for ERT Storage. - Used for identifying the server information JSON file. - """ - raise NotImplementedError - - @property - def logger(self) -> Logger: - return getLogger(f"ert.shared.{self.service_name}") - - @property - def _service_file(self) -> str: - return f"{self.service_name}_server.json" diff --git a/src/ert/services/_storage_main.py b/src/ert/services/_storage_main.py index 058f2404e3b..c8d4aad1cb0 100644 --- a/src/ert/services/_storage_main.py +++ b/src/ert/services/_storage_main.py @@ -260,7 +260,7 @@ def _join_terminate_thread(terminate_on_parent_death_thread: threading.Thread) - logger = logging.getLogger("ert.shared.storage.info") logger.info( "Got BaseServiceExit while joining terminate thread, " - "as expected from _base_service.py" + "as expected from ert_server.py" ) diff --git a/src/ert/services/webviz_ert_service.py b/src/ert/services/webviz_ert_service.py deleted file mode 100644 index b10f9c237c8..00000000000 --- a/src/ert/services/webviz_ert_service.py +++ /dev/null @@ -1,20 +0,0 @@ -import sys -from typing import Any - -from ert.services._base_service import BaseService - - -class WebvizErt(BaseService): - service_name = "webviz-ert" - - def __init__(self, **kwargs: Any) -> None: - exec_args = [sys.executable, "-m", "webviz_ert"] - if kwargs.get("experimental_mode"): - exec_args.append("--experimental-mode") - if kwargs.get("verbose"): - exec_args.append("--verbose") - exec_args.extend(["--title", str(kwargs.get("title"))]) - project = kwargs.get("project") - exec_args.extend(["--project_identifier", str(project)]) - - super().__init__(exec_args, project=project) diff --git a/src/ert/shared/storage/extraction.py b/src/ert/shared/storage/extraction.py deleted file mode 100644 index 7c8c92cbb90..00000000000 --- a/src/ert/shared/storage/extraction.py +++ /dev/null @@ -1,42 +0,0 @@ -from __future__ import annotations - -from collections.abc import Mapping - -from ert.config import GenKwConfig -from ert.storage import Experiment - -_PRIOR_NAME_MAP = { - "NORMAL": "normal", - "LOGNORMAL": "lognormal", - "TRIANGULAR": "trig", - "TRUNCATED_NORMAL": "ert_truncnormal", - "CONST": "const", - "UNIFORM": "uniform", - "LOGUNIF": "loguniform", - "DUNIF": "ert_duniform", - "RAW": "stdnormal", - "ERRF": "ert_erf", - "DERRF": "ert_derf", -} - - -def create_priors( - experiment: Experiment, -) -> Mapping[str, dict[str, str | float]]: - priors_dict = {} - - for param in experiment.parameter_configuration.values(): - if isinstance(param, GenKwConfig): - prior: dict[str, str | float] = { - "function": _PRIOR_NAME_MAP[param.distribution.name.upper()], - **param.distribution.model_dump(exclude={"name"}), - } - # webviz-ert expects some variables names - if param.distribution.name == "triangular": - mapping = {"min": "_xmin", "max": "xmax", "mode": "xmode"} - else: - mapping = {"min": "_min", "max": "_max"} - prior = {mapping.get(k, k): v for k, v in prior.items()} - priors_dict[f"{param.group}:{param.name}"] = prior - - return priors_dict diff --git a/tests/ert/unit_tests/dark_storage/test_http_endpoints.py b/tests/ert/unit_tests/dark_storage/test_http_endpoints.py index aa340a88da1..5cc949dddd5 100644 --- a/tests/ert/unit_tests/dark_storage/test_http_endpoints.py +++ b/tests/ert/unit_tests/dark_storage/test_http_endpoints.py @@ -4,7 +4,6 @@ import pandas as pd import pytest -from numpy.testing import assert_array_equal from requests import Response from ert.dark_storage.common import get_storage_api_version @@ -273,28 +272,6 @@ def test_get_record_observations(poly_example_tmp_dir, dark_storage_client): assert len(response_json[0]["x_axis"]) == 5 -@pytest.mark.integration_test -def test_misfit_endpoint(poly_example_tmp_dir, dark_storage_client): - resp: Response = dark_storage_client.get("/experiments") - experiment_json = resp.json() - ensemble_id = experiment_json[0]["ensemble_ids"][0] - - resp: Response = dark_storage_client.get( - "/compute/misfits", - params={ - "filter_on": json.dumps({"report_step": 0}), - "ensemble_id": ensemble_id, - "response_name": "POLY_RES", - }, - headers={"accept": "text/csv"}, - ) - stream = io.BytesIO(resp.content) - misfit = pd.read_csv(stream, index_col=0, float_precision="round_trip") - - assert_array_equal(misfit.columns, ["0", "2", "4", "6", "8"]) - assert misfit.shape == (3, 5) - - @pytest.mark.integration_test @pytest.mark.parametrize( "coeffs", diff --git a/tests/ert/unit_tests/services/test_ert_server.py b/tests/ert/unit_tests/services/test_ert_server.py index 5d179b1886a..64839984d26 100644 --- a/tests/ert/unit_tests/services/test_ert_server.py +++ b/tests/ert/unit_tests/services/test_ert_server.py @@ -22,7 +22,7 @@ def local_exec_args(script_args: str | list[str]) -> list[str]: """ Convenience function that returns the exec_args for executing a Python - script in the directory of '_base_service.py'. + script in the directory of old '_base_service.py'. This is done instead of using 'python -m [module path]' due to the '-m' flag adding the user's current working directory to sys.path. Executing a Python diff --git a/tests/ert/unit_tests/storage/conftest.py b/tests/ert/unit_tests/storage/conftest.py index 796cd68d7cb..bd6bdccb7ac 100644 --- a/tests/ert/unit_tests/storage/conftest.py +++ b/tests/ert/unit_tests/storage/conftest.py @@ -1,7 +1,5 @@ import pytest -from ert.shared.storage import extraction - @pytest.fixture def client(monkeypatch, ert_storage_client): @@ -10,7 +8,6 @@ class MockStorageService: def session(): return ert_storage_client - monkeypatch.setattr(extraction, "ErtServer", MockStorageService) monkeypatch.setenv("ERT_STORAGE_NO_TOKEN", "ON") # Store a list of experiment IDs that exist in the database, in case the From a5ec8aff0c0f34582c85adeee49e345f2b6ebf1b Mon Sep 17 00:00:00 2001 From: "Yngve S. Kristiansen" Date: Wed, 14 Jan 2026 09:52:19 +0100 Subject: [PATCH 8/8] Rename BaseServiceExit to ErtServerExit --- src/ert/services/__init__.py | 4 ++-- src/ert/services/_storage_main.py | 6 +++--- src/ert/services/ert_server.py | 4 ++-- src/everest/detached/everserver.py | 4 ++-- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/ert/services/__init__.py b/src/ert/services/__init__.py index 7126d4e3210..6b403a703a4 100644 --- a/src/ert/services/__init__.py +++ b/src/ert/services/__init__.py @@ -1,3 +1,3 @@ -from .ert_server import BaseServiceExit, ErtServer, ServerBootFail +from .ert_server import ErtServer, ErtServerExit, ServerBootFail -__all__ = ["BaseServiceExit", "ErtServer", "ServerBootFail"] +__all__ = ["ErtServer", "ErtServerExit", "ServerBootFail"] diff --git a/src/ert/services/_storage_main.py b/src/ert/services/_storage_main.py index c8d4aad1cb0..c9f47ff9e32 100644 --- a/src/ert/services/_storage_main.py +++ b/src/ert/services/_storage_main.py @@ -30,7 +30,7 @@ from ert.logging import STORAGE_LOG_CONFIG from ert.plugins import setup_site_logging -from ert.services import BaseServiceExit +from ert.services import ErtServerExit from ert.shared import __file__ as ert_shared_path from ert.shared import find_available_socket, get_machine_name from ert.trace import tracer @@ -256,7 +256,7 @@ def _join_terminate_thread(terminate_on_parent_death_thread: threading.Thread) - """Join the terminate thread, handling BaseServiceExit (which is used by Everest)""" try: terminate_on_parent_death_thread.join() - except BaseServiceExit: + except ErtServerExit: logger = logging.getLogger("ert.shared.storage.info") logger.info( "Got BaseServiceExit while joining terminate thread, " @@ -311,7 +311,7 @@ def main() -> None: logger.info("Starting dark storage") logger.info(f"Started dark storage with parent {args.parent_pid}") run_server(args, debug=False, uvicorn_config=uvicorn_config) - except (SystemExit, BaseServiceExit): + except (SystemExit, ErtServerExit): logger.info("Stopping dark storage") finally: stopped.set() diff --git a/src/ert/services/ert_server.py b/src/ert/services/ert_server.py index 28f67fd1c7c..ff0fec12895 100644 --- a/src/ert/services/ert_server.py +++ b/src/ert/services/ert_server.py @@ -34,7 +34,7 @@ class ErtServerConnectionInfo(TypedDict): auth: str -class BaseServiceExit(OSError): +class ErtServerExit(OSError): pass @@ -43,7 +43,7 @@ def cleanup_service_files(signum: int, frame: types.FrameType | None) -> None: file = Path(file_path) if file.exists(): file.unlink() - raise BaseServiceExit(f"Signal {signum} received.") + raise ErtServerExit(f"Signal {signum} received.") if threading.current_thread() is threading.main_thread(): diff --git a/src/everest/detached/everserver.py b/src/everest/detached/everserver.py index eee6dda6982..b5d171ea4a0 100644 --- a/src/everest/detached/everserver.py +++ b/src/everest/detached/everserver.py @@ -13,7 +13,7 @@ from ert.plugins.plugin_manager import ErtPluginManager from ert.services import ErtServer -from ert.services.ert_server import BaseServiceExit +from ert.services.ert_server import ErtServerExit from ert.storage import ExperimentStatus from ert.storage.local_experiment import ExperimentState from ert.trace import tracer @@ -179,7 +179,7 @@ def main() -> None: ExperimentState.running, } time.sleep(0.5) - except BaseServiceExit: + except ErtServerExit: # Server exit, happens on normal shutdown and keyboard interrupt logging.getLogger(EVERSERVER).info("Everserver stopped by user") except Exception as e: