Skip to content

Commit 5aa761d

Browse files
authored
Merge pull request #156 from mssonicbld/sonicbld/202205-merge
[code sync] Merge code from sonic-net/sonic-buildimage:202205 to 202205
2 parents 7f55d26 + e07bc6d commit 5aa761d

File tree

1 file changed

+87
-63
lines changed

1 file changed

+87
-63
lines changed

dockers/docker-orchagent/tunnel_packet_handler.py

Lines changed: 87 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,14 @@
88
destination IP to trigger the process of obtaining neighbor information
99
"""
1010
import subprocess
11+
import sys
1112
import time
1213
from datetime import datetime
1314
from ipaddress import ip_interface
15+
from queue import Queue
1416

15-
from swsssdk import ConfigDBConnector, SonicV2Connector
17+
from swsscommon.swsscommon import ConfigDBConnector, SonicV2Connector, \
18+
DBConnector, Select, SubscriberStateTable
1619
from sonic_py_common import logger as log
1720

1821
from pyroute2 import IPRoute
@@ -25,18 +28,35 @@
2528
logger = log.Logger()
2629

2730
STATE_DB = 'STATE_DB'
31+
APPL_DB = 'APPL_DB'
2832
PORTCHANNEL_INTERFACE_TABLE = 'PORTCHANNEL_INTERFACE'
2933
TUNNEL_TABLE = 'TUNNEL'
3034
PEER_SWITCH_TABLE = 'PEER_SWITCH'
3135
INTF_TABLE_TEMPLATE = 'INTERFACE_TABLE|{}|{}'
36+
LAG_TABLE = 'LAG_TABLE'
3237
STATE_KEY = 'state'
3338
TUNNEL_TYPE_KEY = 'tunnel_type'
3439
DST_IP_KEY = 'dst_ip'
3540
ADDRESS_IPV4_KEY = 'address_ipv4'
41+
OPER_STATUS_KEY = 'oper_status'
3642
IPINIP_TUNNEL = 'ipinip'
37-
3843
RTM_NEWLINK = 'RTM_NEWLINK'
44+
SELECT_TIMEOUT = 1000
45+
46+
nl_msgs = Queue()
47+
portchannel_intfs = None
48+
49+
def add_msg_to_queue(target, msg):
50+
"""
51+
Adds a netlink message to a queue
3952
53+
Args:
54+
target: unused, needed by NDB API
55+
msg: a netlink message
56+
"""
57+
58+
if msg.get_attr('IFLA_IFNAME') in portchannel_intfs:
59+
nl_msgs.put(msg)
4060

4161
class TunnelPacketHandler(object):
4262
"""
@@ -55,7 +75,10 @@ def __init__(self):
5575
self.sniffer = None
5676
self.self_ip = ''
5777
self.packet_filter = ''
58-
self.sniff_intfs = []
78+
self.sniff_intfs = set()
79+
80+
global portchannel_intfs
81+
portchannel_intfs = [name for name, _ in self.portchannel_intfs]
5982

6083
@property
6184
def portchannel_intfs(self):
@@ -95,17 +118,6 @@ def get_intf_name(self, msg):
95118

96119
return ''
97120

98-
def netlink_msg_is_for_portchannel(self, msg):
99-
"""
100-
Determines if a netlink message is about a PortChannel interface
101-
102-
Returns:
103-
(list) integers representing kernel indices
104-
"""
105-
ifname = self.get_intf_name(msg)
106-
107-
return ifname in [name for name, _ in self.portchannel_intfs]
108-
109121
def get_up_portchannels(self):
110122
"""
111123
Returns the portchannels which are operationally up
@@ -125,11 +137,11 @@ def get_up_portchannels(self):
125137
logger.log_notice("Skipping non-existent interface {}".format(intf))
126138
continue
127139
link_statuses.append(status[0])
128-
up_portchannels = []
140+
up_portchannels = set()
129141

130142
for status in link_statuses:
131-
if status['state'] == 'up':
132-
up_portchannels.append(self.get_intf_name(status))
143+
if status.get_attr('IFLA_OPERSTATE').lower() == 'up':
144+
up_portchannels.add(status.get_attr('IFLA_IFNAME'))
133145

134146
return up_portchannels
135147

@@ -242,52 +254,47 @@ def get_inner_pkt_type(self, packet):
242254
return IPv6
243255
return False
244256

245-
def wait_for_netlink_msgs(self):
246-
"""
247-
Gathers any RTM_NEWLINK messages
248-
249-
Returns:
250-
(list) containing any received messages
251-
"""
252-
msgs = []
253-
with IPRoute() as ipr:
254-
ipr.bind()
255-
for msg in ipr.get():
256-
if msg['event'] == RTM_NEWLINK:
257-
msgs.append(msg)
258-
259-
return msgs
260-
261-
def sniffer_restart_required(self, messages):
257+
def sniffer_restart_required(self, lag, fvs):
262258
"""
263259
Determines if the packet sniffer needs to be restarted
264260
265-
A restart is required if all of the following conditions are met:
266-
1. A netlink message of type RTM_NEWLINK is received
267-
(this is checked by `wait_for_netlink_msgs`)
268-
2. The interface index of the message corresponds to a portchannel
269-
interface
270-
3. The state of the interface in the message is 'up'
271-
Here, we do not care about an interface going down since
272-
the sniffer is able to continue sniffing on the other
273-
interfaces. However, if an interface has gone down and
274-
come back up, we need to restart the sniffer to be able
275-
to sniff traffic on the interface that has come back up.
261+
The sniffer needs to be restarted when a portchannel interface transitions
262+
from down to up. When a portchannel interface goes down, the sniffer is
263+
able to continue sniffing on other portchannels.
276264
"""
277-
for msg in messages:
278-
if self.netlink_msg_is_for_portchannel(msg):
279-
if msg['state'] == 'up':
280-
logger.log_info('{} came back up, sniffer restart required'
281-
.format(self.get_intf_name(msg)))
282-
return True
283-
return False
265+
oper_status = dict(fvs).get(OPER_STATUS_KEY)
266+
if lag not in self.sniff_intfs and oper_status == 'up':
267+
logger.log_info('{} came back up, sniffer restart required'
268+
.format(lag))
269+
# Don't need to modify self.sniff_intfs here since it is repopulated
270+
# by self.get_up_portchannels()
271+
return True
272+
elif lag in self.sniff_intfs and oper_status == 'down':
273+
# A portchannel interface went down, remove it from the list of
274+
# sniffed interfaces so we can detect when it comes back up
275+
self.sniff_intfs.remove(lag)
276+
return False
277+
else:
278+
return False
284279

285280
def start_sniffer(self):
286281
"""
287282
Starts an AsyncSniffer and waits for it to initialize fully
288283
"""
284+
start = datetime.now()
285+
286+
self.sniff_intfs = self.get_up_portchannels()
287+
288+
while not self.sniff_intfs:
289+
logger.log_info('No portchannels are up yet...')
290+
if (datetime.now() - start).seconds > 180:
291+
logger.log_error('All portchannels failed to come up within 3 minutes, exiting...')
292+
sys.exit(1)
293+
self.sniff_intfs = self.get_up_portchannels()
294+
time.sleep(10)
295+
289296
self.sniffer = AsyncSniffer(
290-
iface=self.sniff_intfs,
297+
iface=list(self.sniff_intfs),
291298
filter=self.packet_filter,
292299
prn=self.ping_inner_dst,
293300
store=0
@@ -332,18 +339,35 @@ def listen_for_tunnel_pkts(self):
332339
logger.log_notice('Starting tunnel packet handler for {}'
333340
.format(self.packet_filter))
334341

335-
self.sniff_intfs = self.get_up_portchannels()
336-
logger.log_info("Listening on interfaces {}".format(self.sniff_intfs))
342+
343+
app_db = DBConnector(APPL_DB, 0)
344+
lag_table = SubscriberStateTable(app_db, LAG_TABLE)
345+
sel = Select()
346+
sel.addSelectable(lag_table)
337347

338348
self.start_sniffer()
349+
logger.log_info("Listening on interfaces {}".format(self.sniff_intfs))
339350
while True:
340-
msgs = self.wait_for_netlink_msgs()
341-
if self.sniffer_restart_required(msgs):
342-
self.sniffer.stop()
343-
sniff_intfs = self.get_up_portchannels()
344-
logger.log_notice('Restarting tunnel packet handler on '
345-
'interfaces {}'.format(sniff_intfs))
346-
self.start_sniffer()
351+
rc, _ = sel.select(SELECT_TIMEOUT)
352+
353+
if rc == Select.TIMEOUT:
354+
continue
355+
elif rc == Select.ERROR:
356+
raise Exception("Select() error")
357+
else:
358+
lag, op, fvs = lag_table.pop()
359+
if self.sniffer_restart_required(lag, fvs):
360+
self.sniffer.stop()
361+
start = datetime.now()
362+
# wait up to 3 seconds for the kernel interface to be synced with APPL_DB status
363+
while (datetime.now() - start).seconds < 3:
364+
self.sniff_intfs = self.get_up_portchannels()
365+
if lag in self.sniff_intfs:
366+
break
367+
time.sleep(0.1)
368+
logger.log_notice('Restarting tunnel packet handler on '
369+
'interfaces {}'.format(self.sniff_intfs))
370+
self.start_sniffer()
347371

348372
def run(self):
349373
"""
@@ -360,4 +384,4 @@ def main():
360384

361385

362386
if __name__ == "__main__":
363-
main()
387+
main()

0 commit comments

Comments
 (0)