Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions workflows/robotic_ultrasound/tests/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
from unittest import skipUnless


def requires_rti(func):
RTI_AVAILABLE = bool(os.getenv("RTI_LICENSE_FILE") and os.path.exists(os.getenv("RTI_LICENSE_FILE")))
return skipUnless(RTI_AVAILABLE, "RTI Connext DDS is not installed or license not found")(func)
16 changes: 4 additions & 12 deletions workflows/robotic_ultrasound/tests/test_dds/test_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import time
import unittest
from unittest import skipUnless

import rti.connextdds as dds # noqa: F401
import rti.idl as idl
from dds.publisher import Publisher

try:
import rti.connextdds as dds # noqa: F401
import rti.idl as idl

license_path = os.getenv("RTI_LICENSE_FILE")
RTI_AVAILABLE = bool(license_path and os.path.exists(license_path))
except ImportError:
RTI_AVAILABLE = False
from helpers import requires_rti


@idl.struct
Expand All @@ -36,7 +28,7 @@ class _TestData:
message: str = ""


@skipUnless(RTI_AVAILABLE, "RTI Connext DDS is not installed or license not found")
@requires_rti
class TestPublisher(unittest.TestCase):
class TestDataPublisher(Publisher):
"""Concrete implementation of Publisher for testing"""
Expand Down
20 changes: 6 additions & 14 deletions workflows/robotic_ultrasound/tests/test_dds/test_subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import queue
import time
import unittest
from unittest import skipUnless

import rti.connextdds as dds # noqa: F401
import rti.idl as idl
from dds.subscriber import Subscriber, SubscriberWithCallback, SubscriberWithQueue

try:
import rti.connextdds as dds # noqa: F401
import rti.idl as idl

license_path = os.getenv("RTI_LICENSE_FILE")
RTI_AVAILABLE = bool(license_path and os.path.exists(license_path))
except ImportError:
RTI_AVAILABLE = False
from helpers import requires_rti


@idl.struct
Expand All @@ -44,7 +36,7 @@ def consume(self, data) -> None:
return data


@skipUnless(RTI_AVAILABLE, "RTI Connext DDS is not installed or license not found")
@requires_rti
class TestDDSSubscriber(unittest.TestCase):
def setUp(self):
"""Set up test fixtures before each test method"""
Expand Down Expand Up @@ -94,7 +86,7 @@ def test_start_stop(self):
self.assertIsNone(self.subscriber.stop_event)


@skipUnless(RTI_AVAILABLE, "RTI Connext DDS is not installed or license not found")
@requires_rti
class TestSubscriberWithQueue(unittest.TestCase):
def setUp(self):
self.domain_id = 100
Expand Down Expand Up @@ -140,7 +132,7 @@ def test_read_data(self):
self.fail("No data received after multiple retries")


@skipUnless(RTI_AVAILABLE, "RTI Connext DDS is not installed or license not found")
@requires_rti
class TestSubscriberWithCallback(unittest.TestCase):
def setUp(self):
self.domain_id = 200 # Use a unique domain ID
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import unittest
from importlib.util import find_spec
from unittest import skipUnless

from helpers import requires_rti
from holoscan_apps.realsense.camera import RealsenseApp
from holoscan_apps.realsense.enumerate import count_devices

try:
RTI_AVAILABLE = bool(find_spec("rti.connextdds"))
if RTI_AVAILABLE:
license_path = os.getenv("RTI_LICENSE_FILE")
RTI_AVAILABLE = bool(license_path and os.path.exists(license_path))
except ImportError:
RTI_AVAILABLE = False

DEVICES_AVAILABLE = count_devices() > 0


@skipUnless(RTI_AVAILABLE, "RTI Connext DDS is not installed or license not found")
@requires_rti
@skipUnless(DEVICES_AVAILABLE, "No Intel RealSense devices found")
class TestRealSenseCamera(unittest.TestCase):
def test_realsense_camera(self):
Expand Down
12 changes: 2 additions & 10 deletions workflows/robotic_ultrasound/tests/test_policy_runner/test_pi0.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,16 @@
import os
import time
import unittest
from unittest import skipUnless

import numpy as np
from dds.publisher import Publisher
from dds.schemas.camera_info import CameraInfo
from dds.schemas.franka_ctrl import FrankaCtrlInput
from dds.schemas.franka_info import FrankaInfo
from dds.subscriber import SubscriberWithCallback
from helpers import requires_rti
from PIL import Image

try:
import rti.connextdds as dds # noqa: F401

license_path = os.getenv("RTI_LICENSE_FILE")
RTI_AVAILABLE = bool(license_path and os.path.exists(license_path))
except ImportError:
RTI_AVAILABLE = False

"""
Must execute the pi0 policy runner in another process before execute this test.
Ensure to set the `height=480` and `width=640` in the`run_policy.py`.
Expand Down Expand Up @@ -80,7 +72,7 @@ def produce(self, dt: float, sim_time: float):
return output


@skipUnless(RTI_AVAILABLE, "RTI Connext DDS is not installed or license not found")
@requires_rti
class TestRunPI0Policy(unittest.TestCase):
def setUp(self):
def cb(topic, data):
Expand Down
13 changes: 2 additions & 11 deletions workflows/robotic_ultrasound/tests/test_simulation/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@

import os
import unittest
from importlib.util import find_spec
from unittest import skipUnless
from unittest.mock import MagicMock

from dds.publisher import Publisher
Expand All @@ -27,6 +25,7 @@

simulation_app = SimulationApp({"headless": True})
import omni.usd
from helpers import requires_rti
from pxr import Usd
from simulation.annotators.base import Annotator

Expand All @@ -35,16 +34,8 @@
("mixed_none_publishers", [None, MagicMock(spec=Publisher), None], [MagicMock(spec=Subscriber)], 1, 1, 0),
]

try:
RTI_AVAILABLE = bool(find_spec("rti.connextdds"))
if RTI_AVAILABLE:
license_path = os.getenv("RTI_LICENSE_FILE")
RTI_AVAILABLE = bool(license_path and os.path.exists(license_path))
except ImportError:
RTI_AVAILABLE = False


@skipUnless(RTI_AVAILABLE, "RTI Connext DDS is not installed or license not found")
@requires_rti
class TestAnnotator(unittest.TestCase):
@classmethod
def setUpClass(cls):
Expand Down
13 changes: 2 additions & 11 deletions workflows/robotic_ultrasound/tests/test_simulation/test_camera.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@

import os
import unittest
from importlib.util import find_spec
from unittest import skipUnless

import numpy as np
from dds.schemas.camera_ctrl import CameraCtrlInput
from dds.schemas.camera_info import CameraInfo
from helpers import requires_rti
from isaacsim import SimulationApp
from parameterized import parameterized
from simulation.utils.assets import robotic_ultrasound_assets as robot_us_assets
Expand All @@ -30,16 +29,8 @@
import omni.usd
from simulation.annotators.camera import CameraPublisher, CameraSubscriber

try:
RTI_AVAILABLE = bool(find_spec("rti.connextdds"))
if RTI_AVAILABLE:
license_path = os.getenv("RTI_LICENSE_FILE")
RTI_AVAILABLE = bool(license_path and os.path.exists(license_path))
except ImportError:
RTI_AVAILABLE = False


@skipUnless(RTI_AVAILABLE, "RTI Connext DDS is not installed or license not found")
@requires_rti
class TestCameraBase(unittest.TestCase):
@classmethod
def setUpClass(cls):
Expand Down
13 changes: 2 additions & 11 deletions workflows/robotic_ultrasound/tests/test_simulation/test_franka.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@

import os
import unittest
from importlib.util import find_spec
from unittest import skipUnless

import numpy as np
from dds.schemas.franka_ctrl import FrankaCtrlInput
from dds.schemas.franka_info import FrankaInfo
from helpers import requires_rti
from isaacsim import SimulationApp
from simulation.utils.assets import robotic_ultrasound_assets as robot_us_assets

Expand All @@ -31,16 +30,8 @@
from omni.isaac.core.robots import Robot
from simulation.annotators.franka import FrankaPublisher, FrankaSubscriber

try:
RTI_AVAILABLE = bool(find_spec("rti.connextdds"))
if RTI_AVAILABLE:
license_path = os.getenv("RTI_LICENSE_FILE")
RTI_AVAILABLE = bool(license_path and os.path.exists(license_path))
except ImportError:
RTI_AVAILABLE = False


@skipUnless(RTI_AVAILABLE, "RTI Connext DDS is not installed or license not found")
@requires_rti
class TestFrankaBase(unittest.TestCase):
@classmethod
def setUpClass(cls):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import time
import unittest
from unittest import skipUnless

import numpy as np
from dds.schemas.camera_info import CameraInfo
from dds.schemas.franka_info import FrankaInfo
from dds.subscriber import SubscriberWithCallback

try:
import rti.connextdds as dds # noqa: F401

license_path = os.getenv("RTI_LICENSE_FILE")
RTI_AVAILABLE = bool(license_path and os.path.exists(license_path))
except ImportError:
RTI_AVAILABLE = False
from helpers import requires_rti

"""
Must immediately execute the `sim_with_dds.py` in another process after executing this test.

"""


@skipUnless(RTI_AVAILABLE, "RTI Connext DDS is not installed or license not found")
@requires_rti
class TestSimWithDDS(unittest.TestCase):
def setUp(self):
topic1 = "topic_room_camera_data_rgb"
Expand Down
13 changes: 2 additions & 11 deletions workflows/robotic_ultrasound/tests/test_simulation/test_target.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@

import os
import unittest
from importlib.util import find_spec
from unittest import skipUnless

from dds.schemas.target_ctrl import TargetCtrlInput
from dds.schemas.target_info import TargetInfo
from helpers import requires_rti
from isaacsim import SimulationApp
from simulation.utils.assets import robotic_ultrasound_assets as robot_us_assets

Expand All @@ -28,16 +27,8 @@
import omni.usd
from simulation.annotators.target import TargetPublisher, TargetSubscriber

try:
RTI_AVAILABLE = bool(find_spec("rti.connextdds"))
if RTI_AVAILABLE:
license_path = os.getenv("RTI_LICENSE_FILE")
RTI_AVAILABLE = bool(license_path and os.path.exists(license_path))
except ImportError:
RTI_AVAILABLE = False


@skipUnless(RTI_AVAILABLE, "RTI Connext DDS is not installed or license not found")
@requires_rti
class TestTargetBase(unittest.TestCase):
@classmethod
def setUpClass(cls):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,18 @@

import os
import unittest
from importlib.util import find_spec
from unittest import skipUnless

from dds.schemas.usp_info import UltraSoundProbeInfo
from helpers import requires_rti
from isaacsim import SimulationApp
from simulation.utils.assets import robotic_ultrasound_assets as robot_us_assets

simulation_app = SimulationApp({"headless": True})
import omni.usd
from simulation.annotators.ultrasound import UltraSoundPublisher

try:
RTI_AVAILABLE = bool(find_spec("rti.connextdds"))
if RTI_AVAILABLE:
license_path = os.getenv("RTI_LICENSE_FILE")
RTI_AVAILABLE = bool(license_path and os.path.exists(license_path))
except ImportError:
RTI_AVAILABLE = False


@skipUnless(RTI_AVAILABLE, "RTI Connext DDS is not installed or license not found")
@requires_rti
class TestUltraSoundBase(unittest.TestCase):
@classmethod
def setUpClass(cls):
Expand Down
Loading
Loading