Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions orchagent/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ dist_swss_DATA = \
pfc_detect_barefoot.lua \
pfc_detect_nephos.lua \
pfc_detect_cisco-8000.lua \
pfc_detect_vs.lua \
pfc_restore.lua \
pfc_restore_cisco-8000.lua \
port_rates.lua \
Expand Down
2 changes: 1 addition & 1 deletion orchagent/orchdaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ bool OrchDaemon::init()
PFC_WD_POLL_MSECS));
}
}
else if (platform == BRCM_PLATFORM_SUBSTRING)
else if ((platform == BRCM_PLATFORM_SUBSTRING) || (platform == VS_PLATFORM_SUBSTRING))
{
static const vector<sai_port_stat_t> portStatIds =
{
Expand Down
101 changes: 101 additions & 0 deletions orchagent/pfc_detect_vs.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
-- pfc_detect_vs.lua
-- PFC storm detection for the virtual-switch (vs) platform. Unlike the
-- hardware-platform variants, a storm can also be forced through the
-- DEBUG_STORM hash field, which the vs unit tests use to simulate storms.
--
-- KEYS - queue IDs
-- ARGV[1] - counters db index
-- ARGV[2] - counters table name
-- ARGV[3] - poll time interval (milliseconds)
-- return queue Ids that satisfy criteria

local counters_db = ARGV[1]
local counters_table_name = ARGV[2]
-- Scale the poll interval to the units used by PFC_WD_DETECTION_TIME[_LEFT]
-- (presumably microseconds -- TODO confirm against PfcWdOrch, which writes them).
local poll_time = tonumber(ARGV[3]) * 1000

-- Result list; this script never populates it, so callers always get an
-- empty reply (storm/restore events are delivered via PUBLISH instead).
local rets = {}

redis.call('SELECT', counters_db)

-- Iterate through each queue, in reverse order as the other platform scripts do.
-- NOTE: deprecated table.getn(KEYS) replaced with the '#' length operator,
-- and unused copy-paste locals (counter_keys/counter_num/old_counter_num/
-- is_deadlock and the associated HKEYS call) removed.
for i = #KEYS, 1, -1 do
    local counters_key = counters_table_name .. ':' .. KEYS[i]
    local pfc_wd_status = redis.call('HGET', counters_key, 'PFC_WD_STATUS')
    local pfc_wd_action = redis.call('HGET', counters_key, 'PFC_WD_ACTION')
    local big_red_switch_mode = redis.call('HGET', counters_key, 'BIG_RED_SWITCH_MODE')
    -- Only watch queues that are operational, or stormed with the 'alert'
    -- action (which keeps polling so it can detect recovery). BIG_RED_SWITCH
    -- mode suspends per-queue detection entirely.
    if not big_red_switch_mode and (pfc_wd_status == 'operational' or pfc_wd_action == 'alert') then
        local detection_time = redis.call('HGET', counters_key, 'PFC_WD_DETECTION_TIME')
        if detection_time then
            detection_time = tonumber(detection_time)
            -- Remaining time before declaring a storm; initialized to the full
            -- detection window on the first run.
            local time_left = redis.call('HGET', counters_key, 'PFC_WD_DETECTION_TIME_LEFT')
            if not time_left then
                time_left = detection_time
            else
                time_left = tonumber(time_left)
            end

            local queue_index = redis.call('HGET', 'COUNTERS_QUEUE_INDEX_MAP', KEYS[i])
            local port_id = redis.call('HGET', 'COUNTERS_QUEUE_PORT_MAP', KEYS[i])

            if queue_index and port_id then
                -- Per-priority PFC counters live on the port entry, keyed by queue index.
                local pfc_rx_pkt_key = 'SAI_PORT_STAT_PFC_' .. queue_index .. '_RX_PKTS'
                local pfc_on2off_key = 'SAI_PORT_STAT_PFC_' .. queue_index .. '_ON2OFF_RX_PKTS'

                -- Get all counters
                local occupancy_bytes = redis.call('HGET', counters_key, 'SAI_QUEUE_STAT_CURR_OCCUPANCY_BYTES')
                local packets = redis.call('HGET', counters_key, 'SAI_QUEUE_STAT_PACKETS')
                local pfc_rx_packets = redis.call('HGET', counters_key, pfc_rx_pkt_key)
                local pfc_on2off = redis.call('HGET', counters_table_name .. ':' .. port_id, pfc_on2off_key)
                local queue_pause_status = redis.call('HGET', counters_key, 'SAI_QUEUE_ATTR_PAUSE_STATUS')

                if occupancy_bytes and packets and pfc_rx_packets and pfc_on2off and queue_pause_status then
                    occupancy_bytes = tonumber(occupancy_bytes)
                    packets = tonumber(packets)
                    pfc_rx_packets = tonumber(pfc_rx_packets)
                    pfc_on2off = tonumber(pfc_on2off)

                    -- Snapshots from the previous poll cycle.
                    local packets_last = redis.call('HGET', counters_key, 'SAI_QUEUE_STAT_PACKETS_last')
                    local pfc_rx_packets_last = redis.call('HGET', counters_table_name .. ':' .. port_id, pfc_rx_pkt_key .. '_last')
                    local pfc_on2off_last = redis.call('HGET', counters_table_name .. ':' .. port_id, pfc_on2off_key .. '_last')
                    local queue_pause_status_last = redis.call('HGET', counters_key, 'SAI_QUEUE_ATTR_PAUSE_STATUS_last')

                    -- Test/debug hook: this read is intentionally active on the vs
                    -- platform; tests set DEBUG_STORM='enabled' to force a storm.
                    local debug_storm = redis.call('HGET', counters_key, 'DEBUG_STORM')

                    -- If this is not a first run, then we have last values available
                    if packets_last and pfc_rx_packets_last and pfc_on2off_last and queue_pause_status_last then
                        packets_last = tonumber(packets_last)
                        pfc_rx_packets_last = tonumber(pfc_rx_packets_last)
                        pfc_on2off_last = tonumber(pfc_on2off_last)

                        -- Storm criterion: PFC frames keep arriving, the pause never
                        -- deasserted (no ON->OFF transitions) and the queue stayed
                        -- paused for the whole interval -- or the debug override is set.
                        if (pfc_rx_packets - pfc_rx_packets_last > 0 and pfc_on2off - pfc_on2off_last == 0 and queue_pause_status_last == 'true' and queue_pause_status == 'true') or
                            (debug_storm == "enabled") then
                            if time_left <= poll_time then
                                -- Detection window exhausted: report the storm and re-arm.
                                redis.call('PUBLISH', 'PFC_WD_ACTION', '["' .. KEYS[i] .. '","storm"]')
                                time_left = detection_time
                            else
                                time_left = time_left - poll_time
                            end
                        else
                            -- Storm condition cleared; for the 'alert' action the queue
                            -- is still marked stormed, so announce the restore.
                            if pfc_wd_action == 'alert' and pfc_wd_status ~= 'operational' then
                                redis.call('PUBLISH', 'PFC_WD_ACTION', '["' .. KEYS[i] .. '","restore"]')
                            end
                            time_left = detection_time
                        end
                    end

                    -- Save values for next run
                    redis.call('HSET', counters_key, 'SAI_QUEUE_ATTR_PAUSE_STATUS_last', queue_pause_status)
                    redis.call('HSET', counters_key, 'SAI_QUEUE_STAT_PACKETS_last', packets)
                    redis.call('HSET', counters_key, 'PFC_WD_DETECTION_TIME_LEFT', time_left)
                    redis.call('HSET', counters_table_name .. ':' .. port_id, pfc_rx_pkt_key .. '_last', pfc_rx_packets)
                    redis.call('HSET', counters_table_name .. ':' .. port_id, pfc_on2off_key .. '_last', pfc_on2off)
                end
            end
        end
    end
end

return rets
216 changes: 216 additions & 0 deletions tests/test_pfcwd.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,222 @@ def test_PfcWdAclCreationDeletion(self, dvs, dvs_acl, testlog):

finally:
dvs_acl.remove_acl_table(PFCWD_TABLE_NAME)


class TestPfcwdFunc(object):
    """Functional PFC watchdog tests for the vs platform.

    Storms are simulated by setting the DEBUG_STORM field on a queue's
    COUNTERS entry, which pfc_detect_vs.lua treats as a forced storm.
    """

    @pytest.fixture
    def setup_teardown_test(self, dvs):
        """Per-test fixture: grab DB handles, bring up test ports, enable the
        PFCWD/QUEUE flex counters, and cache port/queue OID maps."""
        self.get_db_handle(dvs)

        self.test_ports = ["Ethernet0"]

        self.setup_test(dvs)
        self.get_port_oids()
        self.get_queue_oids()

        yield

        self.teardown_test(dvs)

    def setup_test(self, dvs):
        """Prepare test ports and enable the flex counters PFC WD depends on."""
        # get original cable len for test ports
        fvs = self.config_db.get_entry("CABLE_LENGTH", "AZURE")
        self.orig_cable_len = {}
        for port in self.test_ports:
            self.orig_cable_len[port] = fvs[port]
            # set cable len to non zero value. if port is down, default cable len is 0
            self.set_cable_len(port, "5m")
            # startup port
            dvs.runcmd("config interface startup {}".format(port))

        # enable pfcwd
        self.set_flex_counter_status("PFCWD", "enable")
        # enable queue so that queue oids are generated
        self.set_flex_counter_status("QUEUE", "enable")

    def teardown_test(self, dvs):
        """Undo setup_test: disable counters, restore cable length, shut ports."""
        # disable pfcwd
        self.set_flex_counter_status("PFCWD", "disable")
        # disable queue
        self.set_flex_counter_status("QUEUE", "disable")

        for port in self.test_ports:
            if self.orig_cable_len:
                self.set_cable_len(port, self.orig_cable_len[port])
            # shutdown port
            dvs.runcmd("config interface shutdown {}".format(port))

    def get_db_handle(self, dvs):
        """Cache handles to the APP, ASIC, CONFIG and COUNTERS DBs."""
        self.app_db = dvs.get_app_db()
        self.asic_db = dvs.get_asic_db()
        self.config_db = dvs.get_config_db()
        self.counters_db = dvs.get_counters_db()

    def set_flex_counter_status(self, key, state):
        """Enable/disable a flex counter group; sleep briefly to let it apply."""
        fvs = {'FLEX_COUNTER_STATUS': state}
        self.config_db.update_entry("FLEX_COUNTER_TABLE", key, fvs)
        time.sleep(1)

    def get_queue_oids(self):
        """Cache the 'Port:Queue' -> queue OID map from COUNTERS DB."""
        self.queue_oids = self.counters_db.get_entry("COUNTERS_QUEUE_NAME_MAP", "")

    def get_port_oids(self):
        """Cache the port name -> port OID map from COUNTERS DB."""
        self.port_oids = self.counters_db.get_entry("COUNTERS_PORT_NAME_MAP", "")

    def _get_bitmask(self, queues):
        """Return the PFC priority bitmask (as a string) for a queue list;
        an empty/None list yields '0'."""
        mask = 0
        if queues is not None:
            for queue in queues:
                mask = mask | 1 << queue

        return str(mask)

    def set_ports_pfc(self, status='enable', pfc_queues=[3, 4]):
        """Enable (create PORT_QOS_MAP entry) or disable PFC on the test ports.

        Any status other than one containing 'enable' deletes the entry.
        """
        for port in self.test_ports:
            if 'enable' in status:
                fvs = {'pfc_enable': ",".join([str(q) for q in pfc_queues])}
                self.config_db.create_entry("PORT_QOS_MAP", port, fvs)
            else:
                self.config_db.delete_entry("PORT_QOS_MAP", port)

    def set_cable_len(self, port_name, cable_len):
        """Set the cable length for a port in CONFIG DB.

        BUGFIX: table name must match the "CABLE_LENGTH" table read in
        setup_test (was "CABLE_LEN", so the original value was never restored).
        """
        fvs = {port_name: cable_len}
        self.config_db.update_entry("CABLE_LENGTH", "AZURE", fvs)

    def start_pfcwd_on_ports(self, poll_interval="200", detection_time="200", restoration_time="200", action="drop"):
        """Configure the global PFC WD poll interval and start WD on test ports."""
        pfcwd_info = {"POLL_INTERVAL": poll_interval}
        self.config_db.update_entry("PFC_WD", "GLOBAL", pfcwd_info)

        pfcwd_info = {"action": action,
                      "detection_time": detection_time,
                      "restoration_time": restoration_time
                      }
        for port in self.test_ports:
            self.config_db.update_entry("PFC_WD", port, pfcwd_info)

    def stop_pfcwd_on_ports(self):
        """Remove the per-port PFC WD configuration."""
        for port in self.test_ports:
            self.config_db.delete_entry("PFC_WD", port)

    def verify_ports_pfc(self, queues=None):
        """Assert the ASIC DB PFC bitmask matches the expected queue set
        (None -> mask 0, i.e. PFC fully disabled)."""
        mask = self._get_bitmask(queues)
        fvs = {"SAI_PORT_ATTR_PRIORITY_FLOW_CONTROL": mask}
        for port in self.test_ports:
            self.asic_db.wait_for_field_match("ASIC_STATE:SAI_OBJECT_TYPE_PORT", self.port_oids[port], fvs)

    def verify_pfcwd_state(self, queues, state="stormed"):
        """Assert PFC_WD_STATUS for each given queue reaches the given state."""
        fvs = {"PFC_WD_STATUS": state}
        for port in self.test_ports:
            for queue in queues:
                queue_name = port + ":" + str(queue)
                self.counters_db.wait_for_field_match("COUNTERS", self.queue_oids[queue_name], fvs)

    def verify_pfcwd_counters(self, queues, restore="0"):
        """Assert one storm was detected per queue, with the expected restore count."""
        fvs = {"PFC_WD_QUEUE_STATS_DEADLOCK_DETECTED": "1",
               "PFC_WD_QUEUE_STATS_DEADLOCK_RESTORED": restore
               }
        for port in self.test_ports:
            for queue in queues:
                queue_name = port + ":" + str(queue)
                self.counters_db.wait_for_field_match("COUNTERS", self.queue_oids[queue_name], fvs)

    def reset_pfcwd_counters(self, queues):
        """Zero the detect/restore counters so tests don't leak state."""
        fvs = {"PFC_WD_QUEUE_STATS_DEADLOCK_DETECTED": "0",
               "PFC_WD_QUEUE_STATS_DEADLOCK_RESTORED": "0"
               }
        for port in self.test_ports:
            for queue in queues:
                queue_name = port + ":" + str(queue)
                self.counters_db.update_entry("COUNTERS", self.queue_oids[queue_name], fvs)

    def set_storm_state(self, queues, state="enabled"):
        """Toggle the DEBUG_STORM field consumed by pfc_detect_vs.lua."""
        fvs = {"DEBUG_STORM": state}
        for port in self.test_ports:
            for queue in queues:
                queue_name = port + ":" + str(queue)
                self.counters_db.update_entry("COUNTERS", self.queue_oids[queue_name], fvs)

    def test_pfcwd_single_queue(self, dvs, setup_teardown_test):
        """Storm one of two PFC-enabled queues; check detect, mask update, restore."""
        # Bind names used in `finally` before entering the try-block so an early
        # failure cannot raise NameError during cleanup (BUGFIX).
        test_queues = [3, 4]
        storm_queue = [3]
        try:
            # enable PFC on queues
            self.set_ports_pfc(pfc_queues=test_queues)

            # verify in asic db
            self.verify_ports_pfc(test_queues)

            # start pfcwd
            self.start_pfcwd_on_ports()

            # start pfc storm
            self.set_storm_state(storm_queue)

            # verify pfcwd is triggered
            self.verify_pfcwd_state(storm_queue)

            # verify pfcwd counters
            self.verify_pfcwd_counters(storm_queue)

            # verify if queue is disabled
            self.verify_ports_pfc(queues=[4])

            # stop storm
            self.set_storm_state(storm_queue, state="disabled")

            # verify pfcwd state is restored
            self.verify_pfcwd_state(storm_queue, state="operational")

            # verify pfcwd counters
            self.verify_pfcwd_counters(storm_queue, restore="1")

            # verify if queue is enabled
            self.verify_ports_pfc(test_queues)

        finally:
            self.reset_pfcwd_counters(storm_queue)
            self.stop_pfcwd_on_ports()

    def test_pfcwd_multi_queue(self, dvs, setup_teardown_test):
        """Storm every PFC-enabled queue; the PFC mask must drop to 0 and recover."""
        # Bind the queue list before the try-block so cleanup in `finally`
        # cannot raise NameError on an early failure (BUGFIX).
        test_queues = [3, 4]
        try:
            # enable PFC on queues
            self.set_ports_pfc(pfc_queues=test_queues)

            # verify in asic db
            self.verify_ports_pfc(test_queues)

            # start pfcwd
            self.start_pfcwd_on_ports()

            # start pfc storm
            self.set_storm_state(test_queues)

            # verify pfcwd is triggered
            self.verify_pfcwd_state(test_queues)

            # verify pfcwd counters
            self.verify_pfcwd_counters(test_queues)

            # verify if queue is disabled. Expected mask is 0
            self.verify_ports_pfc()

            # stop storm
            self.set_storm_state(test_queues, state="disabled")

            # verify pfcwd state is restored
            self.verify_pfcwd_state(test_queues, state="operational")

            # verify pfcwd counters
            self.verify_pfcwd_counters(test_queues, restore="1")

            # verify if queue is enabled
            self.verify_ports_pfc(test_queues)

        finally:
            self.reset_pfcwd_counters(test_queues)
            self.stop_pfcwd_on_ports()

#
# Add a dummy always-pass test at the end as a workaround
# for an issue where Flaky, failing on the final test, invokes module tear-down before retrying
Expand Down