Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion fpmsyncd/fpmlink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ FpmLink::FpmLink(int port) :

memset (&addr, 0, sizeof (addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_port = htons((uint16_t)port);
addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);

if (bind(m_server_socket, (struct sockaddr *)&addr, sizeof(addr)) < 0)
Expand Down
2 changes: 2 additions & 0 deletions orchagent/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ orchagent_SOURCES = \
main.cpp \
port.cpp \
orchdaemon.cpp \
orchbase.cpp \
orch.cpp \
notifications.cpp \
routeorch.cpp \
Expand All @@ -47,6 +48,7 @@ orchagent_SOURCES = \
neighorch.h \
notifications.h \
observer.h \
orchbase.h \
orch.h \
orchdaemon.h \
pfcactionhandler.h \
Expand Down
1 change: 0 additions & 1 deletion orchagent/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ sai_object_id_t gUnderlayIfId;
sai_object_id_t gSwitchId = SAI_NULL_OBJECT_ID;
MacAddress gMacAddress;

#define DEFAULT_BATCH_SIZE 128
int gBatchSize = DEFAULT_BATCH_SIZE;

bool gSairedisRecord = true;
Expand Down
124 changes: 11 additions & 113 deletions orchagent/orch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
#include <sys/time.h>

#include "orch.h"

#include "subscriberstatetable.h"
#include "portsorch.h"
#include "tokenize.h"
Expand All @@ -25,128 +24,33 @@ extern string gRecordFile;
extern string getTimestamp();

Orch::Orch(DBConnector *db, string tableName)
:OrchBase(db, tableName)
{
addConsumer(db, tableName);
}

Orch::Orch(DBConnector *db, vector<string> &tableNames)
:OrchBase(db, tableNames)
{
for(auto it : tableNames)
{
addConsumer(db, it);
}
}

Orch::Orch(const vector<TableConnector>& tables)
:OrchBase(tables)
{
for (auto it : tables)
{
addConsumer(it.first, it.second);
}
}

Orch::~Orch()
{
for(auto &it : m_consumerMap)
delete it.second.m_consumer;

if (gRecordOfs.is_open())
{
gRecordOfs.close();
}
}

vector<Selectable *> Orch::getSelectables()
{
vector<Selectable *> selectables;
for(auto it : m_consumerMap) {
selectables.push_back(it.second.m_consumer);
}
return selectables;
}

bool Orch::hasSelectable(TableConsumable *selectable) const
{
for(auto it : m_consumerMap) {
if (it.second.m_consumer == selectable) {
return true;
}
}
return false;
}

bool Orch::execute(string tableName)
{
SWSS_LOG_ENTER();

lock_guard<mutex> lock(gDbMutex);
return OrchBase::execute(tableName);

auto consumer_it = m_consumerMap.find(tableName);
if (consumer_it == m_consumerMap.end())
{
SWSS_LOG_ERROR("Unrecognized tableName:%s\n", tableName.c_str());
return false;
}
Consumer& consumer = consumer_it->second;

std::deque<KeyOpFieldsValuesTuple> entries;
consumer.m_consumer->pops(entries);

/* Nothing popped */
if (entries.empty())
{
return true;
}

for (auto entry: entries)
{
string key = kfvKey(entry);
string op = kfvOp(entry);

/* Record incoming tasks */
if (gSwssRecord)
{
recordTuple(consumer, entry);
}

/* If a new task comes or if a DEL task comes, we directly put it into consumer.m_toSync map */
if (consumer.m_toSync.find(key) == consumer.m_toSync.end() || op == DEL_COMMAND)
{
consumer.m_toSync[key] = entry;
}
/* If an old task is still there, we combine the old task with new task */
else
{
KeyOpFieldsValuesTuple existing_data = consumer.m_toSync[key];

auto new_values = kfvFieldsValues(entry);
auto existing_values = kfvFieldsValues(existing_data);


for (auto it : new_values)
{
string field = fvField(it);
string value = fvValue(it);

auto iu = existing_values.begin();
while (iu != existing_values.end())
{
string ofield = fvField(*iu);
if (field == ofield)
iu = existing_values.erase(iu);
else
iu++;
}
existing_values.push_back(FieldValueTuple(field, value));
}
consumer.m_toSync[key] = KeyOpFieldsValuesTuple(key, op, existing_values);
}
}

if (!consumer.m_toSync.empty())
doTask(consumer);

return true;
}

/*
Expand Down Expand Up @@ -236,11 +140,7 @@ void Orch::doTask()
if (!gPortsOrch->isInitDone())
return;

for(auto &it : m_consumerMap)
{
if (!it.second.m_toSync.empty())
doTask(it.second);
}
OrchBase::doTask();
}

void Orch::logfileReopen()
Expand All @@ -264,6 +164,11 @@ void Orch::logfileReopen()

void Orch::recordTuple(Consumer &consumer, KeyOpFieldsValuesTuple &tuple)
{
if (!gSwssRecord)
{
return;
}

string s = consumer.m_consumer->getTableName() + ":" + kfvKey(tuple)
+ "|" + kfvOp(tuple);
for (auto i = kfvFieldsValues(tuple).begin(); i != kfvFieldsValues(tuple).end(); i++)
Expand Down Expand Up @@ -364,12 +269,5 @@ bool Orch::parseIndexRange(const string &input, sai_uint32_t &range_low, sai_uin

void Orch::addConsumer(DBConnector *db, string tableName)
{
if (db->getDB() == CONFIG_DB)
{
Consumer consumer(new SubscriberStateTable(db, tableName));
m_consumerMap.insert(ConsumerMapPair(tableName, consumer));
} else {
Consumer consumer(new ConsumerStateTable(db, tableName, gBatchSize));
m_consumerMap.insert(ConsumerMapPair(tableName, consumer));
}
OrchBase::addConsumer(db, tableName, gBatchSize);
}
27 changes: 2 additions & 25 deletions orchagent/orch.h
Original file line number Diff line number Diff line change
@@ -1,18 +1,12 @@
#ifndef SWSS_ORCH_H
#define SWSS_ORCH_H

#include <map>
#include <memory>

extern "C" {
#include "sai.h"
#include "saistatus.h"
}

#include "dbconnector.h"
#include "table.h"
#include "consumertable.h"
#include "consumerstatetable.h"
#include "orchbase.h"

using namespace std;
using namespace swss;
Expand Down Expand Up @@ -42,16 +36,6 @@ typedef pair<string, sai_object_id_t> object_map_pair;
typedef map<string, object_map*> type_map;
typedef pair<string, object_map*> type_map_pair;

typedef map<string, KeyOpFieldsValuesTuple> SyncMap;
struct Consumer {
Consumer(TableConsumable* consumer) : m_consumer(consumer) { }
TableConsumable* m_consumer;
/* Store the latest 'golden' status */
SyncMap m_toSync;
};
typedef pair<string, Consumer> ConsumerMapPair;
typedef map<string, Consumer> ConsumerMap;

typedef enum
{
success,
Expand All @@ -61,20 +45,13 @@ typedef enum
failure
} ref_resolve_status;

typedef pair<DBConnector *, string> TableConnector;
typedef pair<DBConnector *, vector<string>> TablesConnector;

class Orch
class Orch : public OrchBase
{
public:
Orch(DBConnector *db, string tableName);
Orch(DBConnector *db, vector<string> &tableNames);
Orch(const vector<TableConnector>& tables);
virtual ~Orch();

vector<Selectable*> getSelectables();
bool hasSelectable(TableConsumable* s) const;

bool execute(string tableName);
/* Iterate all consumers in m_consumerMap and run doTask(Consumer) */
void doTask();
Expand Down
Loading