Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 58 additions & 4 deletions fpmsyncd/fpmsyncd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,34 @@
#include "subscriberstatetable.h"
#include "warmRestartHelper.h"
#include "fpmsyncd/fpmlink.h"
#include "fpmsyncd/fpmsyncd.h"
#include "fpmsyncd/routesync.h"

#include <netlink/route/route.h>

using namespace std;
using namespace swss;

// gSelectTimeout specifies the maximum wait time in milliseconds (-1 == infinite)
static int gSelectTimeout;
#define INFINITE -1
#define FLUSH_TIMEOUT 500 // 500 milliseconds
static int gFlushTimeout = FLUSH_TIMEOUT;
// consider the traffic is small if pipeline contains < 500 entries
#define SMALL_TRAFFIC 500

/**
* @brief fpmsyncd invokes redispipeline's flush with a timer
*
* redispipeline would automatically flush itself when full,
* but fpmsyncd can invoke pipeline's flush even if it's not full yet.
*
* By setting gSelectTimeout, fpmsyncd controls the flush interval.
*
* @param pipeline reference to the pipeline to be flushed
*/
void flushPipeline(RedisPipeline& pipeline);

/*
* Default warm-restart timer interval for routing-stack app. To be used only if
* no explicit value has been defined in configuration.
Expand Down Expand Up @@ -61,7 +82,7 @@ int main(int argc, char **argv)
DBConnector applStateDb("APPL_STATE_DB", 0);
std::unique_ptr<NotificationConsumer> routeResponseChannel;

RedisPipeline pipeline(&db);
RedisPipeline pipeline(&db, ROUTE_SYNC_PPL_SIZE);
RouteSync sync(&pipeline);

DBConnector stateDb("STATE_DB", 0);
Expand Down Expand Up @@ -152,12 +173,14 @@ int main(int argc, char **argv)
sync.m_warmStartHelper.setState(WarmStart::WSDISABLED);
}

gSelectTimeout = INFINITE;

while (true)
{
Selectable *temps;

/* Reading FPM messages forever (and calling "readMe" to read them) */
s.select(&temps);
s.select(&temps, gSelectTimeout);

/*
* Upon expiration of the warm-restart timer or eoiu Hold Timer, proceed to run the
Expand Down Expand Up @@ -286,8 +309,7 @@ int main(int argc, char **argv)
}
else if (!warmStartEnabled || sync.m_warmStartHelper.isReconciled())
{
pipeline.flush();
SWSS_LOG_DEBUG("Pipeline flushed");
flushPipeline(pipeline);
}
}
}
Expand All @@ -304,3 +326,35 @@ int main(int argc, char **argv)

return 1;
}

void flushPipeline(RedisPipeline& pipeline) {

size_t remaining = pipeline.size();

if (remaining == 0) {
gSelectTimeout = INFINITE;
return;
}

int idle = pipeline.getIdleTime();

// flush the pipeline if
// 1. traffic is not scaled (only prevent fpmsyncd from flushing ppl too frequently in the scaled case)
// 2. the idle time since last flush has exceeded gFlushTimeout
// 3. idle <= 0, due to system clock drift, should not happen since we already use steady_clock for timing
if (remaining < SMALL_TRAFFIC || idle >= gFlushTimeout || idle <= 0) {

pipeline.flush();

gSelectTimeout = INFINITE;

SWSS_LOG_DEBUG("Pipeline flushed");
}
else
{
// skip flushing ppl and set the timeout of fpmsyncd select function to be (gFlushTimeout - idle)
// so that fpmsyncd select function would block at most for (gFlushTimeout - idle)
// by doing this, we make sure every entry eventually gets flushed
gSelectTimeout = gFlushTimeout - idle;
}
}
8 changes: 8 additions & 0 deletions fpmsyncd/fpmsyncd.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#ifndef __FPMSYNCD__
#define __FPMSYNCD__


// redispipeline has a maximum capacity of 50000 entries
#define ROUTE_SYNC_PPL_SIZE 50000

#endif