Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 134 additions & 24 deletions tests/gearbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,55 @@
"""

import json
import base64


class TestGearboxHelper:
"""Helper class for gearbox-related test operations."""

@staticmethod
def load_gearbox_config(dvs):
"""
Load gearbox configuration from gearbox_config.json.

Args:
dvs: Docker Virtual Switch instance

Returns:
tuple: (config_path, config_dict) - Path to the config file and parsed JSON config
"""
# Resolve symlink to get actual config path
config_path = "/usr/share/sonic/hwsku/gearbox_config.json"
rc, actual_path = dvs.runcmd(f"readlink -f {config_path}")
if rc == 0 and actual_path.strip():
config_path = actual_path.strip()

# Read current config
rc, config_json = dvs.runcmd(f"cat {config_path}")
assert rc == 0, f"Failed to read gearbox_config.json from {config_path}"
config = json.loads(config_json)

return config_path, config

@staticmethod
def write_gearbox_config(dvs, config_path, config_dict):
"""
Write gearbox configuration to gearbox_config.json.

Args:
dvs: Docker Virtual Switch instance
config_path: Path to the config file
config_dict: Configuration dictionary to write

Returns:
None
"""
config_str = json.dumps(config_dict, indent=2)
encoded = base64.b64encode(config_str.encode('utf-8')).decode('utf-8')
cmd = f"python3 -c \"import base64; data = base64.b64decode('{encoded}').decode('utf-8'); open('{config_path}', 'w').write(data + '\\n')\""
rc, _ = dvs.runcmd(cmd)
assert rc == 0, f"Failed to write modified config to {config_path}"

@staticmethod
def get_first_gearbox_port(gearbox):
"""
Expand All @@ -37,7 +81,91 @@ def get_first_gearbox_port(gearbox):
return port_name, phy_id

@staticmethod
def configure_gearbox_macsec_support(dvs, gearbox, phy_id=None, macsec_supported=None):
def get_gearbox_port_by_phy(gearbox, phy_id):
"""
Get a port connected to a specific PHY from Gearbox object.

Args:
gearbox: Gearbox fixture
phy_id: PHY ID to search for

Returns:
tuple: (port_name, phy_id) - Port connected to the specified PHY
"""
for idx, intf in gearbox.interfaces.items():
if int(intf.get("phy_id")) == phy_id:
port_name = intf.get("name")
assert port_name, f"Interface on PHY {phy_id} has no 'name' field"
return port_name, phy_id

raise AssertionError(f"No interface found connected to PHY {phy_id}")

@staticmethod
def reassign_interface_to_phy(dvs, interface_name, new_phy_id, macsec_supported=None, restart=False):
"""
Reassign an existing interface to a different PHY for testing multi-PHY scenarios.

This creates a new PHY (if it doesn't exist) and reassigns the specified interface
to that PHY. The interface keeps its original lane configuration which is valid in VS.

Args:
dvs: Docker Virtual Switch instance
interface_name: The interface to reassign (e.g., "Ethernet8")
new_phy_id: The PHY ID to reassign the interface to (e.g., 2)
macsec_supported: MACsec support for the new PHY (None=omit field, True, or False)
restart: If True, restart DVS after modifying the config (default: False)

Returns:
None
"""
config_path, config = TestGearboxHelper.load_gearbox_config(dvs)

# Check if PHY already exists, if not create it
phy_ids = [int(phy.get("phy_id")) for phy in config.get("phys", [])]
new_phy_id = int(new_phy_id)
if new_phy_id not in phy_ids:
# Get the first PHY as a template
assert len(config.get("phys", [])) > 0, "No PHYs found in config"
first_phy = config["phys"][0]

# Create new PHY based on first PHY, reusing file paths that exist in VS
new_phy = {
"phy_id": new_phy_id,
"name": f"sesto-{new_phy_id}",
"address": f"0x{new_phy_id}000",
"lib_name": first_phy.get("lib_name"), # Reuse PHY 1's lib
"firmware_path": first_phy.get("firmware_path"), # Reuse PHY 1's firmware
"config_file": first_phy.get("config_file"), # Reuse PHY 1's config
"sai_init_config_file": first_phy.get("sai_init_config_file"), # Reuse PHY 1's init config
"phy_access": first_phy.get("phy_access", "mdio"),
"bus_id": first_phy.get("bus_id", 0),
"context_id": first_phy.get("context_id", 1),
"hwinfo": first_phy.get("hwinfo", "mdio0_0_0/0") # Reuse PHY 1's hwinfo
}

# Set macsec_supported if specified
if macsec_supported is not None:
new_phy["macsec_supported"] = macsec_supported

config["phys"].append(new_phy)

# Find and reassign the interface (keep original lanes)
interface_found = False
for intf in config.get("interfaces", []):
if intf.get("name") == interface_name:
intf["phy_id"] = new_phy_id
interface_found = True
break

assert interface_found, f"Interface {interface_name} not found in gearbox_config.json"

TestGearboxHelper.write_gearbox_config(dvs, config_path, config)

if restart:
dvs.restart()

@staticmethod
def configure_gearbox_macsec_support(dvs, gearbox, phy_id=None, macsec_supported=None, restart=False):
"""
Configure MACsec support on a gearbox PHY by modifying gearbox_config.json and restarting DVS.

Expand All @@ -53,23 +181,15 @@ def configure_gearbox_macsec_support(dvs, gearbox, phy_id=None, macsec_supported
gearbox: Gearbox fixture
phy_id: PHY ID (string, e.g., "1"). If None, uses the first PHY from Gearbox object.
macsec_supported: None (remove field), True, or False
restart: If True, restart DVS after modifying the config (default: False)
"""
# If phy_id not provided, use the first PHY from Gearbox object
if phy_id is None:
assert len(gearbox.phys) > 0, "No PHYs found in gearbox"
phy_id = next(iter(gearbox.phys))
print(f"No phy_id provided, using first PHY: {phy_id}")

# Resolve symlink to get actual config path
config_path = "/usr/share/sonic/hwsku/gearbox_config.json"
rc, actual_path = dvs.runcmd(f"readlink -f {config_path}")
if rc == 0 and actual_path.strip():
config_path = actual_path.strip()

# Read current config
rc, config_json = dvs.runcmd(f"cat {config_path}")
assert rc == 0, f"Failed to read gearbox_config.json from {config_path}"
config = json.loads(config_json)
config_path, config = TestGearboxHelper.load_gearbox_config(dvs)

phy_id = int(phy_id)

Expand All @@ -89,17 +209,7 @@ def configure_gearbox_macsec_support(dvs, gearbox, phy_id=None, macsec_supported

assert phy_found, f"PHY {phy_id} not found in gearbox_config.json"

# Write modified config back using heredoc
config_str = json.dumps(config, indent=2)
heredoc = "__GEARBOX_JSON__"
rc, _ = dvs.runcmd(
"bash -lc 'cat > {path} <<\"{tag}\"\n{payload}\n{tag}\n'".format(
path=config_path,
tag=heredoc,
payload=config_str,
)
)
assert rc == 0, f"Failed to write modified config to {config_path}"
TestGearboxHelper.write_gearbox_config(dvs, config_path, config)

# Restart DVS to reload configuration
dvs.restart()
if restart:
dvs.restart()
67 changes: 55 additions & 12 deletions tests/macsec.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,45 +141,88 @@ def cleanup_macsec(dvs, port_name):
print(f"Cleanup encountered error: {e}")

@staticmethod
def verify_macsec_in_gb_asic_db(dvs, should_exist=True):
def verify_macsec_for_port_in_gb_asic_db(dvs, port_name, should_exist=True):
"""
Verify MACsec objects exist (or don't exist) in GB_ASIC_DB
Verify MACsec objects for a specific port exist (or don't exist) in GB_ASIC_DB.

This method checks if the specified port's MACsec configuration is present
in GB_ASIC_DB by mapping port name to line-side OID via GB_COUNTERS_DB and
checking if any MACSEC_PORT entry references that port OID.

Args:
dvs: Docker Virtual Switch instance
port_name: Name of the port to check (e.g., "Ethernet4")
should_exist: True if objects should exist, False otherwise

Returns:
bool: True if verification passes
"""

gb_asic_db = DVSDatabase(swsscommon.GB_ASIC_DB, dvs.redis_sock)

macsec_keys = gb_asic_db.get_keys("ASIC_STATE:SAI_OBJECT_TYPE_MACSEC")
gb_counters_db = DVSDatabase(swsscommon.GB_COUNTERS_DB, dvs.redis_sock)

# Get port's line-side OID from GB_COUNTERS_DB
# Gearbox ports are stored with "_line" suffix for line-side port
port_map = gb_counters_db.get_entry("COUNTERS_PORT_NAME_MAP", "")
line_port_key = f"{port_name}_line"
expected_port_oid = port_map.get(line_port_key)

if not expected_port_oid:
# Port not found in GB_COUNTERS_DB (not a gearbox port)
return not should_exist

# Check if any MACSEC_PORT entry references this port OID
macsec_port_keys = gb_asic_db.get_keys("ASIC_STATE:SAI_OBJECT_TYPE_MACSEC_PORT")
port_found = any(
gb_asic_db.get_entry("ASIC_STATE:SAI_OBJECT_TYPE_MACSEC_PORT", key).get(
"SAI_MACSEC_PORT_ATTR_PORT_ID"
) == expected_port_oid
for key in macsec_port_keys
)

if should_exist:
return len(macsec_keys) > 0 # Should have at least one object
return port_found
else:
return len(macsec_keys) == 0 # Should have no objects
return not port_found

@staticmethod
def verify_macsec_in_asic_db(dvs, should_exist=True):
def verify_macsec_for_port_in_asic_db(dvs, port_name, should_exist=True):
"""
Verify MACsec objects exist (or don't exist) in ASIC_DB (NPU)
Verify MACsec objects for a specific port exist (or don't exist) in ASIC_DB (NPU).

This method checks if the specified port's MACsec configuration is present
in ASIC_DB by mapping port name to OID via COUNTERS_DB and checking if any
MACSEC_PORT entry references that port OID.

Args:
dvs: Docker Virtual Switch instance
port_name: Name of the port to check (e.g., "Ethernet0")
should_exist: True if objects should exist, False otherwise

Returns:
bool: True if verification passes
"""
asic_db = dvs.get_asic_db()
counters_db = dvs.get_counters_db()

# Get port OID from COUNTERS_DB port name map
port_map = counters_db.get_entry("COUNTERS_PORT_NAME_MAP", "")
expected_port_oid = port_map.get(port_name)

if not expected_port_oid:
# Port not found in COUNTERS_DB
return not should_exist

macsec_keys = asic_db.get_keys("ASIC_STATE:SAI_OBJECT_TYPE_MACSEC")
# Check if any MACSEC_PORT entry references this port OID
macsec_port_keys = asic_db.get_keys("ASIC_STATE:SAI_OBJECT_TYPE_MACSEC_PORT")
port_found = any(
asic_db.get_entry("ASIC_STATE:SAI_OBJECT_TYPE_MACSEC_PORT", key).get(
"SAI_MACSEC_PORT_ATTR_PORT_ID"
) == expected_port_oid
for key in macsec_port_keys
)

if should_exist:
return len(macsec_keys) > 0
return port_found
else:
return len(macsec_keys) == 0
return not port_found

Loading
Loading