diff --git a/orchagent/pfc_detect_barefoot.lua b/orchagent/pfc_detect_barefoot.lua index 892be8b5fcb..fd8304362d8 100644 --- a/orchagent/pfc_detect_barefoot.lua +++ b/orchagent/pfc_detect_barefoot.lua @@ -65,7 +65,7 @@ for i = n, 1, -1 do -- DEBUG CODE END. (occupancy_bytes == 0 and pfc_rx_packets - pfc_rx_packets_last > 0 and pfc_on2off - pfc_on2off_last == 0 and queue_pause_status_last == 'true' and queue_pause_status == 'true') then if time_left <= poll_time then - redis.call('PUBLISH', 'PFC_WD', '["' .. KEYS[i] .. '","storm"]') + redis.call('PUBLISH', 'PFC_WD_ACTION', '["' .. KEYS[i] .. '","storm"]') is_deadlock = true time_left = detection_time else @@ -73,7 +73,7 @@ for i = n, 1, -1 do end else if pfc_wd_action == 'alert' and pfc_wd_status ~= 'operational' then - redis.call('PUBLISH', 'PFC_WD', '["' .. KEYS[i] .. '","restore"]') + redis.call('PUBLISH', 'PFC_WD_ACTION', '["' .. KEYS[i] .. '","restore"]') end time_left = detection_time end diff --git a/orchagent/pfc_detect_broadcom.lua b/orchagent/pfc_detect_broadcom.lua index f75bdc6ee09..e651b537ce7 100644 --- a/orchagent/pfc_detect_broadcom.lua +++ b/orchagent/pfc_detect_broadcom.lua @@ -73,7 +73,7 @@ for i = n, 1, -1 do -- DEBUG CODE END. (occupancy_bytes == 0 and pfc_rx_packets - pfc_rx_packets_last > 0 and pfc_on2off - pfc_on2off_last == 0 and queue_pause_status_last == 'true' and queue_pause_status == 'true') then if time_left <= poll_time then - redis.call('PUBLISH', 'PFC_WD', '["' .. KEYS[i] .. '","storm"]') + redis.call('PUBLISH', 'PFC_WD_ACTION', '["' .. KEYS[i] .. '","storm"]') is_deadlock = true time_left = detection_time else @@ -81,7 +81,7 @@ for i = n, 1, -1 do end else if pfc_wd_action == 'alert' and pfc_wd_status ~= 'operational' then - redis.call('PUBLISH', 'PFC_WD', '["' .. KEYS[i] .. '","restore"]') + redis.call('PUBLISH', 'PFC_WD_ACTION', '["' .. KEYS[i] .. '","restore"]') end time_left = detection_time end diff --git a/orchagent/pfc_detect_mellanox.lua b/orchagent/pfc_detect_mellanox.lua index df6cd41dbbc..e77137b2879 100644 --- a/orchagent/pfc_detect_mellanox.lua +++ b/orchagent/pfc_detect_mellanox.lua @@ -73,7 +73,7 @@ for i = n, 1, -1 do if time_left <= poll_time then redis.call('HDEL', counters_table_name .. ':' .. port_id, pfc_rx_pkt_key .. '_last') redis.call('HDEL', counters_table_name .. ':' .. port_id, pfc_duration_key .. '_last') - redis.call('PUBLISH', 'PFC_WD', '["' .. KEYS[i] .. '","storm"]') + redis.call('PUBLISH', 'PFC_WD_ACTION', '["' .. KEYS[i] .. '","storm"]') is_deadlock = true time_left = detection_time else @@ -81,7 +81,7 @@ for i = n, 1, -1 do end else if pfc_wd_action == 'alert' and pfc_wd_status ~= 'operational' then - redis.call('PUBLISH', 'PFC_WD', '["' .. KEYS[i] .. '","restore"]') + redis.call('PUBLISH', 'PFC_WD_ACTION', '["' .. KEYS[i] .. '","restore"]') end time_left = detection_time end diff --git a/orchagent/pfc_restore.lua b/orchagent/pfc_restore.lua index 5b3e4ed0464..4cf0ab49e52 100644 --- a/orchagent/pfc_restore.lua +++ b/orchagent/pfc_restore.lua @@ -49,7 +49,7 @@ for i = n, 1, -1 do -- DEBUG CODE END. then if time_left <= poll_time then - redis.call('PUBLISH', 'PFC_WD', '["' .. KEYS[i] .. '","restore"]') + redis.call('PUBLISH', 'PFC_WD_ACTION', '["' .. KEYS[i] .. '","restore"]') time_left = restoration_time else time_left = time_left - poll_time diff --git a/orchagent/pfcwdorch.cpp b/orchagent/pfcwdorch.cpp index e0283bfeee2..9fe0daacfff 100644 --- a/orchagent/pfcwdorch.cpp +++ b/orchagent/pfcwdorch.cpp @@ -8,12 +8,15 @@ #include "select.h" #include "notifier.h" #include "redisclient.h" +#include "schema.h" +#include "subscriberstatetable.h" #define PFC_WD_GLOBAL "GLOBAL" #define PFC_WD_ACTION "action" #define PFC_WD_DETECTION_TIME "detection_time" #define PFC_WD_RESTORATION_TIME "restoration_time" #define BIG_RED_SWITCH_FIELD "BIG_RED_SWITCH" +#define PFC_WD_IN_STORM "storm" #define PFC_WD_DETECTION_TIME_MAX (5 * 1000) #define PFC_WD_DETECTION_TIME_MIN 100 @@ -55,28 +58,36 @@ void PfcWdOrch::doTask(Consumer& consumer) return; } - auto it = consumer.m_toSync.begin(); - while (it != consumer.m_toSync.end()) + if ((consumer.getDbId() == CONFIG_DB) && (consumer.getTableName() == CFG_PFC_WD_TABLE_NAME)) { - KeyOpFieldsValuesTuple t = it->second; + auto it = consumer.m_toSync.begin(); + while (it != consumer.m_toSync.end()) + { + KeyOpFieldsValuesTuple t = it->second; - string key = kfvKey(t); - string op = kfvOp(t); + string key = kfvKey(t); + string op = kfvOp(t); - if (op == SET_COMMAND) - { - createEntry(key, kfvFieldsValues(t)); - } - else if (op == DEL_COMMAND) - { - deleteEntry(key); + if (op == SET_COMMAND) + { + createEntry(key, kfvFieldsValues(t)); + } + else if (op == DEL_COMMAND) + { + deleteEntry(key); + } + else + { + SWSS_LOG_ERROR("Unknown operation type %s\n", op.c_str()); + } + + consumer.m_toSync.erase(it++); } - else + + if (consumer.m_toSync.empty()) { - SWSS_LOG_ERROR("Unknown operation type %s\n", op.c_str()); + m_entriesCreated = true; } - - consumer.m_toSync.erase(it++); } } @@ -335,8 +346,8 @@ void PfcWdSwOrch::disableBigRedSwitchMode() } auto queueId = entry.first; - RedisClient redisClient(PfcWdOrch::getCountersDb().get()); - string countersKey = COUNTERS_TABLE ":" + sai_serialize_object_id(queueId); + RedisClient redisClient(this->getCountersDb().get()); + string countersKey = this->getCountersTable()->getTableName() + this->getCountersTable()->getTableNameSeparator() + sai_serialize_object_id(queueId); redisClient.hdel(countersKey, "BIG_RED_SWITCH_MODE"); } @@ -381,7 +392,7 @@ void PfcWdSwOrch::enableBigRedSwitchMode() vector countersFieldValues; countersFieldValues.emplace_back("BIG_RED_SWITCH_MODE", "enable"); - PfcWdOrch::getCountersTable()->set(queueIdStr, countersFieldValues); + this->getCountersTable()->set(queueIdStr, countersFieldValues); } } @@ -438,7 +449,7 @@ void PfcWdSwOrch::enableBigRedSwitchMode() entry->second.portId, entry->first, entry->second.index, - PfcWdOrch::getCountersTable()); + this->getCountersTable()); entry->second.handler->initCounters(); } } @@ -497,7 +508,7 @@ void PfcWdSwOrch::registerInWdDb(const Port& port, to_string(restorationTime * 1000)); countersFieldValues.emplace_back("PFC_WD_ACTION", this->serializeAction(action)); - PfcWdOrch::getCountersTable()->set(queueIdStr, countersFieldValues); + this->getCountersTable()->set(queueIdStr, countersFieldValues); // We register our queues in PFC_WD table so that syncd will know that it must poll them vector queueFieldValues; @@ -522,7 +533,7 @@ void PfcWdSwOrch::registerInWdDb(const Port& port, // Initialize PFC WD related counters PfcWdActionHandler::initWdCounters( - PfcWdOrch::getCountersTable(), + this->getCountersTable(), sai_serialize_object_id(queueId)); } @@ -599,12 +610,9 @@ void PfcWdSwOrch::unregisterFromWdDb(const Port& po m_entryMap.erase(queueId); // Clean up - RedisClient redisClient(PfcWdOrch::getCountersDb().get()); - string countersKey = COUNTERS_TABLE ":" + sai_serialize_object_id(queueId); - redisClient.hdel(countersKey, "PFC_WD_DETECTION_TIME"); - redisClient.hdel(countersKey, "PFC_WD_RESTORATION_TIME"); - redisClient.hdel(countersKey, "PFC_WD_ACTION"); - redisClient.hdel(countersKey, "PFC_WD_STATUS"); + RedisClient redisClient(this->getCountersDb().get()); + string countersKey = this->getCountersTable()->getTableName() + this->getCountersTable()->getTableNameSeparator() + sai_serialize_object_id(queueId); + redisClient.hdel(countersKey, {"PFC_WD_DETECTION_TIME", "PFC_WD_RESTORATION_TIME", "PFC_WD_ACTION", "PFC_WD_STATUS"}); } } @@ -624,7 +632,10 @@ PfcWdSwOrch::PfcWdSwOrch( c_portStatIds(portStatIds), c_queueStatIds(queueStatIds), c_queueAttrIds(queueAttrIds), - m_pollInterval(pollInterval) + m_pollInterval(pollInterval), + m_applDb(make_shared(APPL_DB, DBConnector::DEFAULT_UNIXSOCKET, 0)), + m_applTable(make_shared(m_applDb.get(), APP_PFC_WD_TABLE_NAME "_INSTORM")), + m_applDbRedisClient(m_applDb.get()) { SWSS_LOG_ENTER(); @@ -643,12 +654,12 @@ PfcWdSwOrch::PfcWdSwOrch( { string detectLuaScript = swss::loadLuaScript(detectPluginName); detectSha = swss::loadRedisScript( - PfcWdOrch::getCountersDb().get(), + this->getCountersDb().get(), detectLuaScript); string restoreLuaScript = swss::loadLuaScript(restorePluginName); restoreSha = swss::loadRedisScript( - PfcWdOrch::getCountersDb().get(), + this->getCountersDb().get(), restoreLuaScript); vector fieldValues; @@ -663,9 +674,9 @@ PfcWdSwOrch::PfcWdSwOrch( } auto consumer = new swss::NotificationConsumer( - PfcWdSwOrch::getCountersDb().get(), - "PFC_WD"); - auto wdNotification = new Notifier(consumer, this, "PFC_WD"); + this->getCountersDb().get(), + "PFC_WD_ACTION"); + auto wdNotification = new Notifier(consumer, this, "PFC_WD_ACTION"); Orch::addExecutor(wdNotification); auto interv = timespec { .tv_sec = COUNTER_CHECK_POLL_TIMEOUT_SEC, .tv_nsec = 0 }; @@ -673,6 +684,11 @@ PfcWdSwOrch::PfcWdSwOrch( auto executor = new ExecutableTimer(timer, this, "PFC_WD_COUNTERS_POLL"); Orch::addExecutor(executor); timer->start(); + + auto ssTable = new swss::SubscriberStateTable( + m_applDb.get(), APP_PFC_WD_TABLE_NAME, TableConsumable::DEFAULT_POP_BATCH_SIZE, default_orch_pri); + auto ssConsumer = new Consumer(ssTable, this, APP_PFC_WD_TABLE_NAME); + Orch::addExecutor(ssConsumer); } template @@ -714,6 +730,84 @@ bool PfcWdSwOrch::stopWdOnPort(const Port& port) return true; } +template +void PfcWdSwOrch::doTask(Consumer& consumer) +{ + PfcWdOrch::doTask(consumer); + + if (!this->m_entriesCreated) + { + return; + } + + if ((consumer.getDbId() == APPL_DB) && (consumer.getTableName() == APP_PFC_WD_TABLE_NAME)) + { + auto it = consumer.m_toSync.begin(); + while (it != consumer.m_toSync.end()) + { + KeyOpFieldsValuesTuple &t = it->second; + + string &key = kfvKey(t); + Port port; + if (!gPortsOrch->getPort(key, port)) + { + SWSS_LOG_ERROR("Invalid port interface %s", key.c_str()); + it = consumer.m_toSync.erase(it); + continue; + } + if (port.m_type != Port::PHY) + { + SWSS_LOG_ERROR("Interface %s is not physical port", key.c_str()); + it = consumer.m_toSync.erase(it); + continue; + } + + vector &fvTuples = kfvFieldsValues(t); + for (const auto &fv : fvTuples) + { + int qIdx = -1; + string q = fvField(fv); + try + { + qIdx = stoi(q); + } + catch (const std::invalid_argument &e) + { + SWSS_LOG_ERROR("Invalid argument %s to %s()", q.c_str(), e.what()); + continue; + } + catch (const std::out_of_range &e) + { + SWSS_LOG_ERROR("Out of range argument %s to %s()", q.c_str(), e.what()); + continue; + } + + if ((qIdx < 0) || (static_cast(qIdx) >= port.m_queue_ids.size())) + { + SWSS_LOG_ERROR("Invalid queue index %d on port %s", qIdx, key.c_str()); + continue; + } + + string status = fvValue(fv); + if (status != PFC_WD_IN_STORM) + { + SWSS_LOG_ERROR("Port %s queue %s not in %s", key.c_str(), q.c_str(), PFC_WD_IN_STORM); + continue; + } + + SWSS_LOG_INFO("Port %s queue %s in status %s ", key.c_str(), q.c_str(), status.c_str()); + if (!startWdActionOnQueue(PFC_WD_IN_STORM, port.m_queue_ids[qIdx])) + { + SWSS_LOG_ERROR("Failed to start PFC watchdog %s action on port %s queue %d", PFC_WD_IN_STORM, key.c_str(), qIdx); + continue; + } + } + + it = consumer.m_toSync.erase(it); + } + } +} + template void PfcWdSwOrch::doTask(swss::NotificationConsumer& wdNotification) { @@ -728,11 +822,35 @@ void PfcWdSwOrch::doTask(swss::NotificationConsumer sai_object_id_t queueId = SAI_NULL_OBJECT_ID; sai_deserialize_object_id(queueIdStr, queueId); + if (!startWdActionOnQueue(event, queueId)) + { + SWSS_LOG_ERROR("Failed to start PFC watchdog %s event action on queue %s", event.c_str(), queueIdStr.c_str()); + } +} + +template +void PfcWdSwOrch::doTask(SelectableTimer &timer) +{ + SWSS_LOG_ENTER(); + + for (auto& handlerPair : m_entryMap) + { + if (handlerPair.second.handler != nullptr) + { + handlerPair.second.handler->commitCounters(true); + } + } + +} + +template +bool PfcWdSwOrch::startWdActionOnQueue(const string &event, sai_object_id_t queueId) +{ auto entry = m_entryMap.find(queueId); if (entry == m_entryMap.end()) { - SWSS_LOG_ERROR("Queue %s is not registered", queueIdStr.c_str()); - return; + SWSS_LOG_ERROR("Queue 0x%lx is not registered", queueId); + return false; } SWSS_LOG_NOTICE("Receive notification, %s", event.c_str()); @@ -758,8 +876,11 @@ void PfcWdSwOrch::doTask(swss::NotificationConsumer entry->second.portId, entry->first, entry->second.index, - PfcWdOrch::getCountersTable()); + this->getCountersTable()); entry->second.handler->initCounters(); + // Log storm event to APPL_DB for warm-reboot purpose + string key = m_applTable->getTableName() + m_applTable->getTableNameSeparator() + entry->second.portAlias; + m_applDbRedisClient.hset(key, to_string(entry->second.index), PFC_WD_IN_STORM); } } else if (entry->second.action == PfcWdAction::PFC_WD_ACTION_DROP) @@ -777,8 +898,11 @@ void PfcWdSwOrch::doTask(swss::NotificationConsumer entry->second.portId, entry->first, entry->second.index, - PfcWdOrch::getCountersTable()); + this->getCountersTable()); entry->second.handler->initCounters(); + // Log storm event to APPL_DB for warm-reboot purpose + string key = m_applTable->getTableName() + m_applTable->getTableNameSeparator() + entry->second.portAlias; + m_applDbRedisClient.hset(key, to_string(entry->second.index), PFC_WD_IN_STORM); } } else if (entry->second.action == PfcWdAction::PFC_WD_ACTION_FORWARD) @@ -796,13 +920,17 @@ void PfcWdSwOrch::doTask(swss::NotificationConsumer entry->second.portId, entry->first, entry->second.index, - PfcWdOrch::getCountersTable()); + this->getCountersTable()); entry->second.handler->initCounters(); + // Log storm event to APPL_DB for warm-reboot purpose + string key = m_applTable->getTableName() + m_applTable->getTableNameSeparator() + entry->second.portAlias; + m_applDbRedisClient.hset(key, to_string(entry->second.index), PFC_WD_IN_STORM); } } else { SWSS_LOG_ERROR("Unknown PFC WD action"); + return false; } } else if (event == "restore") @@ -818,27 +946,64 @@ void PfcWdSwOrch::doTask(swss::NotificationConsumer entry->second.handler->commitCounters(); entry->second.handler = nullptr; + // Remove storm status in APPL_DB for warm-reboot purpose + string key = m_applTable->getTableName() + m_applTable->getTableNameSeparator() + entry->second.portAlias; + m_applDbRedisClient.hdel(key, to_string(entry->second.index)); } } else { SWSS_LOG_ERROR("Received unknown event from plugin, %s", event.c_str()); + return false; } + + return true; } template -void PfcWdSwOrch::doTask(SelectableTimer &timer) +bool PfcWdSwOrch::bake() { - SWSS_LOG_ENTER(); - - for (auto& handlerPair : m_entryMap) - { - if (handlerPair.second.handler != nullptr) + // clean all *_last fields in COUNTERS_TABLE + // to allow warm-reboot pfc detect & restore state machine to enter the same init state as cold-reboot + RedisClient redisClient(this->getCountersDb().get()); + + vector cKeys; + this->getCountersTable()->getKeys(cKeys); + for (const auto &key : cKeys) + { + vector fvTuples; + this->getCountersTable()->get(key, fvTuples); + vector wLasts; + for (const auto &fv : fvTuples) { - handlerPair.second.handler->commitCounters(true); + if (fvField(fv).find("_last") != string::npos) + { + wLasts.push_back(fvField(fv)); + } + } + if (!wLasts.empty()) + { + redisClient.hdel( + this->getCountersTable()->getTableName() + + this->getCountersTable()->getTableNameSeparator() + + key, + wLasts); } } + Orch::bake(); + + Consumer *consumer = dynamic_cast(this->getExecutor(APP_PFC_WD_TABLE_NAME)); + if (consumer == NULL) + { + SWSS_LOG_ERROR("No consumer %s in Orch", APP_PFC_WD_TABLE_NAME); + return false; + } + + size_t refilled = consumer->refillToSync(m_applTable.get()); + SWSS_LOG_NOTICE("Add warm input PFC watchdog State: %s, %zd", APP_PFC_WD_TABLE_NAME, refilled); + + return true; } // Trick to keep member functions in a separate file diff --git a/orchagent/pfcwdorch.h b/orchagent/pfcwdorch.h index 631b3e57402..ac0f6f55a90 100644 --- a/orchagent/pfcwdorch.h +++ b/orchagent/pfcwdorch.h @@ -7,6 +7,7 @@ #include "producertable.h" #include "notificationconsumer.h" #include "timer.h" +#include "redisclient.h" extern "C" { #include "sai.h" @@ -49,6 +50,11 @@ class PfcWdOrch: public Orch virtual void createEntry(const string& key, const vector& data); void deleteEntry(const string& name); + +protected: + virtual bool startWdActionOnQueue(const string &event, sai_object_id_t queueId) = 0; + + bool m_entriesCreated = false; private: shared_ptr m_countersDb = nullptr; @@ -68,6 +74,7 @@ class PfcWdSwOrch: public PfcWdOrch int pollInterval); virtual ~PfcWdSwOrch(void); + void doTask(Consumer& consumer) override; virtual bool startWdOnPort(const Port& port, uint32_t detectionTime, uint32_t restorationTime, PfcWdAction action); virtual bool stopWdOnPort(const Port& port); @@ -75,6 +82,12 @@ class PfcWdSwOrch: public PfcWdOrch void createEntry(const string& key, const vector& data); virtual void doTask(SelectableTimer &timer); //XXX Add port/queue state change event handlers + + bool bake() override; + +protected: + bool startWdActionOnQueue(const string &event, sai_object_id_t queueId) override; + private: struct PfcWdQueueEntry { @@ -118,6 +131,12 @@ class PfcWdSwOrch: public PfcWdOrch bool m_bigRedSwitchFlag = false; int m_pollInterval; + + shared_ptr m_applDb = nullptr; + // Track queues in storm + shared_ptr
m_applTable = nullptr; + // used for hdel + RedisClient m_applDbRedisClient; }; #endif