diff --git a/tests/gearbox.py b/tests/gearbox.py index 1f98dc1b3a4..eafc7b2f35e 100644 --- a/tests/gearbox.py +++ b/tests/gearbox.py @@ -6,11 +6,55 @@ """ import json +import base64 class TestGearboxHelper: """Helper class for gearbox-related test operations.""" + @staticmethod + def load_gearbox_config(dvs): + """ + Load gearbox configuration from gearbox_config.json. + + Args: + dvs: Docker Virtual Switch instance + + Returns: + tuple: (config_path, config_dict) - Path to the config file and parsed JSON config + """ + # Resolve symlink to get actual config path + config_path = "/usr/share/sonic/hwsku/gearbox_config.json" + rc, actual_path = dvs.runcmd(f"readlink -f {config_path}") + if rc == 0 and actual_path.strip(): + config_path = actual_path.strip() + + # Read current config + rc, config_json = dvs.runcmd(f"cat {config_path}") + assert rc == 0, f"Failed to read gearbox_config.json from {config_path}" + config = json.loads(config_json) + + return config_path, config + + @staticmethod + def write_gearbox_config(dvs, config_path, config_dict): + """ + Write gearbox configuration to gearbox_config.json. + + Args: + dvs: Docker Virtual Switch instance + config_path: Path to the config file + config_dict: Configuration dictionary to write + + Returns: + None + """ + config_str = json.dumps(config_dict, indent=2) + encoded = base64.b64encode(config_str.encode('utf-8')).decode('utf-8') + cmd = f"python3 -c \"import base64; data = base64.b64decode('{encoded}').decode('utf-8'); open('{config_path}', 'w').write(data + '\\n')\"" + rc, _ = dvs.runcmd(cmd) + assert rc == 0, f"Failed to write modified config to {config_path}" + @staticmethod def get_first_gearbox_port(gearbox): """ @@ -37,7 +81,91 @@ def get_first_gearbox_port(gearbox): return port_name, phy_id @staticmethod - def configure_gearbox_macsec_support(dvs, gearbox, phy_id=None, macsec_supported=None): + def get_gearbox_port_by_phy(gearbox, phy_id): + """ + Get a port connected to a specific PHY from Gearbox object. + + Args: + gearbox: Gearbox fixture + phy_id: PHY ID to search for + + Returns: + tuple: (port_name, phy_id) - Port connected to the specified PHY + """ + for idx, intf in gearbox.interfaces.items(): + if int(intf.get("phy_id")) == phy_id: + port_name = intf.get("name") + assert port_name, f"Interface on PHY {phy_id} has no 'name' field" + return port_name, phy_id + + raise AssertionError(f"No interface found connected to PHY {phy_id}") + + @staticmethod + def reassign_interface_to_phy(dvs, interface_name, new_phy_id, macsec_supported=None, restart=False): + """ + Reassign an existing interface to a different PHY for testing multi-PHY scenarios. + + This creates a new PHY (if it doesn't exist) and reassigns the specified interface + to that PHY. The interface keeps its original lane configuration which is valid in VS. + + Args: + dvs: Docker Virtual Switch instance + interface_name: The interface to reassign (e.g., "Ethernet8") + new_phy_id: The PHY ID to reassign the interface to (e.g., 2) + macsec_supported: MACsec support for the new PHY (None=omit field, True, or False) + restart: If True, restart DVS after modifying the config (default: False) + + Returns: + None + """ + config_path, config = TestGearboxHelper.load_gearbox_config(dvs) + + # Check if PHY already exists, if not create it + phy_ids = [int(phy.get("phy_id")) for phy in config.get("phys", [])] + new_phy_id = int(new_phy_id) + if new_phy_id not in phy_ids: + # Get the first PHY as a template + assert len(config.get("phys", [])) > 0, "No PHYs found in config" + first_phy = config["phys"][0] + + # Create new PHY based on first PHY, reusing file paths that exist in VS + new_phy = { + "phy_id": new_phy_id, + "name": f"sesto-{new_phy_id}", + "address": f"0x{new_phy_id}000", + "lib_name": first_phy.get("lib_name"), # Reuse PHY 1's lib + "firmware_path": first_phy.get("firmware_path"), # Reuse PHY 1's firmware + "config_file": first_phy.get("config_file"), # Reuse PHY 1's config + "sai_init_config_file": first_phy.get("sai_init_config_file"), # Reuse PHY 1's init config + "phy_access": first_phy.get("phy_access", "mdio"), + "bus_id": first_phy.get("bus_id", 0), + "context_id": first_phy.get("context_id", 1), + "hwinfo": first_phy.get("hwinfo", "mdio0_0_0/0") # Reuse PHY 1's hwinfo + } + + # Set macsec_supported if specified + if macsec_supported is not None: + new_phy["macsec_supported"] = macsec_supported + + config["phys"].append(new_phy) + + # Find and reassign the interface (keep original lanes) + interface_found = False + for intf in config.get("interfaces", []): + if intf.get("name") == interface_name: + intf["phy_id"] = new_phy_id + interface_found = True + break + + assert interface_found, f"Interface {interface_name} not found in gearbox_config.json" + + TestGearboxHelper.write_gearbox_config(dvs, config_path, config) + + if restart: + dvs.restart() + + @staticmethod + def configure_gearbox_macsec_support(dvs, gearbox, phy_id=None, macsec_supported=None, restart=False): """ Configure MACsec support on a gearbox PHY by modifying gearbox_config.json and restarting DVS. @@ -53,6 +181,7 @@ def configure_gearbox_macsec_support(dvs, gearbox, phy_id=None, macsec_supported gearbox: Gearbox fixture phy_id: PHY ID (string, e.g., "1"). If None, uses the first PHY from Gearbox object. macsec_supported: None (remove field), True, or False + restart: If True, restart DVS after modifying the config (default: False) """ # If phy_id not provided, use the first PHY from Gearbox object if phy_id is None: @@ -60,16 +189,7 @@ def configure_gearbox_macsec_support(dvs, gearbox, phy_id=None, macsec_supported phy_id = next(iter(gearbox.phys)) print(f"No phy_id provided, using first PHY: {phy_id}") - # Resolve symlink to get actual config path - config_path = "/usr/share/sonic/hwsku/gearbox_config.json" - rc, actual_path = dvs.runcmd(f"readlink -f {config_path}") - if rc == 0 and actual_path.strip(): - config_path = actual_path.strip() - - # Read current config - rc, config_json = dvs.runcmd(f"cat {config_path}") - assert rc == 0, f"Failed to read gearbox_config.json from {config_path}" - config = json.loads(config_json) + config_path, config = TestGearboxHelper.load_gearbox_config(dvs) phy_id = int(phy_id) @@ -89,17 +209,7 @@ def configure_gearbox_macsec_support(dvs, gearbox, phy_id=None, macsec_supported assert phy_found, f"PHY {phy_id} not found in gearbox_config.json" - # Write modified config back using heredoc - config_str = json.dumps(config, indent=2) - heredoc = "__GEARBOX_JSON__" - rc, _ = dvs.runcmd( - "bash -lc 'cat > {path} <<\"{tag}\"\n{payload}\n{tag}\n'".format( - path=config_path, - tag=heredoc, - payload=config_str, - ) - ) - assert rc == 0, f"Failed to write modified config to {config_path}" + TestGearboxHelper.write_gearbox_config(dvs, config_path, config) - # Restart DVS to reload configuration - dvs.restart() + if restart: + dvs.restart() diff --git a/tests/macsec.py b/tests/macsec.py index 7501f9fb8f5..9c84371406e 100644 --- a/tests/macsec.py +++ b/tests/macsec.py @@ -141,45 +141,88 @@ def cleanup_macsec(dvs, port_name): print(f"Cleanup encountered error: {e}") @staticmethod - def verify_macsec_in_gb_asic_db(dvs, should_exist=True): + def verify_macsec_for_port_in_gb_asic_db(dvs, port_name, should_exist=True): """ - Verify MACsec objects exist (or don't exist) in GB_ASIC_DB + Verify MACsec objects for a specific port exist (or don't exist) in GB_ASIC_DB. + + This method checks if the specified port's MACsec configuration is present + in GB_ASIC_DB by mapping port name to line-side OID via GB_COUNTERS_DB and + checking if any MACSEC_PORT entry references that port OID. Args: dvs: Docker Virtual Switch instance + port_name: Name of the port to check (e.g., "Ethernet4") should_exist: True if objects should exist, False otherwise Returns: bool: True if verification passes """ - gb_asic_db = DVSDatabase(swsscommon.GB_ASIC_DB, dvs.redis_sock) - - macsec_keys = gb_asic_db.get_keys("ASIC_STATE:SAI_OBJECT_TYPE_MACSEC") + gb_counters_db = DVSDatabase(swsscommon.GB_COUNTERS_DB, dvs.redis_sock) + + # Get port's line-side OID from GB_COUNTERS_DB + # Gearbox ports are stored with "_line" suffix for line-side port + port_map = gb_counters_db.get_entry("COUNTERS_PORT_NAME_MAP", "") + line_port_key = f"{port_name}_line" + expected_port_oid = port_map.get(line_port_key) + + if not expected_port_oid: + # Port not found in GB_COUNTERS_DB (not a gearbox port) + return not should_exist + + # Check if any MACSEC_PORT entry references this port OID + macsec_port_keys = gb_asic_db.get_keys("ASIC_STATE:SAI_OBJECT_TYPE_MACSEC_PORT") + port_found = any( + gb_asic_db.get_entry("ASIC_STATE:SAI_OBJECT_TYPE_MACSEC_PORT", key).get( + "SAI_MACSEC_PORT_ATTR_PORT_ID" + ) == expected_port_oid + for key in macsec_port_keys + ) if should_exist: - return len(macsec_keys) > 0 # Should have at least one object + return port_found else: - return len(macsec_keys) == 0 # Should have no objects + return not port_found @staticmethod - def verify_macsec_in_asic_db(dvs, should_exist=True): + def verify_macsec_for_port_in_asic_db(dvs, port_name, should_exist=True): """ - Verify MACsec objects exist (or don't exist) in ASIC_DB (NPU) + Verify MACsec objects for a specific port exist (or don't exist) in ASIC_DB (NPU). + + This method checks if the specified port's MACsec configuration is present + in ASIC_DB by mapping port name to OID via COUNTERS_DB and checking if any + MACSEC_PORT entry references that port OID. Args: dvs: Docker Virtual Switch instance + port_name: Name of the port to check (e.g., "Ethernet0") should_exist: True if objects should exist, False otherwise Returns: bool: True if verification passes """ asic_db = dvs.get_asic_db() + counters_db = dvs.get_counters_db() + + # Get port OID from COUNTERS_DB port name map + port_map = counters_db.get_entry("COUNTERS_PORT_NAME_MAP", "") + expected_port_oid = port_map.get(port_name) + + if not expected_port_oid: + # Port not found in COUNTERS_DB + return not should_exist - macsec_keys = asic_db.get_keys("ASIC_STATE:SAI_OBJECT_TYPE_MACSEC") + # Check if any MACSEC_PORT entry references this port OID + macsec_port_keys = asic_db.get_keys("ASIC_STATE:SAI_OBJECT_TYPE_MACSEC_PORT") + port_found = any( + asic_db.get_entry("ASIC_STATE:SAI_OBJECT_TYPE_MACSEC_PORT", key).get( + "SAI_MACSEC_PORT_ATTR_PORT_ID" + ) == expected_port_oid + for key in macsec_port_keys + ) if should_exist: - return len(macsec_keys) > 0 + return port_found else: - return len(macsec_keys) == 0 + return not port_found diff --git a/tests/test_macsec_gearbox.py b/tests/test_macsec_gearbox.py index 2d0c4ec71bf..0d1cd908841 100644 --- a/tests/test_macsec_gearbox.py +++ b/tests/test_macsec_gearbox.py @@ -49,16 +49,16 @@ def test_macsec_phy_switch_default(self, dvs, gearbox, gearbox_config): port_name, phy_id = TestGearboxHelper.get_first_gearbox_port(gearbox) try: - TestGearboxHelper.configure_gearbox_macsec_support(dvs, gearbox, phy_id=phy_id, macsec_supported=None) + TestGearboxHelper.configure_gearbox_macsec_support(dvs, gearbox, phy_id=phy_id, macsec_supported=None, restart=True) TestMacsecHelper.enable_macsec_on_port(dvs, port_name=port_name, with_secure_channels=True) - assert TestMacsecHelper.verify_macsec_in_gb_asic_db(dvs, should_exist=True), ( - "FAILED: MACsec objects should exist in GB_ASIC_DB " + assert TestMacsecHelper.verify_macsec_for_port_in_gb_asic_db(dvs, port_name, should_exist=True), ( + f"FAILED: MACsec objects for {port_name} should exist in GB_ASIC_DB " "when macsec_supported is absent" ) - assert TestMacsecHelper.verify_macsec_in_asic_db(dvs, should_exist=False), ( - "FAILED: MACsec objects should NOT exist in ASIC_DB " + assert TestMacsecHelper.verify_macsec_for_port_in_asic_db(dvs, port_name, should_exist=False), ( + f"FAILED: MACsec objects for {port_name} should NOT exist in ASIC_DB " "when using PHY backend" ) @@ -81,16 +81,16 @@ def test_macsec_phy_switch_explicit(self, dvs, gearbox, gearbox_config): port_name, phy_id = TestGearboxHelper.get_first_gearbox_port(gearbox) try: - TestGearboxHelper.configure_gearbox_macsec_support(dvs, gearbox, phy_id=phy_id, macsec_supported=True) + TestGearboxHelper.configure_gearbox_macsec_support(dvs, gearbox, phy_id=phy_id, macsec_supported=True, restart=True) TestMacsecHelper.enable_macsec_on_port(dvs, port_name=port_name, with_secure_channels=True) - assert TestMacsecHelper.verify_macsec_in_gb_asic_db(dvs, should_exist=True), ( - "FAILED: MACsec objects should exist in GB_ASIC_DB " + assert TestMacsecHelper.verify_macsec_for_port_in_gb_asic_db(dvs, port_name, should_exist=True), ( + f"FAILED: MACsec objects for {port_name} should exist in GB_ASIC_DB " "when macsec_supported=true" ) - assert TestMacsecHelper.verify_macsec_in_asic_db(dvs, should_exist=False), ( - "FAILED: MACsec objects should NOT exist in ASIC_DB " + assert TestMacsecHelper.verify_macsec_for_port_in_asic_db(dvs, port_name, should_exist=False), ( + f"FAILED: MACsec objects for {port_name} should NOT exist in ASIC_DB " "when using PHY backend" ) @@ -115,19 +115,82 @@ def test_macsec_npu_switch(self, dvs, gearbox, gearbox_config): try: # Setup gearbox with macsec_supported=false - TestGearboxHelper.configure_gearbox_macsec_support(dvs, gearbox, phy_id=phy_id, macsec_supported=False) + TestGearboxHelper.configure_gearbox_macsec_support(dvs, gearbox, phy_id=phy_id, macsec_supported=False, restart=True) TestMacsecHelper.enable_macsec_on_port(dvs, port_name=port_name, with_secure_channels=True) - assert TestMacsecHelper.verify_macsec_in_asic_db(dvs, should_exist=True), ( - "FAILED: MACsec objects should exist in ASIC_DB " + assert TestMacsecHelper.verify_macsec_for_port_in_asic_db(dvs, port_name, should_exist=True), ( + f"FAILED: MACsec objects for {port_name} should exist in ASIC_DB " "when macsec_supported=false" ) - assert TestMacsecHelper.verify_macsec_in_gb_asic_db(dvs, should_exist=False), ( - "FAILED: MACsec objects should NOT exist in GB_ASIC_DB " + assert TestMacsecHelper.verify_macsec_for_port_in_gb_asic_db(dvs, port_name, should_exist=False), ( + f"FAILED: MACsec objects for {port_name} should NOT exist in GB_ASIC_DB " "when macsec_supported=false" ) finally: TestMacsecHelper.cleanup_macsec(dvs, port_name) + def test_macsec_mixed_phy_support(self, dvs, gearbox, gearbox_config): + """ + Test mixed MACsec support across multiple PHYs. + + This test validates the scenario where a platform owner enables macsec_supported + only for some gearbox PHYs, not all of them. + + Args: + dvs: Docker Virtual Switch instance (pytest fixture) + gearbox: Gearbox fixture + gearbox_config: Gearbox config fixture (auto backup/restore) + """ + # Reassign last interface from PHY 1 to PHY 2 + phy1_interfaces = [ + intf.get("name") for intf in gearbox.interfaces.values() + if str(intf.get("phy_id")) == "1" + ] + interface_to_reassign = phy1_interfaces[-1] + + # Reassign interface to PHY 2 with macsec_supported=False + TestGearboxHelper.reassign_interface_to_phy( + dvs, interface_name=interface_to_reassign, new_phy_id=2, macsec_supported=False + ) + + # Set macsec_supported=True for PHY 1 and restart DVS + TestGearboxHelper.configure_gearbox_macsec_support( + dvs, gearbox, phy_id=1, macsec_supported=True, restart=True + ) + + gearbox_reloaded = Gearbox(dvs) + + port1_name, _ = TestGearboxHelper.get_gearbox_port_by_phy(gearbox_reloaded, phy_id=1) + port2_name = interface_to_reassign + + try: + TestMacsecHelper.enable_macsec_on_port(dvs, port_name=port1_name, with_secure_channels=True) + TestMacsecHelper.enable_macsec_on_port(dvs, port_name=port2_name, with_secure_channels=True) + + # Verify MACsec keys are created in correct databases + # PHY 1 (macsec_supported=true) -> keys should be in GB_ASIC_DB ONLY + assert TestMacsecHelper.verify_macsec_for_port_in_gb_asic_db(dvs, port1_name, should_exist=True), ( + f"FAILED: MACsec keys for {port1_name} (PHY 1) should exist in GB_ASIC_DB " + "when macsec_supported=true" + ) + assert TestMacsecHelper.verify_macsec_for_port_in_asic_db(dvs, port1_name, should_exist=False), ( + f"FAILED: MACsec keys for {port1_name} (PHY 1) should NOT exist in ASIC_DB " + "when macsec_supported=true (should be in GB_ASIC_DB only)" + ) + + # PHY 2 (macsec_supported=false) -> keys should be in ASIC_DB ONLY + assert TestMacsecHelper.verify_macsec_for_port_in_asic_db(dvs, port2_name, should_exist=True), ( + f"FAILED: MACsec keys for {port2_name} (PHY 2) should exist in ASIC_DB " + "when macsec_supported=false" + ) + assert TestMacsecHelper.verify_macsec_for_port_in_gb_asic_db(dvs, port2_name, should_exist=False), ( + f"FAILED: MACsec keys for {port2_name} (PHY 2) should NOT exist in GB_ASIC_DB " + "when macsec_supported=false (should be in ASIC_DB only)" + ) + + finally: + TestMacsecHelper.cleanup_macsec(dvs, port1_name) + TestMacsecHelper.cleanup_macsec(dvs, port2_name) +