Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 25 additions & 21 deletions config/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -1168,25 +1168,27 @@ def interface_has_mirror_config(ctx, mirror_table, dst_port, src_port, direction


def is_port_mirror_capability_supported(direction, namespace=None):
""" Check if port mirror capability is supported for the given direction """
""" Check if port mirror capability is supported for the given direction.

PORT_INGRESS_MIRROR_CAPABLE / PORT_EGRESS_MIRROR_CAPABLE only apply to SPAN
(port mirror) sessions. Callers should not invoke this for ERSPAN sessions.
Absent STATE_DB keys (None) are treated as supported for backward compatibility
with platforms that do not populate the SWITCH_CAPABILITY table.
"""
state_db = SonicV2Connector(use_unix_socket_path=True, namespace=namespace)
state_db.connect(state_db.STATE_DB, False)
entry_name = "SWITCH_CAPABILITY|switch"

# If no direction is specified, check both ingress and egress capabilities
if not direction:
ingress_supported = state_db.get(state_db.STATE_DB, entry_name, "PORT_INGRESS_MIRROR_CAPABLE")
egress_supported = state_db.get(state_db.STATE_DB, entry_name, "PORT_EGRESS_MIRROR_CAPABLE")
return ingress_supported == "true" and egress_supported == "true"
directions_to_check = []
if not direction or direction in ['rx', 'both']:
directions_to_check.append("PORT_INGRESS_MIRROR_CAPABLE")
if not direction or direction in ['tx', 'both']:
directions_to_check.append("PORT_EGRESS_MIRROR_CAPABLE")

if direction in ['rx', 'both']:
ingress_supported = state_db.get(state_db.STATE_DB, entry_name, "PORT_INGRESS_MIRROR_CAPABLE")
if ingress_supported != "true":
return False

if direction in ['tx', 'both']:
egress_supported = state_db.get(state_db.STATE_DB, entry_name, "PORT_EGRESS_MIRROR_CAPABLE")
if egress_supported != "true":
for capability_key in directions_to_check:
value = state_db.get(state_db.STATE_DB, entry_name, capability_key)
# Treat absent key (None) as supported; only reject explicit "false"
if value is not None and value != "true":
return False

return True
Expand Down Expand Up @@ -1241,13 +1243,15 @@ def validate_mirror_session_config(config_db, session_name, dst_port, src_port,
if direction not in ['rx', 'tx', 'both']:
ctx.fail("Error: Direction {} is invalid".format(direction))

# Check port mirror capability before allowing configuration
# If direction is provided, check the specific direction

for ns in namespace_set:
if not is_port_mirror_capability_supported(direction, namespace=ns):
ctx.fail("Error: Port mirror direction '{}' is not supported by the ASIC".format(
direction if direction else 'both'))
# Check port mirror capability before allowing configuration.
# ERSPAN sessions (dst_port=None) use src/dst IPs, not ports; the
# PORT_INGRESS/EGRESS_MIRROR_CAPABLE flags only apply to SPAN sessions.
is_erspan = dst_port is None
if not is_erspan:
for ns in namespace_set:
if not is_port_mirror_capability_supported(direction, namespace=ns):
ctx.fail("Error: Port mirror direction '{}' is not supported by the ASIC".format(
direction if direction else 'both'))

return True

Expand Down
34 changes: 31 additions & 3 deletions tests/config_mirror_session_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,18 @@ def test_mirror_session_capability_checking():
assert result.exit_code != 0
assert "Error: Port mirror direction 'both' is not supported by the ASIC" in result.output

# Test 2: ERSPAN sessions bypass capability check even when capability returns False
with mock.patch('config.main.is_port_mirror_capability_supported') as mock_capability:
mock_capability.return_value = False

result = runner.invoke(
config.config.commands["mirror_session"].commands["erspan"].commands["add"],
["test_erspan", "1.1.1.1", "2.2.2.2", "8", "64", "0x88be"])

# ERSPAN should not be blocked by port mirror capability
assert "is not supported by the ASIC" not in result.output
mock_capability.assert_not_called()


def test_mirror_session_capability_function():
"""Test the is_port_mirror_capability_supported function directly"""
Expand Down Expand Up @@ -450,9 +462,9 @@ def test_mirror_session_capability_function():
result = config.is_port_mirror_capability_supported("both")
assert result is False

# Test no direction (should fail)
# Test no direction (checks both ingress and egress)
result = config.is_port_mirror_capability_supported(None)
assert result is False
assert result is False # egress is "false", so fails

# Test 3: Test with no capability support
with mock.patch('config.main.SonicV2Connector') as mock_connector:
Expand All @@ -468,8 +480,24 @@ def test_mirror_session_capability_function():
("SWITCH_CAPABILITY|switch", "PORT_EGRESS_MIRROR_CAPABLE"): "false"
}.get((entry, field), "false")

# All directions should fail
# SPAN directions should fail when explicitly set to "false"
assert config.is_port_mirror_capability_supported("rx") is False
assert config.is_port_mirror_capability_supported("tx") is False
assert config.is_port_mirror_capability_supported("both") is False
# direction=None checks both; both are "false" so fails
assert config.is_port_mirror_capability_supported(None) is False

# Test 4: Test with absent capability keys (None returned from STATE_DB)
with mock.patch('config.main.SonicV2Connector') as mock_connector:
mock_instance = mock.Mock()
mock_connector.return_value = mock_instance
mock_instance.connect.return_value = None

# Simulate keys absent from STATE_DB (returns None)
mock_instance.get.return_value = None

# All directions should return True (backward compatibility: absent = supported)
assert config.is_port_mirror_capability_supported("rx") is True
assert config.is_port_mirror_capability_supported("tx") is True
assert config.is_port_mirror_capability_supported("both") is True
assert config.is_port_mirror_capability_supported(None) is True
Loading