@@ -11,6 +11,7 @@ class PortChangeEvent:
1111 PORT_REMOVE = 1
1212 PORT_SET = 2
1313 PORT_DEL = 3
14+ PORT_EVENT = {}
1415
1516 def __init__ (self , port_name , port_index , asic_id , event_type , port_dict = None ):
1617 # Logical port name, e.g. Ethernet0
@@ -105,11 +106,16 @@ def subscribe_port_config_change(namespaces):
105106 sel .addSelectable (port_tbl )
106107 return sel , asic_context
107108
108- def subscribe_port_update_event (namespaces ):
109+ def subscribe_port_update_event (namespaces , logger ):
110+ """
111+ Subscribe to a particular DB's table and listen to only interested fields
112+ Format :
113+ { <DB name> : <Table name> , <field1>, <field2>, .. } where only field<n> update will be received
114+ """
109115 port_tbl_map = [
110- {'APPL_DB ' : swsscommon .APP_PORT_TABLE_NAME },
116+ {'CONFIG_DB ' : swsscommon .CFG_PORT_TABLE_NAME },
111117 {'STATE_DB' : 'TRANSCEIVER_INFO' },
112- {'STATE_DB' : 'PORT_TABLE' },
118+ {'STATE_DB' : 'PORT_TABLE' , 'FILTER' : [ 'host_tx_ready' ] },
113119 ]
114120
115121 sel = swsscommon .Select ()
@@ -119,13 +125,18 @@ def subscribe_port_update_event(namespaces):
119125 db = daemon_base .db_connect (list (d .keys ())[0 ], namespace = namespace )
120126 asic_id = multi_asic .get_asic_index_from_namespace (namespace )
121127 port_tbl = swsscommon .SubscriberStateTable (db , list (d .values ())[0 ])
128+ port_tbl .db_name = list (d .keys ())[0 ]
129+ port_tbl .table_name = list (d .values ())[0 ]
130+ port_tbl .filter = d ['FILTER' ] if 'FILTER' in d else None
122131 asic_context [port_tbl ] = asic_id
123132 sel .addSelectable (port_tbl )
133+ logger .log_warning ("subscribing to port_tbl {} - {} DB of namespace {} " .format (
134+ port_tbl , list (d .values ())[0 ], namespace ))
124135 return sel , asic_context
125136
126137def handle_port_update_event (sel , asic_context , stop_event , logger , port_change_event_handler ):
127138 """
128- Select PORT update events, notify the observers upon a port update in APPL_DB/ CONFIG_DB
139+ Select PORT update events, notify the observers upon a port update in CONFIG_DB
129140 or a XCVR insertion/removal in STATE_DB
130141 """
131142 if not stop_event .is_set ():
@@ -135,6 +146,8 @@ def handle_port_update_event(sel, asic_context, stop_event, logger, port_change_
135146 if state != swsscommon .Select .OBJECT :
136147 logger .log_warning ('sel.select() did not return swsscommon.Select.OBJECT' )
137148 return
149+
150+ port_event_cache = {}
138151 for port_tbl in asic_context .keys ():
139152 while True :
140153 (key , op , fvp ) = port_tbl .pop ()
@@ -143,24 +156,56 @@ def handle_port_update_event(sel, asic_context, stop_event, logger, port_change_
143156 if not validate_port (key ):
144157 continue
145158 fvp = dict (fvp ) if fvp is not None else {}
159+ logger .log_warning ("$$$ {} handle_port_update_event() : op={} DB:{} Table:{} fvp {}" .format (
160+ key , op , port_tbl .db_name , port_tbl .table_name , fvp ))
161+
146162 if 'index' not in fvp :
147- fvp ['index' ] = '-1'
148- port_index = int (fvp ['index' ])
149- port_change_event = None
150- if op == swsscommon .SET_COMMAND :
151- port_change_event = PortChangeEvent (key ,
163+ fvp ['index' ] = '-1'
164+ fvp ['key' ] = key
165+ fvp ['asic_id' ] = asic_context [port_tbl ]
166+ fvp ['op' ] = op
167+ fvp ['FILTER' ] = port_tbl .filter
168+ # Soak duplicate events and consider only the last event
169+ port_event_cache [key + port_tbl .db_name + port_tbl .table_name ] = fvp
170+
171+ # Now apply filter over soaked events
172+ for key , fvp in port_event_cache .items ():
173+ port_index = int (fvp ['index' ])
174+ port_change_event = None
175+ diff = {}
176+ filter = fvp ['FILTER' ]
177+ del fvp ['FILTER' ]
178+ if key in PortChangeEvent .PORT_EVENT :
179+ diff = dict (set (fvp .items ()) - set (PortChangeEvent .PORT_EVENT [key ].items ()))
180+ # Ignore duplicate events
181+ if not diff :
182+ PortChangeEvent .PORT_EVENT [key ] = fvp
183+ continue
184+ # Ensure only interested field update gets through for processing
185+ if filter is not None :
186+ if not (set (filter ) & set (diff .keys ())):
187+ PortChangeEvent .PORT_EVENT [key ] = fvp
188+ continue
189+ PortChangeEvent .PORT_EVENT [key ] = fvp
190+
191+ if fvp ['op' ] == swsscommon .SET_COMMAND :
192+ port_change_event = PortChangeEvent (fvp ['key' ],
152193 port_index ,
153- asic_context [ port_tbl ],
194+ fvp [ 'asic_id' ],
154195 PortChangeEvent .PORT_SET ,
155196 fvp )
156- elif op == swsscommon .DEL_COMMAND :
157- port_change_event = PortChangeEvent (key ,
197+ elif fvp [ 'op' ] == swsscommon .DEL_COMMAND :
198+ port_change_event = PortChangeEvent (fvp [ ' key' ] ,
158199 port_index ,
159- asic_context [ port_tbl ],
200+ fvp [ 'asic_id' ],
160201 PortChangeEvent .PORT_DEL ,
161202 fvp )
162- if port_change_event is not None :
163- port_change_event_handler (port_change_event )
203+ # This is the final event considered for processing
204+ logger .log_warning ("*** {} handle_port_update_event() fvp {}" .format (
205+ key , fvp ))
206+ if port_change_event is not None :
207+ port_change_event_handler (port_change_event )
208+
164209
165210def handle_port_config_change (sel , asic_context , stop_event , port_mapping , logger , port_change_event_handler ):
166211 """Select CONFIG_DB PORT table changes, once there is a port configuration add/remove, notify observers
0 commit comments