Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions workflows/robotic_ultrasound/scripts/simulation/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,33 @@ The collected data includes:

During execution, you can press the 'r' key to reset the environment and state machine.

### Replay Recorded Trajectories

The `replay_recording.py` script allows you to visualize previously recorded HDF5 trajectories in the Isaac Sim environment. It loads recorded actions, organ positions, and robot joint states from HDF5 files for each episode and steps through them in the simulation.

#### Usage

Ensure your `PYTHONPATH` is set up as described in the [Environment Setup - Set environment variables before running the scripts](../../README.md#set-environment-variables-before-running-the-scripts).

Navigate to the [`simulation` folder](./) and execute:

```sh
python environments/state_machine/replay_recording.py --hdf5_path /path/to/your/hdf5_data_directory --task <YourTaskName>
```

Replace `/path/to/your/hdf5_data_directory` with the actual path to the directory containing your `data_*.hdf5` files, and `<YourTaskName>` with the task name used during data collection (e.g., `Isaac-Teleop-Torso-FrankaUsRs-IK-RL-Rel-v0`).

#### Command Line Arguments

| Argument | Type | Default | Description |
|--------------------|------|------------------------------------------|----------------------------------------------------------------------------------|
| `--hdf5_path` | str | (Required) | Path to the directory containing recorded HDF5 files. |
| `--task` | str | `Isaac-Teleop-Torso-FrankaUsRs-IK-RL-Rel-v0` | Name of the task (environment) to use. Should match the task used for recording. |
| `--num_envs` | int | `1` | Number of environments to spawn (should typically be 1 for replay). |
| `--disable_fabric` | flag | `False` | Disable fabric and use USD I/O operations. |

> **Note:** Additional common Isaac Lab arguments (like `--device`) can also be used.

### Teleoperation

The teleoperation interface allows direct control of the robotic arm using various input devices. It supports keyboard, SpaceMouse, and gamepad controls for precise manipulation of the ultrasound probe.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ def main():
obs["rgb_images"] = rgb_images
obs["depth_images"] = depth_images

obs["joint_vel"] = env.unwrapped.scene["robot"].data.joint_vel
data_collector.record_step(
env,
obs,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

import argparse

from isaaclab.app import AppLauncher

# add argparse arguments
parser = argparse.ArgumentParser(description="This script demonstrates a single-arm manipulator.")
parser.add_argument(
"hdf5_path", type=str, help="Path to a .hdf5 file with recorded data or a directory of .hdf5 files."
)
parser.add_argument(
"--disable_fabric", action="store_true", default=False, help="Disable fabric and use USD I/O operations."
)
parser.add_argument("--num_envs", type=int, default=1, help="Number of environments to spawn.")
parser.add_argument("--task", type=str, default="Isaac-Teleop-Torso-FrankaUsRs-IK-RL-Rel-v0", help="Name of the task.")

# append AppLauncher cli args
AppLauncher.add_app_launcher_args(parser)
# parse the arguments
args_cli = parser.parse_args()

# launch omniverse app
app_launcher = AppLauncher(args_cli)
simulation_app = app_launcher.app

import os

import gymnasium as gym
import h5py
import torch
from isaaclab_tasks.utils.parse_cfg import parse_env_cfg
# Import extensions to set up environment tasks
from robotic_us_ext import tasks # noqa: F401
from simulation.environments.state_machine.utils import reset_organ_to_position, reset_robot_to_position


def validate_hdf5_path(path):
"""Validate that the path contains HDF5 files and return the number of episodes."""
if os.path.isdir(path):
# Check if directory contains HDF5 files that start with "data_"
hdf5_files = [file for file in os.listdir(path) if file.startswith("data_") and file.endswith(".hdf5")]
return len(hdf5_files)
elif os.path.isfile(path) and path.endswith(".hdf5"):
return 1
return 0


def get_hdf5_episode_data(root, action_key: str):
"""Get episode data from HDF5 format."""
base_path = "data/demo_0"
try:
actions = root[f"{base_path}/{action_key}"][()]
except KeyError as e:
print(f"Error loading data using key: {base_path}/{action_key}")
print(f"Available keys: {list(root[base_path].keys())}")
raise e
return actions


def get_observation_episode_data(data_path: str, episode_idx: int, key: str):
"""Get episode data from HDF5 format for a given key."""
if not data_path.endswith(".hdf5"):
data_path = os.path.join(data_path, f"data_{episode_idx}.hdf5")
with h5py.File(data_path, "r") as f:
data = get_hdf5_episode_data(f, key)
return data


def get_episode_data(data_path: str, episode_idx: int):
"""Get episode data from HDF5 format."""
# Load initial episode data
try:
if "Rel" in args_cli.task:
action_key = "action"
else:
action_key = "abs_action"
if not data_path.endswith(".hdf5"):
data_path = os.path.join(data_path, f"data_{episode_idx}.hdf5")
root = h5py.File(data_path, "r")
actions = get_hdf5_episode_data(root, action_key)
return actions
except Exception as e:
print(f"Error loading data: {str(e)}")
return None


def reset_scene_to_initial_state(env, episode_idx: int, torso_obs_key: str, joint_state_key: str, joint_vel_key: str):
"""Reset the scene to the initial state."""
actions = get_episode_data(args_cli.hdf5_path, episode_idx)
object_position = get_observation_episode_data(args_cli.hdf5_path, episode_idx, torso_obs_key)
reset_organ_to_position(env, object_position)
robot_initial_joint_state = get_observation_episode_data(args_cli.hdf5_path, episode_idx, joint_state_key)
try:
joint_vel = get_observation_episode_data(args_cli.hdf5_path, episode_idx, joint_vel_key)
except KeyError:
print("No joint velocity provided, setting to zero")
joint_vel = None
reset_robot_to_position(env, robot_initial_joint_state, joint_vel=joint_vel)
return actions


def main():
"""Main function."""
# Check if the provided path contains valid HDF5 files
total_episodes = validate_hdf5_path(args_cli.hdf5_path)
print(f"total_episodes: {total_episodes}")
if total_episodes == 0:
raise AssertionError(
"Unsupported file format. Please use a directory containing .hdf5 files or a single .hdf5 file."
)

# parse configuration
env_cfg = parse_env_cfg(
args_cli.task, device=args_cli.device, num_envs=args_cli.num_envs, use_fabric=not args_cli.disable_fabric
)

# create environment
env = gym.make(args_cli.task, cfg=env_cfg)

# Use inference mode for all simulation state updates
with torch.inference_mode():
# reset environment at start
obs = env.reset()

print(f"[INFO]: Gym observation space: {env.observation_space}")
print(f"[INFO]: Gym action space: {env.action_space}")

episode_idx = 0
action_idx = 0
torso_obs_key = "observations/torso_obs"
joint_state_key = "abs_joint_pos"
joint_vel_key = "observations/joint_vel"

# simulate physics
while simulation_app.is_running():
actions = reset_scene_to_initial_state(env, episode_idx, torso_obs_key, joint_state_key, joint_vel_key)

for episode_idx in range(total_episodes):
print(f"\nepisode_idx: {episode_idx}")

# get from the second action since the first joint state is not recorded
for action_step, action in enumerate(actions[1:]):
# Get next action from recorded data
current_action = torch.tensor(action, device=args_cli.device)
current_action = current_action.unsqueeze(0)

print(f"Step {action_step}/{len(actions)} | Episode {episode_idx + 1}/{total_episodes}")
print(f"Action: {current_action}")

# Step environment with recorded action
obs, rew, terminated, truncated, info_ = env.step(current_action)

# Update counters
action_idx += 1

env.reset()
# Check if next episode exists before trying to load it
if episode_idx + 1 >= total_episodes:
print(f"Completed all episodes ({total_episodes})")
break

actions = reset_scene_to_initial_state(
env, episode_idx + 1, torso_obs_key, joint_state_key, joint_vel_key
)

# Check if we've reached the end of available episodes
if episode_idx >= total_episodes - 1 or actions is None:
print(f"Reached the end of available episodes ({episode_idx + 1}/{total_episodes})")
break

# close the environment and data file
env.close()


if __name__ == "__main__":
# run the main function
main()
# close sim app
simulation_app.close()
Original file line number Diff line number Diff line change
Expand Up @@ -281,3 +281,26 @@ def load_onnx_model(model_path):
session = ort.InferenceSession(model_path, providers=providers)
print(f"session using: {session.get_providers()}")
return session


def reset_organ_to_position(env, object_position, device="cuda:0"):
"""Reset the organ position."""
organs = env.unwrapped.scene._rigid_objects["organs"]
root_state = organs.data.default_root_state.clone()
root_state[:, :7] = torch.tensor(object_position[0, :7], device=device)
# write to the sim
organs.write_root_state_to_sim(root_state)


def reset_robot_to_position(env, robot_initial_joint_state, joint_vel=None, device="cuda:0"):
"""Reset the robot to the initial joint state."""
robot = env.unwrapped.scene["robot"]
joint_pos = torch.tensor(robot_initial_joint_state[0], device=device).unsqueeze(0)
if joint_vel is not None:
joint_vel = torch.tensor(joint_vel[0], device=device)
else:
print("No joint velocity provided, setting to zero")
joint_vel = torch.zeros_like(joint_pos)
# set joint positions
robot.write_joint_state_to_sim(joint_pos, joint_vel)
robot.reset()
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@ class RoboticSoftCfg(InteractiveSceneCfg):
prim_path="/World/Light",
spawn=sim_utils.DomeLightCfg(intensity=3000.0, color=(0.75, 0.75, 0.75)),
)
table = AssetBaseCfg(
table = RigidObjectCfg(
prim_path="{ENV_REGEX_NS}/Table",
init_state=AssetBaseCfg.InitialStateCfg(
pos=[0.4804, 0.02017, -0.83415], rot=euler_angles_to_quats(torch.tensor([0.0, 0.0, -90.0]), degrees=True)
init_state=RigidObjectCfg.InitialStateCfg(
pos=[0.4804, 0.02017, -0.84415], rot=euler_angles_to_quats(torch.tensor([0.0, 0.0, -90.0]), degrees=True)
),
spawn=sim_utils.UsdFileCfg(
usd_path=robot_us_assets.table_with_cover,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Copyright (c) 2022-2024, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

import os
import shutil
import sys
import unittest
from pathlib import Path

import h5py
import numpy as np

sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from helpers import run_with_monitoring
from parameterized import parameterized

TESTS_PATH = Path(__file__).parents[1].as_posix()

TEST_DATA_DIR = os.path.join(TESTS_PATH, "temp_test_hdf5_data_dir")
FAKE_HDF5_FILENAME = "data_0.hdf5"
ORGAN_POS = np.array([[0.69, -0.11, 0.09, -0.21, -0.00, 0.00, 0.98]] * 6)
ROBOT_JOINT_POS = np.array([[0.23, -0.75, -0.22, -2.47, -0.13, 1.73, 0.077]] * 6)
ROBOT_JOINT_VEL = np.array([[0.39, 2.32, -1.74, 2.28, 0.53, -1.36, -1.74]] * 6)
ACTIONS_DATA = np.array([[0.36, -0.15, -0.11, -0.01, 0.016, 0.012]] * 6)

TORSO_OBS_KEY = "observations/torso_obs"
JOINT_STATE_KEY = "abs_joint_pos"
JOINT_VEL_KEY = "observations/joint_vel"
ACTION_KEY_IN_HDF5 = "action"


TEST_CASES = [
(
f"python -u -m simulation.environments.state_machine.replay_recording "
f"{TEST_DATA_DIR} --headless --enable_camera",
300,
"Completed all episodes",
),
(
f"python -u -m simulation.environments.state_machine.replay_recording "
f"{TEST_DATA_DIR}/data_0.hdf5 --headless --enable_camera",
300,
"Completed all episodes",
),
]


class TestReplayRecording(unittest.TestCase):
@classmethod
def setUpClass(cls):
os.makedirs(TEST_DATA_DIR, exist_ok=True)
cls.fake_hdf5_path = os.path.join(TEST_DATA_DIR, FAKE_HDF5_FILENAME)
print(f"Creating fake HDF5 file at: {cls.fake_hdf5_path}")
with h5py.File(cls.fake_hdf5_path, "w") as f:
base_path = "data/demo_0"
f.create_dataset(f"{base_path}/{TORSO_OBS_KEY}", data=ORGAN_POS)
f.create_dataset(f"{base_path}/{JOINT_STATE_KEY}", data=ROBOT_JOINT_POS)
f.create_dataset(f"{base_path}/{JOINT_VEL_KEY}", data=ROBOT_JOINT_VEL)
f.create_dataset(f"{base_path}/{ACTION_KEY_IN_HDF5}", data=ACTIONS_DATA)

@classmethod
def tearDownClass(cls):
if os.path.exists(TEST_DATA_DIR):
shutil.rmtree(TEST_DATA_DIR)

@parameterized.expand(TEST_CASES)
def test_replay_recording(self, command, timeout, target_line):
# Run and monitor command
_, found_target = run_with_monitoring(command, timeout, target_line)
self.assertTrue(found_target)


if __name__ == "__main__":
unittest.main()
Loading