From 90e1177e9f9e267be58b6894748b8972a6b2242e Mon Sep 17 00:00:00 2001 From: Mingxin Zheng Date: Tue, 25 Mar 2025 04:04:36 +0000 Subject: [PATCH 1/8] Add integration test for robotic ultrasound workflow Signed-off-by: Mingxin Zheng --- tools/run_all_tests.py | 66 +++++++- .../state_machine/pi0_policy/eval.py | 1 + .../test_simulation/test_integration_eval.py | 142 ++++++++++++++++++ 3 files changed, 206 insertions(+), 3 deletions(-) create mode 100644 workflows/robotic_ultrasound/tests/test_simulation/test_integration_eval.py diff --git a/tools/run_all_tests.py b/tools/run_all_tests.py index 9010a8b3..c104bc6c 100644 --- a/tools/run_all_tests.py +++ b/tools/run_all_tests.py @@ -32,8 +32,8 @@ ] -def get_tests(test_root): - path = f"{test_root}/**/test_*.py" +def get_tests(test_root, pattern="test_*.py"): + path = f"{test_root}/**/{pattern}" return glob.glob(path, recursive=True) @@ -80,6 +80,9 @@ def run_tests_with_coverage(workflow_name): # TODO: remove this as integration tests elif "test_sim_with_dds" in test_path or "test_pi0" in test_path: continue + elif "test_integration" in test_path: + # TODO: refractor the test file names and structure + continue else: cmd = [ sys.executable, @@ -137,13 +140,70 @@ def run_tests_with_coverage(workflow_name): return 1 +def run_integration_tests(workflow_name): + """Run integration tests for a workflow""" + project_root = f"workflows/{workflow_name}" + default_license_file = os.path.join(os.getcwd(), project_root, "scripts", "dds", "rti_license.dat") + os.environ["RTI_LICENSE_FILE"] = os.environ.get("RTI_LICENSE_FILE", default_license_file) + all_tests_passed = True + tests_dir = os.path.join(project_root, "tests") + print(f"Looking for tests in {tests_dir}") + tests = get_tests(tests_dir, pattern="test_integration_*.py") + + for test_path in tests: + # add project root to pythonpath + print(f"Running integration test: {test_path}") + env = os.environ.copy() + pythonpath = [os.path.join(project_root, "scripts"), tests_dir] + + if "PYTHONPATH" in env: + env["PYTHONPATH"] = ":".join(pythonpath) + ":" + env["PYTHONPATH"] + else: + env["PYTHONPATH"] = ":".join(pythonpath) + + cmd = [ + sys.executable, + "-m", + "unittest", + test_path, + ] + + process = subprocess.Popen(cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + stdout, stderr = process.communicate() + + # Filter out extension loading messages + filtered_stdout = "\n".join( + [line for line in stdout.split("\n") if not ("[ext:" in line and "startup" in line)] + ) + filtered_stderr = "\n".join( + [line for line in stderr.split("\n") if not ("[ext:" in line and "startup" in line)] + ) + + # Print filtered output + if filtered_stdout.strip(): + print(filtered_stdout) + if filtered_stderr.strip(): + print(filtered_stderr) + + if all_tests_passed: + print("All tests passed") + return 0 + else: + print("Some tests failed") + return 1 + + if __name__ == "__main__": parser = argparse.ArgumentParser(description="Run all tests for a workflow") parser.add_argument("--workflow", type=str, default="robotic_ultrasound", help="Workflow name") + parser.add_argument("--integration", action="store_true", help="Run integration tests") args = parser.parse_args() if args.workflow not in WORKFLOWS: raise ValueError(f"Invalid workflow name: {args.workflow}") - exit_code = run_tests_with_coverage(args.workflow) + if args.integration: + exit_code = run_integration_tests(args.workflow) + else: + exit_code = run_tests_with_coverage(args.workflow) sys.exit(exit_code) diff --git a/workflows/robotic_ultrasound/scripts/simulation/environments/state_machine/pi0_policy/eval.py b/workflows/robotic_ultrasound/scripts/simulation/environments/state_machine/pi0_policy/eval.py index 9c7e34d4..755dbdd2 100644 --- a/workflows/robotic_ultrasound/scripts/simulation/environments/state_machine/pi0_policy/eval.py +++ b/workflows/robotic_ultrasound/scripts/simulation/environments/state_machine/pi0_policy/eval.py @@ -138,6 +138,7 @@ def main(): obs, rew, terminated, truncated, info_ = env.step(action) env.reset() + print("Resetting the environment.") for _ in range(reset_steps): reset_tensor = get_reset_action(env) obs, rew, terminated, truncated, info_ = env.step(reset_tensor) diff --git a/workflows/robotic_ultrasound/tests/test_simulation/test_integration_eval.py b/workflows/robotic_ultrasound/tests/test_simulation/test_integration_eval.py new file mode 100644 index 00000000..9d906df7 --- /dev/null +++ b/workflows/robotic_ultrasound/tests/test_simulation/test_integration_eval.py @@ -0,0 +1,142 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import signal +import subprocess +import threading +import time +import unittest + +from parameterized import parameterized + + +def monitor_output(process, found_event, target_line=None): + """Monitor process output for target_line and set event when found.""" + try: + if target_line: + for line in iter(process.stdout.readline, ""): + if target_line in line: + found_event.set() + break # todo + except (ValueError, IOError): + # Handle case where stdout is closed + pass + + +def run_with_monitoring(command, timeout_seconds, target_line=None): + # Start the process with pipes for output + env = os.environ.copy() + process = subprocess.Popen( + command, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, # Redirect stderr to stdout + text=True, + bufsize=1, # Line buffered + preexec_fn=os.setsid if os.name != "nt" else None, # Create a new process group on Unix + env=env, + ) + + # Event to signal when target line is found + found_event = threading.Event() + + # Start monitoring thread + monitor_thread = threading.Thread(target=monitor_output, args=(process, found_event, target_line)) + monitor_thread.daemon = True + monitor_thread.start() + + target_found = False + + try: + # Wait for either timeout or target line found + start_time = time.time() + while time.time() - start_time < timeout_seconds: + if target_line and found_event.is_set(): + target_found = True + + # Check if process has already terminated + if process.poll() is not None: + break + + time.sleep(0.1) + + # If we get here, either timeout occurred or process ended + if process.poll() is None: # Process is still running + print(f"Sending SIGINT after {timeout_seconds} seconds...") + + if os.name != "nt": # Unix/Linux/MacOS + # Send SIGINT to the entire process group + os.killpg(os.getpgid(process.pid), signal.SIGINT) + else: # Windows + process.send_signal(signal.CTRL_C_EVENT) + + # Give the process some time to handle the signal and exit gracefully + try: + process.wait(timeout=5) + except subprocess.TimeoutExpired: + print("Process didn't terminate after SIGINT, force killing...") + if os.name != "nt": # Unix/Linux/MacOS + os.killpg(os.getpgid(process.pid), signal.SIGKILL) + else: # Windows + process.kill() + + except Exception as e: + print(f"Error during process execution: {e}") + if process.poll() is None: + process.kill() + + finally: + # Ensure we close all pipes and terminate the process + try: + # Try to get any remaining output, but with a short timeout + remaining_output, _ = process.communicate(timeout=2) + if remaining_output: + print(remaining_output) + except subprocess.TimeoutExpired: + # If communicate times out, force kill the process + process.kill() + process.communicate() + + # If the process is somehow still running, make sure it's killed + if process.poll() is None: + process.kill() + process.communicate() + + # Check if target was found + if not target_found and found_event.is_set(): + target_found = True + + return process.returncode, target_found + + +SM_CASES = [ + ( + "python -u workflows/robotic_ultrasound/scripts/simulation/environments/state_machine/pi0_policy/eval.py --task Isaac-Teleop-Torso-FrankaUsRs-IK-RL-Rel-v0 --enable_camera --repo_id i4h/sim_liver_scan --headless", + 120, + "Resetting the environment.", + ), +] + +class TestSurgerySM(unittest.TestCase): + @parameterized.expand(SM_CASES) + def test_surgery_sm(self, command, timeout, target_line): + # Run and monitor command + exit_code, found_target = run_with_monitoring(command, timeout, target_line) + self.assertTrue(found_target) + + +if __name__ == "__main__": + unittest.main() From 07668eba07af859fc9ca4513930bdecf42bd42af Mon Sep 17 00:00:00 2001 From: Mingxin Zheng Date: Tue, 25 Mar 2025 04:05:52 +0000 Subject: [PATCH 2/8] fix Signed-off-by: Mingxin Zheng --- tools/run_all_tests.py | 2 +- .../tests/test_simulation/test_integration_eval.py | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/tools/run_all_tests.py b/tools/run_all_tests.py index c104bc6c..5397a9e6 100644 --- a/tools/run_all_tests.py +++ b/tools/run_all_tests.py @@ -160,7 +160,7 @@ def run_integration_tests(workflow_name): env["PYTHONPATH"] = ":".join(pythonpath) + ":" + env["PYTHONPATH"] else: env["PYTHONPATH"] = ":".join(pythonpath) - + cmd = [ sys.executable, "-m", diff --git a/workflows/robotic_ultrasound/tests/test_simulation/test_integration_eval.py b/workflows/robotic_ultrasound/tests/test_simulation/test_integration_eval.py index 9d906df7..4274d86b 100644 --- a/workflows/robotic_ultrasound/tests/test_simulation/test_integration_eval.py +++ b/workflows/robotic_ultrasound/tests/test_simulation/test_integration_eval.py @@ -124,7 +124,12 @@ def run_with_monitoring(command, timeout_seconds, target_line=None): SM_CASES = [ ( - "python -u workflows/robotic_ultrasound/scripts/simulation/environments/state_machine/pi0_policy/eval.py --task Isaac-Teleop-Torso-FrankaUsRs-IK-RL-Rel-v0 --enable_camera --repo_id i4h/sim_liver_scan --headless", + ( + "python -u workflows/robotic_ultrasound/scripts/simulation/environments/state_machine/pi0_policy/eval.py" + "--task Isaac-Teleop-Torso-FrankaUsRs-IK-RL-Rel-v0 " + "--enable_camera " + "--repo_id i4h/sim_liver_scan --headless" + ), 120, "Resetting the environment.", ), From 1e25d08dda1768b6ebc1027055fbfbe71add9cc8 Mon Sep 17 00:00:00 2001 From: Mingxin Zheng Date: Mon, 31 Mar 2025 15:19:27 +0000 Subject: [PATCH 3/8] fix comments Signed-off-by: Mingxin Zheng --- tools/run_all_tests.py | 112 +++++++++++++++++------------------------ 1 file changed, 45 insertions(+), 67 deletions(-) diff --git a/tools/run_all_tests.py b/tools/run_all_tests.py index 5397a9e6..8ea87f78 100644 --- a/tools/run_all_tests.py +++ b/tools/run_all_tests.py @@ -27,7 +27,6 @@ XVFB_TEST_CASES = [ - "test_orientation", "test_visualization", ] @@ -37,34 +36,60 @@ def get_tests(test_root, pattern="test_*.py"): return glob.glob(path, recursive=True) +def _run_test_process(cmd, env, test_path): + """Helper function to run a test process and handle its output""" + print(f"Running test: {test_path}") + process = subprocess.Popen(cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + stdout, stderr = process.communicate() + + # Filter out extension loading messages + filtered_stdout = "\n".join( + [line for line in stdout.split("\n") if not ("[ext:" in line and "startup" in line)] + ) + filtered_stderr = "\n".join( + [line for line in stderr.split("\n") if not ("[ext:" in line and "startup" in line)] + ) + + # Print filtered output + if filtered_stdout.strip(): + print(filtered_stdout) + if filtered_stderr.strip(): + print(filtered_stderr) + + return process.returncode == 0 + + +def _setup_test_env(project_root, tests_dir): + """Helper function to setup test environment""" + env = os.environ.copy() + pythonpath = [os.path.join(project_root, "scripts"), tests_dir] + + if "PYTHONPATH" in env: + env["PYTHONPATH"] = ":".join(pythonpath) + ":" + env["PYTHONPATH"] + else: + env["PYTHONPATH"] = ":".join(pythonpath) + + return env + + def run_tests_with_coverage(workflow_name): """Run all unittest cases with coverage reporting""" project_root = f"workflows/{workflow_name}" try: - # TODO: add license file to secrets default_license_file = os.path.join(os.getcwd(), project_root, "scripts", "dds", "rti_license.dat") os.environ["RTI_LICENSE_FILE"] = os.environ.get("RTI_LICENSE_FILE", default_license_file) all_tests_passed = True tests_dir = os.path.join(project_root, "tests") print(f"Looking for tests in {tests_dir}") tests = get_tests(tests_dir) + env = _setup_test_env(project_root, tests_dir) for test_path in tests: test_name = os.path.basename(test_path).replace(".py", "") - print(f"\nRunning test: {test_path}") - - # add project root to pythonpath - env = os.environ.copy() - pythonpath = [os.path.join(project_root, "scripts"), tests_dir] - - if "PYTHONPATH" in env: - env["PYTHONPATH"] = ":".join(pythonpath) + ":" + env["PYTHONPATH"] - else: - env["PYTHONPATH"] = ":".join(pythonpath) # Check if this test needs a virtual display - if test_name in XVFB_TEST_CASES: # virtual display for GUI tests + if test_name in XVFB_TEST_CASES: cmd = [ "xvfb-run", "-a", @@ -77,11 +102,10 @@ def run_tests_with_coverage(workflow_name): "unittest", test_path, ] - # TODO: remove this as integration tests + # TODO: move these tests to integration tests elif "test_sim_with_dds" in test_path or "test_pi0" in test_path: continue elif "test_integration" in test_path: - # TODO: refractor the test file names and structure continue else: cmd = [ @@ -95,25 +119,7 @@ def run_tests_with_coverage(workflow_name): test_path, ] - process = subprocess.Popen(cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) - stdout, stderr = process.communicate() - - # Filter out extension loading messages - filtered_stdout = "\n".join( - [line for line in stdout.split("\n") if not ("[ext:" in line and "startup" in line)] - ) - filtered_stderr = "\n".join( - [line for line in stderr.split("\n") if not ("[ext:" in line and "startup" in line)] - ) - - # Print filtered output - if filtered_stdout.strip(): - print(filtered_stdout) - if filtered_stderr.strip(): - print(filtered_stderr) - - result = process - if result.returncode != 0: + if not _run_test_process(cmd, env, test_path): all_tests_passed = False # combine coverage results @@ -149,48 +155,20 @@ def run_integration_tests(workflow_name): tests_dir = os.path.join(project_root, "tests") print(f"Looking for tests in {tests_dir}") tests = get_tests(tests_dir, pattern="test_integration_*.py") + env = _setup_test_env(project_root, tests_dir) for test_path in tests: - # add project root to pythonpath - print(f"Running integration test: {test_path}") - env = os.environ.copy() - pythonpath = [os.path.join(project_root, "scripts"), tests_dir] - - if "PYTHONPATH" in env: - env["PYTHONPATH"] = ":".join(pythonpath) + ":" + env["PYTHONPATH"] - else: - env["PYTHONPATH"] = ":".join(pythonpath) - cmd = [ sys.executable, "-m", "unittest", test_path, ] + + if not _run_test_process(cmd, env, test_path): + all_tests_passed = False - process = subprocess.Popen(cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) - stdout, stderr = process.communicate() - - # Filter out extension loading messages - filtered_stdout = "\n".join( - [line for line in stdout.split("\n") if not ("[ext:" in line and "startup" in line)] - ) - filtered_stderr = "\n".join( - [line for line in stderr.split("\n") if not ("[ext:" in line and "startup" in line)] - ) - - # Print filtered output - if filtered_stdout.strip(): - print(filtered_stdout) - if filtered_stderr.strip(): - print(filtered_stderr) - - if all_tests_passed: - print("All tests passed") - return 0 - else: - print("Some tests failed") - return 1 + return 0 if all_tests_passed else 1 if __name__ == "__main__": From a0b77d233756715922e0599afff347d0ccf03bd9 Mon Sep 17 00:00:00 2001 From: Mingxin Zheng Date: Mon, 31 Mar 2025 16:27:54 +0000 Subject: [PATCH 4/8] fix comments Signed-off-by: Mingxin Zheng --- .../scripts/simulation/README.md | 5 +---- .../environments/state_machine/pi0_policy/eval.py | 14 ++++++++++++-- .../tests/test_simulation/test_integration_eval.py | 11 +++-------- 3 files changed, 16 insertions(+), 14 deletions(-) diff --git a/workflows/robotic_ultrasound/scripts/simulation/README.md b/workflows/robotic_ultrasound/scripts/simulation/README.md index 5262cd5d..0b908b2a 100644 --- a/workflows/robotic_ultrasound/scripts/simulation/README.md +++ b/workflows/robotic_ultrasound/scripts/simulation/README.md @@ -45,10 +45,7 @@ export PYTHONPATH=`pwd` 5. Return to this folder and run the following command: ```sh -python environments/state_machine/pi0_policy/eval.py \ - --task Isaac-Teleop-Torso-FrankaUsRs-IK-RL-Rel-v0 \ - --enable_camera \ - --repo_id i4h/sim_liver_scan +python environments/state_machine/pi0_policy/eval.py --enable_camera ``` NOTE: You can also specify `--ckpt_path` to run a specific policy. diff --git a/workflows/robotic_ultrasound/scripts/simulation/environments/state_machine/pi0_policy/eval.py b/workflows/robotic_ultrasound/scripts/simulation/environments/state_machine/pi0_policy/eval.py index 53e65ced..246c6f99 100644 --- a/workflows/robotic_ultrasound/scripts/simulation/environments/state_machine/pi0_policy/eval.py +++ b/workflows/robotic_ultrasound/scripts/simulation/environments/state_machine/pi0_policy/eval.py @@ -37,14 +37,24 @@ "--disable_fabric", action="store_true", default=False, help="Disable fabric and use USD I/O operations." ) parser.add_argument("--num_envs", type=int, default=1, help="Number of environments to spawn.") -parser.add_argument("--task", type=str, default=None, help="Name of the task.") +parser.add_argument( + "--task", + type=str, + default="Isaac-Teleop-Torso-FrankaUsRs-IK-RL-Rel-v0", + help="Name of the task.", +) parser.add_argument( "--ckpt_path", type=str, default=robot_us_assets.policy_ckpt, help="checkpoint path. Default to use policy checkpoint in the latest assets.", ) -parser.add_argument("--repo_id", type=str, help="the LeRobot repo id for the dataset norm.") +parser.add_argument( + "--repo_id", + type=str, + default="i4h/sim_liver_scan", + help="the LeRobot repo id for the dataset norm.", +) # append AppLauncher cli argr AppLauncher.add_app_launcher_args(parser) diff --git a/workflows/robotic_ultrasound/tests/test_simulation/test_integration_eval.py b/workflows/robotic_ultrasound/tests/test_simulation/test_integration_eval.py index 4274d86b..d10657fa 100644 --- a/workflows/robotic_ultrasound/tests/test_simulation/test_integration_eval.py +++ b/workflows/robotic_ultrasound/tests/test_simulation/test_integration_eval.py @@ -30,7 +30,7 @@ def monitor_output(process, found_event, target_line=None): for line in iter(process.stdout.readline, ""): if target_line in line: found_event.set() - break # todo + break # TODO: should we force the process to exit here? except (ValueError, IOError): # Handle case where stdout is closed pass @@ -124,12 +124,7 @@ def run_with_monitoring(command, timeout_seconds, target_line=None): SM_CASES = [ ( - ( - "python -u workflows/robotic_ultrasound/scripts/simulation/environments/state_machine/pi0_policy/eval.py" - "--task Isaac-Teleop-Torso-FrankaUsRs-IK-RL-Rel-v0 " - "--enable_camera " - "--repo_id i4h/sim_liver_scan --headless" - ), + "python -u -m simulation.environments.state_machine.pi0_policy.eval --enable_camera --headless", 120, "Resetting the environment.", ), @@ -139,7 +134,7 @@ class TestSurgerySM(unittest.TestCase): @parameterized.expand(SM_CASES) def test_surgery_sm(self, command, timeout, target_line): # Run and monitor command - exit_code, found_target = run_with_monitoring(command, timeout, target_line) + _, found_target = run_with_monitoring(command, timeout, target_line) self.assertTrue(found_target) From 2d673ae5a8efc0c04bf2f381a368878efd2470c2 Mon Sep 17 00:00:00 2001 From: Mingxin Zheng Date: Mon, 31 Mar 2025 16:29:12 +0000 Subject: [PATCH 5/8] autofix Signed-off-by: Mingxin Zheng --- tools/run_all_tests.py | 12 ++++-------- .../tests/test_simulation/test_integration_eval.py | 1 + 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/tools/run_all_tests.py b/tools/run_all_tests.py index 8ea87f78..4e4a5934 100644 --- a/tools/run_all_tests.py +++ b/tools/run_all_tests.py @@ -43,12 +43,8 @@ def _run_test_process(cmd, env, test_path): stdout, stderr = process.communicate() # Filter out extension loading messages - filtered_stdout = "\n".join( - [line for line in stdout.split("\n") if not ("[ext:" in line and "startup" in line)] - ) - filtered_stderr = "\n".join( - [line for line in stderr.split("\n") if not ("[ext:" in line and "startup" in line)] - ) + filtered_stdout = "\n".join([line for line in stdout.split("\n") if not ("[ext:" in line and "startup" in line)]) + filtered_stderr = "\n".join([line for line in stderr.split("\n") if not ("[ext:" in line and "startup" in line)]) # Print filtered output if filtered_stdout.strip(): @@ -68,7 +64,7 @@ def _setup_test_env(project_root, tests_dir): env["PYTHONPATH"] = ":".join(pythonpath) + ":" + env["PYTHONPATH"] else: env["PYTHONPATH"] = ":".join(pythonpath) - + return env @@ -164,7 +160,7 @@ def run_integration_tests(workflow_name): "unittest", test_path, ] - + if not _run_test_process(cmd, env, test_path): all_tests_passed = False diff --git a/workflows/robotic_ultrasound/tests/test_simulation/test_integration_eval.py b/workflows/robotic_ultrasound/tests/test_simulation/test_integration_eval.py index d10657fa..e81197bd 100644 --- a/workflows/robotic_ultrasound/tests/test_simulation/test_integration_eval.py +++ b/workflows/robotic_ultrasound/tests/test_simulation/test_integration_eval.py @@ -130,6 +130,7 @@ def run_with_monitoring(command, timeout_seconds, target_line=None): ), ] + class TestSurgerySM(unittest.TestCase): @parameterized.expand(SM_CASES) def test_surgery_sm(self, command, timeout, target_line): From 43393c79e85d239728e614a6f4e5cfd43ddb3f9b Mon Sep 17 00:00:00 2001 From: Mingxin Zheng Date: Tue, 1 Apr 2025 01:19:12 +0000 Subject: [PATCH 6/8] move to helper Signed-off-by: Mingxin Zheng --- workflows/robotic_ultrasound/tests/helpers.py | 103 +++++++++++++++++ .../test_simulation/test_integration_eval.py | 109 +----------------- 2 files changed, 106 insertions(+), 106 deletions(-) diff --git a/workflows/robotic_ultrasound/tests/helpers.py b/workflows/robotic_ultrasound/tests/helpers.py index 2b51f69d..c3c7b79a 100644 --- a/workflows/robotic_ultrasound/tests/helpers.py +++ b/workflows/robotic_ultrasound/tests/helpers.py @@ -14,9 +14,112 @@ # limitations under the License. import os +import signal +import subprocess +import threading +import time from unittest import skipUnless def requires_rti(func): RTI_AVAILABLE = bool(os.getenv("RTI_LICENSE_FILE") and os.path.exists(os.getenv("RTI_LICENSE_FILE"))) return skipUnless(RTI_AVAILABLE, "RTI Connext DDS is not installed or license not found")(func) + + +def monitor_output(process, found_event, target_line=None): + """Monitor process output for target_line and set event when found.""" + try: + if target_line: + for line in iter(process.stdout.readline, ""): + if target_line in line: + found_event.set() + break # TODO: should we force the process to exit here? + except (ValueError, IOError): + # Handle case where stdout is closed + pass + + +def run_with_monitoring(command, timeout_seconds, target_line=None): + # Start the process with pipes for output + env = os.environ.copy() + process = subprocess.Popen( + command, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, # Redirect stderr to stdout + text=True, + bufsize=1, # Line buffered + preexec_fn=os.setsid if os.name != "nt" else None, # Create a new process group on Unix + env=env, + ) + + # Event to signal when target line is found + found_event = threading.Event() + + # Start monitoring thread + monitor_thread = threading.Thread(target=monitor_output, args=(process, found_event, target_line)) + monitor_thread.daemon = True + monitor_thread.start() + + target_found = False + + try: + # Wait for either timeout or target line found + start_time = time.time() + while time.time() - start_time < timeout_seconds: + if target_line and found_event.is_set(): + target_found = True + + # Check if process has already terminated + if process.poll() is not None: + break + + time.sleep(0.1) + + # If we get here, either timeout occurred or process ended + if process.poll() is None: # Process is still running + print(f"Sending SIGINT after {timeout_seconds} seconds...") + + if os.name != "nt": # Unix/Linux/MacOS + # Send SIGINT to the entire process group + os.killpg(os.getpgid(process.pid), signal.SIGINT) + else: # Windows + process.send_signal(signal.CTRL_C_EVENT) + + # Give the process some time to handle the signal and exit gracefully + try: + process.wait(timeout=5) + except subprocess.TimeoutExpired: + print("Process didn't terminate after SIGINT, force killing...") + if os.name != "nt": # Unix/Linux/MacOS + os.killpg(os.getpgid(process.pid), signal.SIGKILL) + else: # Windows + process.kill() + + except Exception as e: + print(f"Error during process execution: {e}") + if process.poll() is None: + process.kill() + + finally: + # Ensure we close all pipes and terminate the process + try: + # Try to get any remaining output, but with a short timeout + remaining_output, _ = process.communicate(timeout=2) + if remaining_output: + print(remaining_output) + except subprocess.TimeoutExpired: + # If communicate times out, force kill the process + process.kill() + process.communicate() + + # If the process is somehow still running, make sure it's killed + if process.poll() is None: + process.kill() + process.communicate() + + # Check if target was found + if not target_found and found_event.is_set(): + target_found = True + + return process.returncode, target_found diff --git a/workflows/robotic_ultrasound/tests/test_simulation/test_integration_eval.py b/workflows/robotic_ultrasound/tests/test_simulation/test_integration_eval.py index e81197bd..d826f621 100644 --- a/workflows/robotic_ultrasound/tests/test_simulation/test_integration_eval.py +++ b/workflows/robotic_ultrasound/tests/test_simulation/test_integration_eval.py @@ -13,115 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os -import signal -import subprocess -import threading -import time import unittest +from helpers import run_with_monitoring from parameterized import parameterized -def monitor_output(process, found_event, target_line=None): - """Monitor process output for target_line and set event when found.""" - try: - if target_line: - for line in iter(process.stdout.readline, ""): - if target_line in line: - found_event.set() - break # TODO: should we force the process to exit here? - except (ValueError, IOError): - # Handle case where stdout is closed - pass - - -def run_with_monitoring(command, timeout_seconds, target_line=None): - # Start the process with pipes for output - env = os.environ.copy() - process = subprocess.Popen( - command, - shell=True, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, # Redirect stderr to stdout - text=True, - bufsize=1, # Line buffered - preexec_fn=os.setsid if os.name != "nt" else None, # Create a new process group on Unix - env=env, - ) - - # Event to signal when target line is found - found_event = threading.Event() - - # Start monitoring thread - monitor_thread = threading.Thread(target=monitor_output, args=(process, found_event, target_line)) - monitor_thread.daemon = True - monitor_thread.start() - - target_found = False - - try: - # Wait for either timeout or target line found - start_time = time.time() - while time.time() - start_time < timeout_seconds: - if target_line and found_event.is_set(): - target_found = True - - # Check if process has already terminated - if process.poll() is not None: - break - - time.sleep(0.1) - - # If we get here, either timeout occurred or process ended - if process.poll() is None: # Process is still running - print(f"Sending SIGINT after {timeout_seconds} seconds...") - - if os.name != "nt": # Unix/Linux/MacOS - # Send SIGINT to the entire process group - os.killpg(os.getpgid(process.pid), signal.SIGINT) - else: # Windows - process.send_signal(signal.CTRL_C_EVENT) - - # Give the process some time to handle the signal and exit gracefully - try: - process.wait(timeout=5) - except subprocess.TimeoutExpired: - print("Process didn't terminate after SIGINT, force killing...") - if os.name != "nt": # Unix/Linux/MacOS - os.killpg(os.getpgid(process.pid), signal.SIGKILL) - else: # Windows - process.kill() - - except Exception as e: - print(f"Error during process execution: {e}") - if process.poll() is None: - process.kill() - - finally: - # Ensure we close all pipes and terminate the process - try: - # Try to get any remaining output, but with a short timeout - remaining_output, _ = process.communicate(timeout=2) - if remaining_output: - print(remaining_output) - except subprocess.TimeoutExpired: - # If communicate times out, force kill the process - process.kill() - process.communicate() - - # If the process is somehow still running, make sure it's killed - if process.poll() is None: - process.kill() - process.communicate() - - # Check if target was found - if not target_found and found_event.is_set(): - target_found = True - - return process.returncode, target_found - - SM_CASES = [ ( "python -u -m simulation.environments.state_machine.pi0_policy.eval --enable_camera --headless", @@ -131,9 +28,9 @@ def run_with_monitoring(command, timeout_seconds, target_line=None): ] -class TestSurgerySM(unittest.TestCase): +class TestPolicyEval(unittest.TestCase): @parameterized.expand(SM_CASES) - def test_surgery_sm(self, command, timeout, target_line): + def test_policy_eval(self, command, timeout, target_line): # Run and monitor command _, found_target = run_with_monitoring(command, timeout, target_line) self.assertTrue(found_target) From 0147d96f2573c0905bb6bc1724b7d944533a2cd4 Mon Sep 17 00:00:00 2001 From: Mingxin Zheng Date: Tue, 1 Apr 2025 01:21:17 +0000 Subject: [PATCH 7/8] format autofix Signed-off-by: Mingxin Zheng --- .../tests/test_simulation/test_integration_eval.py | 1 - 1 file changed, 1 deletion(-) diff --git a/workflows/robotic_ultrasound/tests/test_simulation/test_integration_eval.py b/workflows/robotic_ultrasound/tests/test_simulation/test_integration_eval.py index d826f621..26f57f49 100644 --- a/workflows/robotic_ultrasound/tests/test_simulation/test_integration_eval.py +++ b/workflows/robotic_ultrasound/tests/test_simulation/test_integration_eval.py @@ -18,7 +18,6 @@ from helpers import run_with_monitoring from parameterized import parameterized - SM_CASES = [ ( "python -u -m simulation.environments.state_machine.pi0_policy.eval --enable_camera --headless", From 56cbc0f97cb6e2f4605384803a53fb9817652f9e Mon Sep 17 00:00:00 2001 From: Mingxin Zheng Date: Tue, 1 Apr 2025 02:55:49 +0000 Subject: [PATCH 8/8] update doc Signed-off-by: Mingxin Zheng --- CONTRIBUTING.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 69054f66..6912a3ba 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -88,8 +88,11 @@ i4h-asset-retrieve # Set up your RTI license, skip if you are only running tests for robotic_surgery export RTI_LICENSE_FILE= -# Run all tests +# Run all unit tests python tools/run_all_tests.py --workflow + +# Run all integration tests +python tools/run_all_tests.py --integration ``` ## Reporting issues