Skip to content

Commit d35ad18

Browse files
wangxinabhishek-nexthop
authored andcommitted
Fix dead worker issue by using SafeThreadPoolExecutor (sonic-net#21423)
According to sonic-net#19263, python 3.12 enforces more rigorous check around fork() in multiple-threaded programs. After the docker-sonic-mgmt image is upgraded to Ubuntu 24.04. python and ansible are upgraded too. With python 3.12 and ansible 2.18 in new docker-sonic-mgmt, the nbrhosts fixture depends on concurrent.futures may fail with error like below: ``` self = <ansible.plugins.strategy.linear.StrategyModule object at 0x7596c07986e0> iterator = <ansible.executor.play_iterator.PlayIterator object at 0x7596c09b2a80> def _wait_on_pending_results(self, iterator): ''' Wait for the shared counter to drop to zero, using a short sleep between checks to ensure we don't spin lock ''' ret_results = [] display.debug("waiting for pending results...") while self._pending_results > 0 and not self._tqm._terminated: if self._tqm.has_dead_workers(): > raise AnsibleError("A worker was found in a dead state") E ansible.errors.AnsibleError: A worker was found in a dead state ``` PR sonic-net#21407 introduced threading lock to temporarily workaround the issue. A better way to fix the issue is to use the SafeThreadPoolExecutor updated in sonic-net#19263 to initialize the `nbrhosts` objects. This change reverted the threading lock of PR sonic-net#21407 and updated the `nbrhosts` fixture to use the new SafeThreadPoolExecutor. Signed-off-by: Xin Wang <xiwang5@microsoft.com> Signed-off-by: Abhishek <abhishek@nexthop.ai>
1 parent 41eef4b commit d35ad18

1 file changed

Lines changed: 56 additions & 64 deletions

File tree

tests/conftest.py

Lines changed: 56 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
1-
import concurrent.futures
21
from functools import lru_cache
32
import enum
43
import os
54
import json
65
import logging
76
import random
8-
from concurrent.futures import as_completed
97
import re
108
import sys
119

@@ -89,7 +87,6 @@
8987

9088
logger = logging.getLogger(__name__)
9189
cache = FactsCache()
92-
_ansible_tqm_lock = threading.Lock()
9390

9491
DUTHOSTS_FIXTURE_FAILED_RC = 15
9592
CUSTOM_MSG_PREFIX = "sonic_custom_msg"
@@ -859,75 +856,70 @@ def nbrhosts(enhance_inventory, ansible_adhoc, tbinfo, creds, request):
859856
return devices
860857

861858
def initial_neighbor(neighbor_name, vm_name):
862-
with _ansible_tqm_lock:
863-
logger.info(f"nbrhosts started: {neighbor_name}_{vm_name}")
864-
if neighbor_type == "eos":
865-
device = NeighborDevice(
866-
{
867-
'host': EosHost(
868-
ansible_adhoc,
869-
vm_name,
870-
creds['eos_login'],
871-
creds['eos_password'],
872-
shell_user=creds['eos_root_user'] if 'eos_root_user' in creds else None,
873-
shell_passwd=creds['eos_root_password'] if 'eos_root_password' in creds else None
874-
),
875-
'conf': tbinfo['topo']['properties']['configuration'][neighbor_name]
876-
}
877-
)
878-
elif neighbor_type == "sonic":
879-
device = NeighborDevice(
880-
{
881-
'host': SonicHost(
882-
ansible_adhoc,
883-
vm_name,
884-
ssh_user=creds['sonic_login'] if 'sonic_login' in creds else None,
885-
ssh_passwd=creds['sonic_password'] if 'sonic_password' in creds else None
886-
),
887-
'conf': tbinfo['topo']['properties']['configuration'][neighbor_name]
888-
}
889-
)
890-
elif neighbor_type == "cisco":
891-
device = NeighborDevice(
892-
{
893-
'host': CiscoHost(
894-
ansible_adhoc,
895-
vm_name,
896-
creds['cisco_login'],
897-
creds['cisco_password'],
898-
),
899-
'conf': tbinfo['topo']['properties']['configuration'][neighbor_name]
900-
}
901-
)
902-
else:
903-
raise ValueError("Unknown neighbor type %s" % (neighbor_type,))
904-
devices[neighbor_name] = device
905-
logger.info(f"nbrhosts finished: {neighbor_name}_{vm_name}")
859+
logger.info(f"nbrhosts started: {neighbor_name}_{vm_name}")
860+
if neighbor_type == "eos":
861+
device = NeighborDevice(
862+
{
863+
'host': EosHost(
864+
ansible_adhoc,
865+
vm_name,
866+
creds['eos_login'],
867+
creds['eos_password'],
868+
shell_user=creds['eos_root_user'] if 'eos_root_user' in creds else None,
869+
shell_passwd=creds['eos_root_password'] if 'eos_root_password' in creds else None
870+
),
871+
'conf': tbinfo['topo']['properties']['configuration'][neighbor_name]
872+
}
873+
)
874+
elif neighbor_type == "sonic":
875+
device = NeighborDevice(
876+
{
877+
'host': SonicHost(
878+
ansible_adhoc,
879+
vm_name,
880+
ssh_user=creds['sonic_login'] if 'sonic_login' in creds else None,
881+
ssh_passwd=creds['sonic_password'] if 'sonic_password' in creds else None
882+
),
883+
'conf': tbinfo['topo']['properties']['configuration'][neighbor_name]
884+
}
885+
)
886+
elif neighbor_type == "cisco":
887+
device = NeighborDevice(
888+
{
889+
'host': CiscoHost(
890+
ansible_adhoc,
891+
vm_name,
892+
creds['cisco_login'],
893+
creds['cisco_password'],
894+
),
895+
'conf': tbinfo['topo']['properties']['configuration'][neighbor_name]
896+
}
897+
)
898+
else:
899+
raise ValueError("Unknown neighbor type %s" % (neighbor_type,))
900+
devices[neighbor_name] = device
901+
logger.info(f"nbrhosts finished: {neighbor_name}_{vm_name}")
906902

907-
executor = concurrent.futures.ThreadPoolExecutor(max_workers=8)
908-
futures = []
909903
servers = []
910904
if 'servers' in tbinfo:
911905
servers.extend(tbinfo['servers'].values())
912906
elif 'server' in tbinfo:
913907
servers.append(tbinfo)
914908
else:
915909
logger.warning("Unknown testbed schema for setup nbrhosts")
916-
for server in servers:
917-
vm_base = int(server['vm_base'][2:])
918-
vm_name_fmt = 'VM%0{}d'.format(len(server['vm_base']) - 2)
919-
vms = MultiServersUtils.get_vms_by_dut_interfaces(
920-
tbinfo['topo']['properties']['topology']['VMs'],
921-
server['dut_interfaces']
922-
) if 'dut_interfaces' in server else tbinfo['topo']['properties']['topology']['VMs']
923-
for neighbor_name, neighbor in vms.items():
924-
vm_name = vm_name_fmt % (vm_base + neighbor['vm_offset'])
925-
futures.append(executor.submit(initial_neighbor, neighbor_name, vm_name))
926-
927-
for future in as_completed(futures):
928-
# if exception caught in the sub-thread, .result() will raise it in the main thread
929-
_ = future.result()
930-
executor.shutdown(wait=True)
910+
911+
with SafeThreadPoolExecutor(max_workers=8) as executor:
912+
for server in servers:
913+
vm_base = int(server['vm_base'][2:])
914+
vm_name_fmt = 'VM%0{}d'.format(len(server['vm_base']) - 2)
915+
vms = MultiServersUtils.get_vms_by_dut_interfaces(
916+
tbinfo['topo']['properties']['topology']['VMs'],
917+
server['dut_interfaces']
918+
) if 'dut_interfaces' in server else tbinfo['topo']['properties']['topology']['VMs']
919+
for neighbor_name, neighbor in vms.items():
920+
vm_name = vm_name_fmt % (vm_base + neighbor['vm_offset'])
921+
executor.submit(initial_neighbor, neighbor_name, vm_name)
922+
931923
logger.info("Fixture nbrhosts finished")
932924
return devices
933925

0 commit comments

Comments
 (0)