Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 78 additions & 102 deletions syncd/syncd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2535,59 +2535,102 @@ sai_status_t processEvent(
return status;
}

void processFlexCounterEvent(
_In_ swss::ConsumerStateTable &consumer)
void processFlexCounterGroupEvent(
_In_ swss::ConsumerTable &consumer)
{
SWSS_LOG_ENTER();

std::lock_guard<std::mutex> lock(g_mutex);

swss::KeyOpFieldsValuesTuple kco;
consumer.pop(kco);

const auto &key = kfvKey(kco);
const auto &groupName = kfvKey(kco);
const auto &op = kfvOp(kco);
const auto values = kfvFieldsValues(kco);

std::size_t delimiter = key.find_last_of(":");

if (delimiter == std::string::npos)
if (op == DEL_COMMAND)
{
SWSS_LOG_ERROR("Failed to parse the key %s", key.c_str());
FlexCounter::removeCounterPlugin(groupName);
return;
}

const auto intervalStr = key.substr(delimiter + 1);
const auto vidStr = key.substr(0, delimiter);
int pollInterval;

try
for (const auto& valuePair : values)
{
pollInterval = std::stoi(intervalStr);
const auto field = fvField(valuePair);
const auto value = fvValue(valuePair);

if (op == SET_COMMAND)
{
if (field == POLL_INTERVAL_FIELD)
{
FlexCounter::setPollInterval(stoi(value), groupName);
}
else if (field == QUEUE_PLUGIN_FIELD)
{
auto shaStrings = swss::tokenize(value, ',');
for (const auto &sha : shaStrings)
{
FlexCounter::addQueueCounterPlugin(sha, groupName);
}
}
else if (field == PORT_PLUGIN_FIELD)
{
auto shaStrings = swss::tokenize(value, ',');
for (const auto &sha : shaStrings)
{
FlexCounter::addPortCounterPlugin(sha, groupName);
}
}
else
{
SWSS_LOG_ERROR("Field is not supported %s", field.c_str());
}
}
}
catch(const std::invalid_argument)
}

void processFlexCounterEvent(
_In_ swss::ConsumerTable &consumer)
{
SWSS_LOG_ENTER();

std::lock_guard<std::mutex> lock(g_mutex);

swss::KeyOpFieldsValuesTuple kco;
consumer.pop(kco);

const auto &key = kfvKey(kco);
const auto &op = kfvOp(kco);

std::size_t delimiter = key.find_first_of(":");
if (delimiter == std::string::npos)
{
SWSS_LOG_ERROR("Failed to convert the poll intervall, key = %s", key.c_str());
SWSS_LOG_ERROR("Failed to parse the key %s", key.c_str());
return;
}

const auto groupName = key.substr(0, delimiter);
const auto vidStr = key.substr(delimiter+1);

sai_object_id_t vid = SAI_NULL_OBJECT_ID;
sai_deserialize_object_id(vidStr, vid);
sai_object_id_t rid = translate_vid_to_rid(vid);
sai_object_type_t objectType = sai_object_type_query(rid);
std::string objectTypeStr = sai_serialize_object_type(objectType);

if (op == DEL_COMMAND)
{
if (objectType == SAI_OBJECT_TYPE_PORT)
{
FlexCounter::removePort(vid, pollInterval);
FlexCounter::removePort(vid, groupName);
}
else if (objectType == SAI_OBJECT_TYPE_QUEUE)
{
FlexCounter::removeQueue(vid, pollInterval);
FlexCounter::removeQueue(vid, groupName);
}
else
{
SWSS_LOG_ERROR("Object type for removal not supported");
SWSS_LOG_ERROR("Object type for removal not supported, %s", objectTypeStr.c_str());
}
}

Expand All @@ -2601,7 +2644,7 @@ void processFlexCounterEvent(
{
auto idStrings = swss::tokenize(value, ',');

if (objectType == SAI_OBJECT_TYPE_PORT && field == PFC_WD_PORT_COUNTER_ID_LIST)
if (objectType == SAI_OBJECT_TYPE_PORT && field == PORT_COUNTER_ID_LIST)
{
std::vector<sai_port_stat_t> portCounterIds;
for (const auto &str : idStrings)
Expand All @@ -2610,9 +2653,9 @@ void processFlexCounterEvent(
sai_deserialize_port_stat(str, stat);
portCounterIds.push_back(stat);
}
FlexCounter::setPortCounterList(vid, rid, pollInterval, portCounterIds);
FlexCounter::setPortCounterList(vid, rid, groupName, portCounterIds);
}
else if (objectType == SAI_OBJECT_TYPE_QUEUE && field == PFC_WD_QUEUE_COUNTER_ID_LIST)
else if (objectType == SAI_OBJECT_TYPE_QUEUE && field == QUEUE_COUNTER_ID_LIST)
{
std::vector<sai_queue_stat_t> queueCounterIds;
for (const auto &str : idStrings)
Expand All @@ -2621,9 +2664,9 @@ void processFlexCounterEvent(
sai_deserialize_queue_stat(str, stat);
queueCounterIds.push_back(stat);
}
FlexCounter::setQueueCounterList(vid, rid, pollInterval, queueCounterIds);
FlexCounter::setQueueCounterList(vid, rid, groupName, queueCounterIds);
}
else if (objectType == SAI_OBJECT_TYPE_QUEUE && field == PFC_WD_QUEUE_ATTR_ID_LIST)
else if (objectType == SAI_OBJECT_TYPE_QUEUE && field == QUEUE_ATTR_ID_LIST)
{
std::vector<sai_queue_attr_t> queueAttrIds;
for (const auto &str : idStrings)
Expand All @@ -2633,83 +2676,16 @@ void processFlexCounterEvent(
queueAttrIds.push_back(attr);
}

FlexCounter::setQueueAttrList(vid, rid, pollInterval, queueAttrIds);
FlexCounter::setQueueAttrList(vid, rid, groupName, queueAttrIds);
}
else
{
SWSS_LOG_ERROR("Object type not supported");
SWSS_LOG_ERROR("Object type and field combination is not supported, object type %s, field %s", objectTypeStr.c_str(), field.c_str());
}
}
}
}

void processFlexCounterPluginEvent(
_In_ swss::ConsumerStateTable &consumer)
{
SWSS_LOG_ENTER();

std::lock_guard<std::mutex> lock(g_mutex);

swss::KeyOpFieldsValuesTuple kco;
consumer.pop(kco);

const auto &key = kfvKey(kco);
const auto &op = kfvOp(kco);

std::size_t delimiter = key.find_last_of(":");

if (delimiter == std::string::npos)
{
SWSS_LOG_ERROR("Failed to parse the key %s", key.c_str());
return;
}

const auto intervalStr = key.substr(delimiter + 1);
const auto sha = key.substr(0, delimiter);
int pollInterval;

try
{
pollInterval = std::stoi(intervalStr);
}
catch(const std::invalid_argument)
{
SWSS_LOG_ERROR("Failed to convert the poll intervall, key = %s", key.c_str());
return;
}

if (op == DEL_COMMAND)
{
FlexCounter::removeCounterPlugin(sha, pollInterval);
return;
}

const auto values = kfvFieldsValues(kco);
for (const auto& valuePair : values)
{
const auto field = fvField(valuePair);
const auto value = fvValue(valuePair);

if (field != SAI_OBJECT_TYPE)
{
continue;
}

if (value == sai_serialize_object_type(SAI_OBJECT_TYPE_PORT))
{
FlexCounter::addPortCounterPlugin(sha, pollInterval);
}
else if (value == sai_serialize_object_type(SAI_OBJECT_TYPE_QUEUE))
{
FlexCounter::addQueueCounterPlugin(sha, pollInterval);
}
else
{
SWSS_LOG_ERROR("Plugin for %s is not supported", value.c_str());
}
}
}

void printUsage()
{
SWSS_LOG_ENTER();
Expand Down Expand Up @@ -3292,14 +3268,14 @@ int syncd_main(int argc, char **argv)

std::shared_ptr<swss::DBConnector> dbAsic = std::make_shared<swss::DBConnector>(ASIC_DB, swss::DBConnector::DEFAULT_UNIXSOCKET, 0);
std::shared_ptr<swss::DBConnector> dbNtf = std::make_shared<swss::DBConnector>(ASIC_DB, swss::DBConnector::DEFAULT_UNIXSOCKET, 0);
std::shared_ptr<swss::DBConnector> dbFlexCounter = std::make_shared<swss::DBConnector>(PFC_WD_DB, swss::DBConnector::DEFAULT_UNIXSOCKET, 0);
std::shared_ptr<swss::DBConnector> dbFlexCounter = std::make_shared<swss::DBConnector>(FLEX_COUNTER_DB, swss::DBConnector::DEFAULT_UNIXSOCKET, 0);

g_redisClient = std::make_shared<swss::RedisClient>(dbAsic.get());

std::shared_ptr<swss::ConsumerTable> asicState = std::make_shared<swss::ConsumerTable>(dbAsic.get(), ASIC_STATE_TABLE);
std::shared_ptr<swss::NotificationConsumer> restartQuery = std::make_shared<swss::NotificationConsumer>(dbAsic.get(), "RESTARTQUERY");
std::shared_ptr<swss::ConsumerStateTable> flexCounterState = std::make_shared<swss::ConsumerStateTable>(dbFlexCounter.get(), PFC_WD_STATE_TABLE);
std::shared_ptr<swss::ConsumerStateTable> flexCounterPlugin = std::make_shared<swss::ConsumerStateTable>(dbFlexCounter.get(), PLUGIN_TABLE);
std::shared_ptr<swss::ConsumerTable> flexCounter = std::make_shared<swss::ConsumerTable>(dbFlexCounter.get(), FLEX_COUNTER_TABLE);
std::shared_ptr<swss::ConsumerTable> flexCounterGroup = std::make_shared<swss::ConsumerTable>(dbFlexCounter.get(), FLEX_COUNTER_GROUP_TABLE);

/*
* At the end we cant use producer consumer concept since if one proces
Expand Down Expand Up @@ -3388,8 +3364,8 @@ int syncd_main(int argc, char **argv)

s.addSelectable(asicState.get());
s.addSelectable(restartQuery.get());
s.addSelectable(flexCounterState.get());
s.addSelectable(flexCounterPlugin.get());
s.addSelectable(flexCounter.get());
s.addSelectable(flexCounterGroup.get());

SWSS_LOG_NOTICE("starting main loop");

Expand All @@ -3414,13 +3390,13 @@ int syncd_main(int argc, char **argv)
warmRestartHint = handleRestartQuery(*restartQuery);
break;
}
else if (sel == flexCounterState.get())
else if (sel == flexCounter.get())
{
processFlexCounterEvent(*(swss::ConsumerStateTable*)sel);
processFlexCounterEvent(*(swss::ConsumerTable*)sel);
}
else if (sel == flexCounterPlugin.get())
else if (sel == flexCounterGroup.get())
{
processFlexCounterPluginEvent(*(swss::ConsumerStateTable*)sel);
processFlexCounterGroupEvent(*(swss::ConsumerTable*)sel);
}
else if (result == swss::Select::OBJECT)
{
Expand Down
Loading