Skip to content
214 changes: 214 additions & 0 deletions tests/common/snappi_tests/snappi_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -1357,3 +1357,217 @@ def static_routes_cisco_8000(addr, dut=None, intf=None, namespace=None, setup=Tr
return DEST_TO_GATEWAY_MAP[addr]['dest']
else:
del DEST_TO_GATEWAY_MAP[addr]


@pytest.fixture(scope="module")
def snappi_port_selection(get_snappi_ports, number_of_tx_rx_ports, mixed_speed=None):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to support port combination of downstream LC -> upstream LC, or upstream -> downstream in multi LC scenario.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sdszhang :
'I think we need to support port combination of downstream LC -> upstream LC, or upstream -> downstream in multi LC scenario.'
Are the downstream and upstream LC - T0/T1 pizza boxes?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to simulate traffic from RNG -> [T2 upstream LC -> T2 downstram LC ] -> T1, and vice versa.

'''
Dynamic selection of the DUT ports for the test.
Selects ports for three test combinations:
- Single line-card single asic
- Single line-card multiple asic
- Multiple line-card.
Args:
get_snappi_ports(fixture): returns list of the ports available in test.
number_of_tx_rx_ports(fixture): count of tx and rx ports available from the test.
Returns:
snappi_ports(dict): Dictionary with interface-speed and line-card-combo being primary keys.
Example: {'100':{'single-linecard-single-asic':{ports}, 'single-linecard-multiple-asic':{ports}}}

'''
tx_port_count, rx_port_count = number_of_tx_rx_ports
tmp_snappi_port_list = get_snappi_ports

if (not mixed_speed):
# Creating list of all interface speeds from selected ports.
port_speed_list = []
for item in tmp_snappi_port_list:
if (int(item['speed'])/1000) not in port_speed_list:
port_speed_list.append(int(item['speed'])/1000)

port_list = {}
# Repeating loop for speed_types
for port_speed in port_speed_list:
new_list = []
# Selecting ports matching the port_speed
for item in tmp_snappi_port_list:
if (int(item['speed']) == (port_speed * 1000)):
new_list.append(item)

# Creating dictionary f{hostname}{asic_val}
# f[hostname]['asic'] should contain associated elements.
f = {}
for item in new_list:
hostname = item['peer_device']
asic = item['asic_value']
if hostname not in f:
f[hostname] = {}
if asic not in f[hostname]:
f[hostname][asic] = []
f[hostname][asic].append(item)

total_ports = tx_port_count + rx_port_count

# Initializing dictionary port_list{speed}{line-card-asic-combo}
# example port_list['100']['single_linecard_single_asic']

# for 'single-linecard-single-asic'
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @amitpawar12

This code works for me. Just a few improvement suggestions which could help improving the readability.

I notice that you're separating into 3 cases:

  1. for 'single-linecard-single-asic'
  2. for 'single_linecard_multiple_asic'
  3. for 'multiple linecard, multiple ASIC'

Could we have some if statement to separate these logic instead of letting the code goes through all? It will be easier to debug if we need to modify later.

A suggestion could be:

is_single_linecard_single_asic = len(f) == 1 and len(f[0].keys()) == 1
if (is_single_linecard_single_asic):
   # logic for single_linecard_single_asic
   
is_single_linecard_multi_asic = len(f) == 1 and len(f[0].keys()) >= 1
 
if (is_single_linecard_multi_asic):
   # logic for is_single_linecard_multi_asic
   

cc @sdszhang

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. I think we should pass a parameter called "test_subtype" which is default = ALL, in case user wants just to pull a specific subtype ONLY instead of all the test_subtypes.

I am keeping this as enhancement of functionality.

for device, asic in f.items():
for asic_val in asic.keys():
if len(f[device][asic_val]) >= (total_ports):
if port_speed not in port_list:
port_list[port_speed] = {}
if 'single_linecard_single_asic' not in port_list[port_speed]:
port_list[port_speed]['single_linecard_single_asic'] = []
if len(port_list[port_speed]['single_linecard_single_asic']) == total_ports:
break
else:
port_list[port_speed]['single_linecard_single_asic'] = f[device][asic_val][0:total_ports]

# for 'single_linecard_multiple_asic'
egress_done = False
ingress_done = False
tmp_ing_list = []
for device, asic in f.items():
# Execute ONLY if the number of asics is more than one.
if len(asic.keys()) < 2:
continue
else:
for asic_val in asic.keys():
asic_port_len = len(f[device][asic_val])
if ((asic_port_len >= tx_port_count) or (asic_port_len >= rx_port_count)):
# Initializing the dictionary
if port_speed not in port_list:
port_list[port_speed] = {}
if 'single_linecard_multiple_asic' not in port_list[port_speed]:
port_list[port_speed]['single_linecard_multiple_asic'] = []

# If the dictionary is complete, no need to add further ports.
if len(port_list[port_speed]['single_linecard_multiple_asic']) == total_ports:
break

# Accomodating ingress ports first if more ports are available.
if ((asic_port_len - tx_port_count) > (asic_port_len - rx_port_count)
and not ingress_done
and not tmp_ing_list
and (asic_port_len >= rx_port_count)):
tmp_ing_list = f[device][asic_val][0:rx_port_count]
ingress_done = True
elif (not egress_done and (asic_port_len >= tx_port_count)):
tx_list = f[device][asic_val][0:tx_port_count]
port_list[port_speed]['single_linecard_multiple_asic'] = tx_list
egress_done = True
tmp_len = len(port_list[port_speed]['single_linecard_multiple_asic'])
if (tmp_ing_list
and (tmp_len < total_ports)):
port_list[port_speed]['single_linecard_multiple_asic'].append(tmp_ing_list)
elif (not ingress_done and (asic_port_len >= rx_port_count)):
rx_list = f[device][asic_val][0:rx_port_count]
port_list[port_speed]['single_linecard_multiple_asic'].append(rx_list)
tmp_ing_list = f[device][asic_val][0:rx_port_count]
ingress_done = True

if (ingress_done
and egress_done
and (len(flatten_list(port_list[port_speed]['single_linecard_multiple_asic'])) < total_ports)):
port_list[port_speed]['single_linecard_multiple_asic'].append(tmp_ing_list)

# Flatten the dictionary if the dictionary is created.
if (port_speed in port_list) and ('single_linecard_multiple_asic' in port_list[port_speed]):
port_list[port_speed]['single_linecard_multiple_asic'] = flatten_list(
port_list[port_speed]['single_linecard_multiple_asic'])
# If egress or ingress ports are not found, delete the dictionary key-value.
if (not egress_done or not ingress_done):
del port_list[port_speed]['single_linecard_multiple_asic']

# for 'multiple linecard, multiple ASIC'
egress_done = False
ingress_done = False
tmp_ing_list = []

for device, asic in f.items():
# Creating list for a given device for all ASIC combinations.
all_asic_ports = []
for asic_val in asic.keys():
all_asic_ports.append(f[device][asic_val])
all_asic_ports = flatten_list(all_asic_ports)

# Initializing the dictionary, if it does not exist.
if port_speed not in port_list:
port_list[port_speed] = {}
if 'multiple_linecard_multiple_asic' not in port_list[port_speed]:
port_list[port_speed]['multiple_linecard_multiple_asic'] = []

asic_port_len = len(all_asic_ports)
if ((asic_port_len - tx_port_count) > (asic_port_len - rx_port_count)
and not ingress_done
and not tmp_ing_list
and (asic_port_len >= rx_port_count)):
tmp_ing_list = all_asic_ports[0:rx_port_count]
ingress_done = True
# Identifying egress ports first
elif (len(port_list[port_speed]['multiple_linecard_multiple_asic']) <= tx_port_count
and not egress_done and len(all_asic_ports) >= tx_port_count):
port_list[port_speed]['multiple_linecard_multiple_asic'].append(all_asic_ports[0:tx_port_count])
# egress ports identified, move to next device.
# No need to select egress ports now.
egress_done = True
continue
# Identifying ingress ports
elif (len(port_list[port_speed]['multiple_linecard_multiple_asic']) <= rx_port_count
and not ingress_done and len(all_asic_ports) >= rx_port_count):
port_list[port_speed]['multiple_linecard_multiple_asic'].append(all_asic_ports[0:rx_port_count])
# ingress ports identified, move to next device.
# No need to select ingress ports now.
ingress_done = True
continue

if (ingress_done
and egress_done
and (len(port_list[port_speed]['multiple_linecard_multiple_asic']) < total_ports)):
port_list[port_speed]['multiple_linecard_multiple_asic'].append(tmp_ing_list)

# Flatten the dictionary, if the dictionary is created.
if (port_speed in port_list) and ('multiple_linecard_multiple_asic' in port_list[port_speed]):
# Flattening the list.
port_list[port_speed]['multiple_linecard_multiple_asic'] = flatten_list(
port_list[port_speed]['multiple_linecard_multiple_asic'])

# If the dictionary does not select either ingress or egress ports, then dictionary is deleted.
if (not egress_done or not ingress_done):
del port_list[port_speed]['multiple_linecard_multiple_asic']

pytest_assert(port_list is not None, 'snappi ports are not available for required Rx and Tx port counts')
return port_list


@pytest.fixture(scope="function")
def tgen_port_info(request, snappi_port_selection):
flatten_skeleton_parameter = request.param
speed, category = flatten_skeleton_parameter.split("-")
if float(speed) not in snappi_port_selection or category not in snappi_port_selection[float(speed)]:
pytest.skip(f"Unsupported combination for {flatten_skeleton_parameter}")

result = snappi_port_selection[float(speed)][category]

if not result:
pytest.skip(f"Unsupported combination for {flatten_skeleton_parameter}")

return result


def flatten_list(lst):
'''
Function to flatten the list
Args:
lst(list): list that needs to be flattened
Retuns:
flattened(list): flattened list
'''
flattened = []
for item in lst:
if isinstance(item, list):
flattened.extend(flatten_list(item))
else:
flattened.append(item)
return flattened
83 changes: 83 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import getpass
import random
from concurrent.futures import as_completed
import re

import pytest
import yaml
Expand Down Expand Up @@ -1449,6 +1450,29 @@ def get_testbed_metadata(request):
return metadata.get(tbname)


def get_snappi_testbed_metadata(request):
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@auspham - I am going to remove this function from conftest because there is already check (and code) in test_pretest.py to create metadata/snappi_tests/.json file.

please let me know if this is ok.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to rather read this metadata/snappi_tests/testbed.json. Are you planning to integrate with an already existing function from conftest? @amitpawar12

"""
Get the metadata for the testbed name. Return None if tbname is
not provided, or metadata file not found or metadata does not
contain tbname
"""
tbname = request.config.getoption("--testbed")
if not tbname:
return None

folder = 'metadata/snappi_tests'
filepath = os.path.join(folder, tbname + '.json')
metadata = None

try:
with open(filepath, 'r') as yf:
metadata = json.load(yf)
except IOError:
return None

return metadata.get(tbname)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return metadata.get(tbname)
return metadata.get(tbname) or []

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or change here



def generate_port_lists(request, port_scope, with_completeness_level=False):
empty = [encode_dut_port_name('unknown', 'unknown')]
if 'ports' in port_scope:
Expand Down Expand Up @@ -1866,6 +1890,9 @@ def format_portautoneg_test_id(param):
else:
metafunc.parametrize('topo_scenario', ['default'], scope='module')

if 'tgen_port_info' in metafunc.fixturenames:
metafunc.parametrize('tgen_port_info', generate_skeleton_port_info(metafunc), indirect=True)

if 'vlan_name' in metafunc.fixturenames:
if tbinfo['topo']['type'] == 'm0' and 'topo_scenario' in metafunc.fixturenames:
if tbinfo['topo']['name'] == 'm0-2vlan':
Expand Down Expand Up @@ -1907,6 +1934,62 @@ def format_portautoneg_test_id(param):
metafunc.parametrize('vlan_name', ['no_vlan'], scope='module')


def generate_skeleton_port_info(request):
"""
Return minimal port_info parameters to populate later in the format of <speed>-<category>. i.e

["400.0-single_linecard_single_asic", "400.0-multiple_linecard_multiple_asic",...]
"""
dut_info = get_snappi_testbed_metadata(request) or []
available_interfaces = {}
matrix = {}
for index, linecard in enumerate(dut_info):
interface_to_asic = {}
for asic in dut_info[linecard]["asic_to_interface"]:
for interface in dut_info[linecard]["asic_to_interface"][asic]:
interface_to_asic[interface] = asic

available_interfaces[linecard] = [dut_info[linecard]['intf_status'][interface]
for interface in dut_info[linecard]['intf_status']
if dut_info[linecard]['intf_status'][interface]["admin_state"] == "up"]

for interface in available_interfaces[linecard]:
for key, value in dut_info[linecard]["asic_to_interface"].items():
if interface['name'] in value:
interface['asic'] = key

for interface in available_interfaces[linecard]:
speed = float(re.match(r"([\d.]+)", interface['speed']).group(0))
asic = interface['asic']
if (speed not in matrix):
matrix[speed] = {}
if (linecard not in matrix[speed]):
matrix[speed][linecard] = {}
if (asic not in matrix[speed][linecard]):
matrix[speed][linecard][asic] = 1
else:
matrix[speed][linecard][asic] += 1

def build_params(speed, category):
return f"{speed}-{category}"

flattened_list = set()

for speed, linecards in matrix.items():
if len(linecards) >= 2:
flattened_list.add(build_params(speed, 'multiple_linecard_multiple_asic'))

for linecard, asic_list in linecards.items():
if len(asic_list) >= 2:
flattened_list.add(build_params(speed, 'single_linecard_multiple_asic'))

for asics, port_count in asic_list.items():
if int(port_count) >= 2:
flattened_list.add(build_params(speed, 'single_linecard_single_asic'))

return list(flattened_list)


def get_autoneg_tests_data():
folder = 'metadata'
filepath = os.path.join(folder, 'autoneg-test-params.json')
Expand Down
Loading
Loading