
Commit 124a1ae

wangxin authored and lakshmi-nexthop committed
Fix dead worker issue by using SafeThreadPoolExecutor (sonic-net#21423)
According to sonic-net#19263, Python 3.12 enforces stricter checks around `fork()` in multi-threaded programs. After the docker-sonic-mgmt image was upgraded to Ubuntu 24.04, Python and Ansible were upgraded as well. With Python 3.12 and Ansible 2.18 in the new docker-sonic-mgmt, the `nbrhosts` fixture, which depends on `concurrent.futures`, may fail with an error like the one below:

```
self = <ansible.plugins.strategy.linear.StrategyModule object at 0x7596c07986e0>
iterator = <ansible.executor.play_iterator.PlayIterator object at 0x7596c09b2a80>

    def _wait_on_pending_results(self, iterator):
        '''
        Wait for the shared counter to drop to zero, using a short sleep
        between checks to ensure we don't spin lock
        '''

        ret_results = []
        display.debug("waiting for pending results...")
        while self._pending_results > 0 and not self._tqm._terminated:
            if self._tqm.has_dead_workers():
>               raise AnsibleError("A worker was found in a dead state")
E               ansible.errors.AnsibleError: A worker was found in a dead state
```

PR sonic-net#21407 introduced a threading lock to temporarily work around the issue. A better fix is to use the SafeThreadPoolExecutor updated in sonic-net#19263 to initialize the `nbrhosts` objects. This change reverts the threading lock of PR sonic-net#21407 and updates the `nbrhosts` fixture to use the new SafeThreadPoolExecutor.

Signed-off-by: Xin Wang <[email protected]>
Signed-off-by: Lakshmi Yarramaneni <[email protected]>
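The actual SafeThreadPoolExecutor lives in the sonic-mgmt repo (updated in sonic-net#19263). A plausible minimal sketch of the pattern it enables — a `ThreadPoolExecutor` whose context-manager exit drains all submitted futures and re-raises the first sub-thread exception, so callers no longer need an explicit `as_completed()` loop — might look like this (the class body here is an assumption for illustration, not the sonic-mgmt implementation):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


class SafeThreadPoolExecutor(ThreadPoolExecutor):
    """Sketch only: track submitted futures and surface worker exceptions
    in the main thread when the `with` block exits."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._futures = []

    def submit(self, fn, *args, **kwargs):
        future = super().submit(fn, *args, **kwargs)
        self._futures.append(future)
        return future

    def __exit__(self, exc_type, exc_val, exc_tb):
        try:
            for future in as_completed(self._futures):
                # .result() re-raises any exception caught in the sub-thread
                future.result()
        finally:
            self.shutdown(wait=True)
        return False  # never swallow exceptions
```

With this shape, `with SafeThreadPoolExecutor(max_workers=8) as executor: executor.submit(...)` both waits for all work and propagates worker failures, which is exactly the behavior the removed `as_completed`/`shutdown` boilerplate provided by hand.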
1 parent f5705bd commit 124a1ae

1 file changed

Lines changed: 13 additions & 19 deletions

File tree

tests/conftest.py

```diff
@@ -1,11 +1,9 @@
-import concurrent.futures
 from functools import lru_cache
 import enum
 import os
 import json
 import logging
 import random
-from concurrent.futures import as_completed
 import re
 import sys
 
```

```diff
@@ -904,30 +902,26 @@ def initial_neighbor(neighbor_name, vm_name):
         devices[neighbor_name] = device
         logger.info(f"nbrhosts finished: {neighbor_name}_{vm_name}")
 
-    executor = concurrent.futures.ThreadPoolExecutor(max_workers=8)
-    futures = []
     servers = []
     if 'servers' in tbinfo:
         servers.extend(tbinfo['servers'].values())
     elif 'server' in tbinfo:
         servers.append(tbinfo)
     else:
         logger.warning("Unknown testbed schema for setup nbrhosts")
-    for server in servers:
-        vm_base = int(server['vm_base'][2:])
-        vm_name_fmt = 'VM%0{}d'.format(len(server['vm_base']) - 2)
-        vms = MultiServersUtils.get_vms_by_dut_interfaces(
-            tbinfo['topo']['properties']['topology']['VMs'],
-            server['dut_interfaces']
-        ) if 'dut_interfaces' in server else tbinfo['topo']['properties']['topology']['VMs']
-        for neighbor_name, neighbor in vms.items():
-            vm_name = vm_name_fmt % (vm_base + neighbor['vm_offset'])
-            futures.append(executor.submit(initial_neighbor, neighbor_name, vm_name))
-
-    for future in as_completed(futures):
-        # if exception caught in the sub-thread, .result() will raise it in the main thread
-        _ = future.result()
-    executor.shutdown(wait=True)
+
+    with SafeThreadPoolExecutor(max_workers=8) as executor:
+        for server in servers:
+            vm_base = int(server['vm_base'][2:])
+            vm_name_fmt = 'VM%0{}d'.format(len(server['vm_base']) - 2)
+            vms = MultiServersUtils.get_vms_by_dut_interfaces(
+                tbinfo['topo']['properties']['topology']['VMs'],
+                server['dut_interfaces']
+            ) if 'dut_interfaces' in server else tbinfo['topo']['properties']['topology']['VMs']
+            for neighbor_name, neighbor in vms.items():
+                vm_name = vm_name_fmt % (vm_base + neighbor['vm_offset'])
+                executor.submit(initial_neighbor, neighbor_name, vm_name)
 
     logger.info("Fixture nbrhosts finished")
     return devices
```
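The `vm_name_fmt` logic carried over unchanged in this hunk derives zero-padded VM names from a server's `vm_base`: the digits after the `VM` prefix give the base number, and the prefix length fixes the padding width. A minimal standalone sketch, using `'VM0100'` as a hypothetical `vm_base` value (real values come from `tbinfo`):

```python
# Hypothetical vm_base value for illustration; in the fixture this comes
# from server['vm_base'] in the testbed info.
vm_base_str = 'VM0100'

vm_base = int(vm_base_str[2:])                        # numeric part: 100
vm_name_fmt = 'VM%0{}d'.format(len(vm_base_str) - 2)  # pad to 4 digits: 'VM%04d'

# A neighbor with vm_offset 3 maps to a zero-padded VM name.
vm_name = vm_name_fmt % (vm_base + 3)
print(vm_name)  # VM0103
```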
