Skip to content

Commit c76696b

Browse files
daall authored and arlakshm committed
[everflow] Add retry mechanism for mirror sessions and policers (sonic-net#1486)
Signed-off-by: Danny Allen <daall@microsoft.com>
Signed-off-by: Arvindsrinivasan Lakshmi Narasimhan <arlakshm@microsoft.com>
1 parent 7ff817e commit c76696b

13 files changed

Lines changed: 243 additions & 86 deletions

File tree

orchagent/aclorch.cpp

Lines changed: 16 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -1083,12 +1083,6 @@ bool AclRuleMirror::validateAddAction(string attr_name, string attr_value)
10831083

10841084
m_sessionName = attr_value;
10851085

1086-
if (!m_pMirrorOrch->sessionExists(m_sessionName))
1087-
{
1088-
SWSS_LOG_ERROR("Mirror rule reference mirror session that does not exists %s", m_sessionName.c_str());
1089-
return false;
1090-
}
1091-
10921086
// insert placeholder value, we'll set the session oid in AclRuleMirror::create()
10931087
m_actions[action] = sai_attribute_value_t{};
10941088

@@ -1178,6 +1172,12 @@ bool AclRuleMirror::create()
11781172
sai_object_id_t oid = SAI_NULL_OBJECT_ID;
11791173
bool state = false;
11801174

1175+
if (!m_pMirrorOrch->sessionExists(m_sessionName))
1176+
{
1177+
SWSS_LOG_ERROR("Mirror rule references mirror session \"%s\" that does not exist yet", m_sessionName.c_str());
1178+
return false;
1179+
}
1180+
11811181
if (!m_pMirrorOrch->getSessionStatus(m_sessionName, state))
11821182
{
11831183
SWSS_LOG_THROW("Failed to get mirror session state for session %s", m_sessionName.c_str());
@@ -3124,7 +3124,16 @@ void AclOrch::doAclRuleTask(Consumer &consumer)
31243124
}
31253125

31263126

3127-
newRule = AclRule::makeShared(type, this, m_mirrorOrch, m_dTelOrch, rule_id, table_id, t);
3127+
try
3128+
{
3129+
newRule = AclRule::makeShared(type, this, m_mirrorOrch, m_dTelOrch, rule_id, table_id, t);
3130+
}
3131+
catch (exception &e)
3132+
{
3133+
SWSS_LOG_ERROR("Error while creating ACL rule %s: %s", rule_id.c_str(), e.what());
3134+
it = consumer.m_toSync.erase(it);
3135+
return;
3136+
}
31283137

31293138
for (const auto& itr : kfvFieldsValues(t))
31303139
{

orchagent/aclorch.h

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -434,6 +434,8 @@ class AclOrch : public Orch, public Observer
434434
// Get the OID for the ACL bind point for a given port
435435
static bool getAclBindPortId(Port& port, sai_object_id_t& port_id);
436436

437+
using Orch::doTask; // Allow access to the basic doTask
438+
437439
private:
438440
SwitchOrch *m_switchOrch;
439441
void doTask(Consumer &consumer);

orchagent/mirrororch.cpp

Lines changed: 28 additions & 45 deletions
Original file line number | Diff line number | Diff line change
@@ -87,9 +87,6 @@ bool MirrorOrch::bake()
8787
{
8888
SWSS_LOG_ENTER();
8989

90-
// Freeze the route update during orchagent restoration
91-
m_freeze = true;
92-
9390
deque<KeyOpFieldsValuesTuple> entries;
9491
vector<string> keys;
9592
m_mirrorTable.getKeys(keys);
@@ -134,23 +131,6 @@ bool MirrorOrch::bake()
134131
return Orch::bake();
135132
}
136133

137-
bool MirrorOrch::postBake()
138-
{
139-
SWSS_LOG_ENTER();
140-
141-
SWSS_LOG_NOTICE("Start MirrorOrch post-baking");
142-
143-
// Unfreeze the route update
144-
m_freeze = false;
145-
146-
Orch::doTask();
147-
148-
// Clean up the recovery cache
149-
m_recoverySessionMap.clear();
150-
151-
return Orch::postBake();
152-
}
153-
154134
void MirrorOrch::update(SubjectType type, void *cntx)
155135
{
156136
SWSS_LOG_ENTER();
@@ -340,7 +320,7 @@ bool MirrorOrch::validateSrcPortList(const string& srcPortList)
340320
return true;
341321
}
342322

343-
void MirrorOrch::createEntry(const string& key, const vector<FieldValueTuple>& data)
323+
task_process_status MirrorOrch::createEntry(const string& key, const vector<FieldValueTuple>& data)
344324
{
345325
SWSS_LOG_ENTER();
346326

@@ -349,7 +329,7 @@ void MirrorOrch::createEntry(const string& key, const vector<FieldValueTuple>& d
349329
{
350330
SWSS_LOG_NOTICE("Failed to create session, session %s already exists",
351331
key.c_str());
352-
return;
332+
return task_process_status::task_duplicated;
353333
}
354334

355335
string platform = getenv("platform") ? getenv("platform") : "";
@@ -364,7 +344,7 @@ void MirrorOrch::createEntry(const string& key, const vector<FieldValueTuple>& d
364344
if (!entry.srcIp.isV4())
365345
{
366346
SWSS_LOG_ERROR("Unsupported version of sessions %s source IP address", key.c_str());
367-
return;
347+
return task_process_status::task_invalid_entry;
368348
}
369349
}
370350
else if (fvField(i) == MIRROR_SESSION_DST_IP)
@@ -373,7 +353,7 @@ void MirrorOrch::createEntry(const string& key, const vector<FieldValueTuple>& d
373353
if (!entry.dstIp.isV4())
374354
{
375355
SWSS_LOG_ERROR("Unsupported version of sessions %s destination IP address", key.c_str());
376-
return;
356+
return task_process_status::task_invalid_entry;
377357
}
378358
}
379359
else if (fvField(i) == MIRROR_SESSION_GRE_TYPE)
@@ -398,7 +378,7 @@ void MirrorOrch::createEntry(const string& key, const vector<FieldValueTuple>& d
398378
{
399379
SWSS_LOG_ERROR("Failed to get policer %s",
400380
fvValue(i).c_str());
401-
return;
381+
return task_process_status::task_need_retry;
402382
}
403383

404384
m_policerOrch->increaseRefCount(fvValue(i));
@@ -409,7 +389,7 @@ void MirrorOrch::createEntry(const string& key, const vector<FieldValueTuple>& d
409389
if (!validateSrcPortList(fvValue(i)))
410390
{
411391
SWSS_LOG_ERROR("Failed to get valid source port list %s", fvValue(i).c_str());
412-
return;
392+
return task_process_status::task_invalid_entry;
413393
}
414394
entry.src_port = fvValue(i);
415395
}
@@ -418,7 +398,7 @@ void MirrorOrch::createEntry(const string& key, const vector<FieldValueTuple>& d
418398
if (!validateDstPort(fvValue(i)))
419399
{
420400
SWSS_LOG_ERROR("Failed to get valid destination port %s", fvValue(i).c_str());
421-
return;
401+
return task_process_status::task_invalid_entry;
422402
}
423403
entry.dst_port = fvValue(i);
424404
}
@@ -428,7 +408,7 @@ void MirrorOrch::createEntry(const string& key, const vector<FieldValueTuple>& d
428408
|| fvValue(i) == MIRROR_BOTH_DIRECTION))
429409
{
430410
SWSS_LOG_ERROR("Failed to get valid direction %s", fvValue(i).c_str());
431-
return;
411+
return task_process_status::task_invalid_entry;
432412
}
433413
entry.direction = fvValue(i);
434414
}
@@ -439,18 +419,18 @@ void MirrorOrch::createEntry(const string& key, const vector<FieldValueTuple>& d
439419
else
440420
{
441421
SWSS_LOG_ERROR("Failed to parse session %s configuration. Unknown attribute %s", key.c_str(), fvField(i).c_str());
442-
return;
422+
return task_process_status::task_invalid_entry;
443423
}
444424
}
445425
catch (const exception& e)
446426
{
447427
SWSS_LOG_ERROR("Failed to parse session %s attribute %s error: %s.", key.c_str(), fvField(i).c_str(), e.what());
448-
return;
428+
return task_process_status::task_invalid_entry;
449429
}
450430
catch (...)
451431
{
452432
SWSS_LOG_ERROR("Failed to parse session %s attribute %s. Unknown error has been occurred", key.c_str(), fvField(i).c_str());
453-
return;
433+
return task_process_status::task_failed;
454434
}
455435
}
456436

@@ -470,6 +450,8 @@ void MirrorOrch::createEntry(const string& key, const vector<FieldValueTuple>& d
470450
// Attach the destination IP to the routeOrch
471451
m_routeOrch->attach(this, entry.dstIp);
472452
}
453+
454+
return task_process_status::task_success;
473455
}
474456

475457
task_process_status MirrorOrch::deleteEntry(const string& name)
@@ -1412,11 +1394,6 @@ void MirrorOrch::doTask(Consumer& consumer)
14121394
{
14131395
SWSS_LOG_ENTER();
14141396

1415-
if (m_freeze)
1416-
{
1417-
return;
1418-
}
1419-
14201397
if (!gPortsOrch->allPortsReady())
14211398
{
14221399
return;
@@ -1429,26 +1406,32 @@ void MirrorOrch::doTask(Consumer& consumer)
14291406

14301407
string key = kfvKey(t);
14311408
string op = kfvOp(t);
1409+
task_process_status task_status = task_process_status::task_failed;
14321410

14331411
if (op == SET_COMMAND)
14341412
{
1435-
createEntry(key, kfvFieldsValues(t));
1413+
task_status = createEntry(key, kfvFieldsValues(t));
14361414
}
14371415
else if (op == DEL_COMMAND)
14381416
{
1439-
auto task_status = deleteEntry(key);
1440-
// Specifically retry the task when asked
1441-
if (task_status == task_process_status::task_need_retry)
1442-
{
1443-
it++;
1444-
continue;
1445-
}
1417+
task_status = deleteEntry(key);
14461418
}
14471419
else
14481420
{
14491421
SWSS_LOG_ERROR("Unknown operation type %s", op.c_str());
14501422
}
14511423

1452-
consumer.m_toSync.erase(it++);
1424+
// Specifically retry the task when asked
1425+
if (task_status == task_process_status::task_need_retry)
1426+
{
1427+
it++;
1428+
}
1429+
else
1430+
{
1431+
consumer.m_toSync.erase(it++);
1432+
}
14531433
}
1434+
1435+
// Clear any recovery state that might be leftover from warm reboot
1436+
m_recoverySessionMap.clear();
14541437
}

orchagent/mirrororch.h

Lines changed: 3 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -80,14 +80,15 @@ class MirrorOrch : public Orch, public Observer, public Subject
8080
PortsOrch *portOrch, RouteOrch *routeOrch, NeighOrch *neighOrch, FdbOrch *fdbOrch, PolicerOrch *policerOrch);
8181

8282
bool bake() override;
83-
bool postBake() override;
8483
void update(SubjectType, void *);
8584
bool sessionExists(const string&);
8685
bool getSessionStatus(const string&, bool&);
8786
bool getSessionOid(const string&, sai_object_id_t&);
8887
bool increaseRefCount(const string&);
8988
bool decreaseRefCount(const string&);
9089

90+
using Orch::doTask; // Allow access to the basic doTask
91+
9192
private:
9293
PortsOrch *m_portsOrch;
9394
RouteOrch *m_routeOrch;
@@ -101,9 +102,7 @@ class MirrorOrch : public Orch, public Observer, public Subject
101102
// session_name -> VLAN | monitor_port_alias | next_hop_ip
102103
map<string, string> m_recoverySessionMap;
103104

104-
bool m_freeze = false;
105-
106-
void createEntry(const string&, const vector<FieldValueTuple>&);
105+
task_process_status createEntry(const string&, const vector<FieldValueTuple>&);
107106
task_process_status deleteEntry(const string&);
108107

109108
bool activateSession(const string&, MirrorEntry&);

orchagent/orch.cpp

Lines changed: 0 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -299,13 +299,6 @@ bool Orch::bake()
299299
return true;
300300
}
301301

302-
bool Orch::postBake()
303-
{
304-
SWSS_LOG_ENTER();
305-
306-
return true;
307-
}
308-
309302
/*
310303
- Validates reference has proper format which is [table_name:object_name]
311304
- validates table_name exists

orchagent/orch.h

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -50,7 +50,8 @@ typedef enum
5050
task_invalid_entry,
5151
task_failed,
5252
task_need_retry,
53-
task_ignore
53+
task_ignore,
54+
task_duplicated
5455
} task_process_status;
5556

5657
typedef struct
@@ -204,8 +205,6 @@ class Orch
204205
// Prepare for warm start if Redis contains valid input data
205206
// otherwise fallback to cold start
206207
virtual bool bake();
207-
// Clean up the state set in bake()
208-
virtual bool postBake();
209208

210209
/* Iterate all consumers in m_consumerMap and run doTask(Consumer) */
211210
virtual void doTask();

orchagent/orchdaemon.cpp

Lines changed: 15 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -32,6 +32,7 @@ NeighOrch *gNeighOrch;
3232
RouteOrch *gRouteOrch;
3333
FgNhgOrch *gFgNhgOrch;
3434
AclOrch *gAclOrch;
35+
MirrorOrch *gMirrorOrch;
3536
CrmOrch *gCrmOrch;
3637
BufferOrch *gBufferOrch;
3738
SwitchOrch *gSwitchOrch;
@@ -177,7 +178,7 @@ bool OrchDaemon::init()
177178

178179
TableConnector stateDbMirrorSession(m_stateDb, STATE_MIRROR_SESSION_TABLE_NAME);
179180
TableConnector confDbMirrorSession(m_configDb, CFG_MIRROR_SESSION_TABLE_NAME);
180-
MirrorOrch *mirror_orch = new MirrorOrch(stateDbMirrorSession, confDbMirrorSession, gPortsOrch, gRouteOrch, gNeighOrch, gFdbOrch, policer_orch);
181+
gMirrorOrch = new MirrorOrch(stateDbMirrorSession, confDbMirrorSession, gPortsOrch, gRouteOrch, gNeighOrch, gFdbOrch, policer_orch);
181182

182183
TableConnector confDbAclTable(m_configDb, CFG_ACL_TABLE_TABLE_NAME);
183184
TableConnector confDbAclRuleTable(m_configDb, CFG_ACL_RULE_TABLE_NAME);
@@ -273,10 +274,10 @@ bool OrchDaemon::init()
273274
dtel_orch = new DTelOrch(m_configDb, dtel_tables, gPortsOrch);
274275
m_orchList.push_back(dtel_orch);
275276
}
276-
gAclOrch = new AclOrch(acl_table_connectors, gSwitchOrch, gPortsOrch, mirror_orch, gNeighOrch, gRouteOrch, dtel_orch);
277+
gAclOrch = new AclOrch(acl_table_connectors, gSwitchOrch, gPortsOrch, gMirrorOrch, gNeighOrch, gRouteOrch, dtel_orch);
277278

278279
m_orchList.push_back(gFdbOrch);
279-
m_orchList.push_back(mirror_orch);
280+
m_orchList.push_back(gMirrorOrch);
280281
m_orchList.push_back(gAclOrch);
281282
m_orchList.push_back(chassis_frontend_orch);
282283
m_orchList.push_back(vrf_orch);
@@ -548,18 +549,24 @@ bool OrchDaemon::warmRestoreAndSyncUp()
548549

549550
for (auto it = 0; it < 3; it++)
550551
{
551-
SWSS_LOG_DEBUG("The current iteration is %d", it);
552+
SWSS_LOG_DEBUG("The current doTask iteration is %d", it);
552553

553554
for (Orch *o : m_orchList)
554555
{
556+
if (o == gMirrorOrch) {
557+
SWSS_LOG_DEBUG("Skipping mirror processing until the end");
558+
continue;
559+
}
560+
555561
o->doTask();
556562
}
557563
}
558564

559-
for (Orch *o : m_orchList)
560-
{
561-
o->postBake();
562-
}
565+
// MirrorOrch depends on everything else being settled before it can run,
566+
// and mirror ACL rules depend on MirrorOrch, so run these two at the end
567+
// after the rest of the data has been processed.
568+
gMirrorOrch->doTask();
569+
gAclOrch->doTask();
563570

564571
/*
565572
* At this point, all the pre-existing data should have been processed properly, and

tests/conftest.py

Lines changed: 11 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1021,16 +1021,25 @@ def remove_neighbor(self, interface, ip):
10211021
tbl._del(interface + ":" + ip)
10221022
time.sleep(1)
10231023

1024-
# deps: mirror_port_erspan
1024+
# deps: mirror_port_erspan, warm_reboot
10251025
def add_route(self, prefix, nexthop):
10261026
self.runcmd("ip route add " + prefix + " via " + nexthop)
10271027
time.sleep(1)
10281028

1029-
# deps: mirror_port_erspan
1029+
# deps: mirror_port_erspan, warm_reboot
10301030
def change_route(self, prefix, nexthop):
10311031
self.runcmd("ip route change " + prefix + " via " + nexthop)
10321032
time.sleep(1)
10331033

1034+
# deps: warm_reboot
1035+
def change_route_ecmp(self, prefix, nexthops):
1036+
cmd = ""
1037+
for nexthop in nexthops:
1038+
cmd += " nexthop via " + nexthop
1039+
1040+
self.runcmd("ip route change " + prefix + cmd)
1041+
time.sleep(1)
1042+
10341043
# deps: acl, mirror_port_erspan
10351044
def remove_route(self, prefix):
10361045
self.runcmd("ip route del " + prefix)

0 commit comments

Comments
 (0)