Skip to content

Commit 6e9bd06

Browse files
wangxingshemesh2
authored and committed
Fix dead worker issue by using SafeThreadPoolExecutor (sonic-net#21423)
According to sonic-net#19263, Python 3.12 enforces more rigorous checks around fork() in multi-threaded programs. After the docker-sonic-mgmt image was upgraded to Ubuntu 24.04, Python and Ansible were upgraded too. With Python 3.12 and Ansible 2.18 in the new docker-sonic-mgmt image, the `nbrhosts` fixture, which depends on concurrent.futures, may fail with an error like the one below:

```
self = <ansible.plugins.strategy.linear.StrategyModule object at 0x7596c07986e0>
iterator = <ansible.executor.play_iterator.PlayIterator object at 0x7596c09b2a80>

    def _wait_on_pending_results(self, iterator):
        '''
        Wait for the shared counter to drop to zero, using a short sleep
        between checks to ensure we don't spin lock
        '''
        ret_results = []
        display.debug("waiting for pending results...")
        while self._pending_results > 0 and not self._tqm._terminated:
            if self._tqm.has_dead_workers():
>               raise AnsibleError("A worker was found in a dead state")
E               ansible.errors.AnsibleError: A worker was found in a dead state
```

PR sonic-net#21407 introduced a threading lock as a temporary workaround for the issue. A better fix is to use the SafeThreadPoolExecutor updated in sonic-net#19263 to initialize the `nbrhosts` objects. This change reverts the threading lock from PR sonic-net#21407 and updates the `nbrhosts` fixture to use the new SafeThreadPoolExecutor.

Signed-off-by: Xin Wang <[email protected]>
Signed-off-by: Guy Shemesh <[email protected]>
1 parent 903e738 commit 6e9bd06

File tree

1 file changed

+56
-64
lines changed

1 file changed

+56
-64
lines changed

tests/conftest.py

Lines changed: 56 additions & 64 deletions
Original file line number | Diff line number | Diff line change
@@ -1,11 +1,9 @@
1-
import concurrent.futures
21
from functools import lru_cache
32
import enum
43
import os
54
import json
65
import logging
76
import random
8-
from concurrent.futures import as_completed
97
import re
108
import sys
119

@@ -89,7 +87,6 @@
8987

9088
logger = logging.getLogger(__name__)
9189
cache = FactsCache()
92-
_ansible_tqm_lock = threading.Lock()
9390

9491
DUTHOSTS_FIXTURE_FAILED_RC = 15
9592
CUSTOM_MSG_PREFIX = "sonic_custom_msg"
@@ -864,75 +861,70 @@ def nbrhosts(enhance_inventory, ansible_adhoc, tbinfo, creds, request):
864861
return devices
865862

866863
def initial_neighbor(neighbor_name, vm_name):
867-
with _ansible_tqm_lock:
868-
logger.info(f"nbrhosts started: {neighbor_name}_{vm_name}")
869-
if neighbor_type == "eos":
870-
device = NeighborDevice(
871-
{
872-
'host': EosHost(
873-
ansible_adhoc,
874-
vm_name,
875-
creds['eos_login'],
876-
creds['eos_password'],
877-
shell_user=creds['eos_root_user'] if 'eos_root_user' in creds else None,
878-
shell_passwd=creds['eos_root_password'] if 'eos_root_password' in creds else None
879-
),
880-
'conf': tbinfo['topo']['properties']['configuration'][neighbor_name]
881-
}
882-
)
883-
elif neighbor_type == "sonic":
884-
device = NeighborDevice(
885-
{
886-
'host': SonicHost(
887-
ansible_adhoc,
888-
vm_name,
889-
ssh_user=creds['sonic_login'] if 'sonic_login' in creds else None,
890-
ssh_passwd=creds['sonic_password'] if 'sonic_password' in creds else None
891-
),
892-
'conf': tbinfo['topo']['properties']['configuration'][neighbor_name]
893-
}
894-
)
895-
elif neighbor_type == "cisco":
896-
device = NeighborDevice(
897-
{
898-
'host': CiscoHost(
899-
ansible_adhoc,
900-
vm_name,
901-
creds['cisco_login'],
902-
creds['cisco_password'],
903-
),
904-
'conf': tbinfo['topo']['properties']['configuration'][neighbor_name]
905-
}
906-
)
907-
else:
908-
raise ValueError("Unknown neighbor type %s" % (neighbor_type,))
909-
devices[neighbor_name] = device
910-
logger.info(f"nbrhosts finished: {neighbor_name}_{vm_name}")
864+
logger.info(f"nbrhosts started: {neighbor_name}_{vm_name}")
865+
if neighbor_type == "eos":
866+
device = NeighborDevice(
867+
{
868+
'host': EosHost(
869+
ansible_adhoc,
870+
vm_name,
871+
creds['eos_login'],
872+
creds['eos_password'],
873+
shell_user=creds['eos_root_user'] if 'eos_root_user' in creds else None,
874+
shell_passwd=creds['eos_root_password'] if 'eos_root_password' in creds else None
875+
),
876+
'conf': tbinfo['topo']['properties']['configuration'][neighbor_name]
877+
}
878+
)
879+
elif neighbor_type == "sonic":
880+
device = NeighborDevice(
881+
{
882+
'host': SonicHost(
883+
ansible_adhoc,
884+
vm_name,
885+
ssh_user=creds['sonic_login'] if 'sonic_login' in creds else None,
886+
ssh_passwd=creds['sonic_password'] if 'sonic_password' in creds else None
887+
),
888+
'conf': tbinfo['topo']['properties']['configuration'][neighbor_name]
889+
}
890+
)
891+
elif neighbor_type == "cisco":
892+
device = NeighborDevice(
893+
{
894+
'host': CiscoHost(
895+
ansible_adhoc,
896+
vm_name,
897+
creds['cisco_login'],
898+
creds['cisco_password'],
899+
),
900+
'conf': tbinfo['topo']['properties']['configuration'][neighbor_name]
901+
}
902+
)
903+
else:
904+
raise ValueError("Unknown neighbor type %s" % (neighbor_type,))
905+
devices[neighbor_name] = device
906+
logger.info(f"nbrhosts finished: {neighbor_name}_{vm_name}")
911907

912-
executor = concurrent.futures.ThreadPoolExecutor(max_workers=8)
913-
futures = []
914908
servers = []
915909
if 'servers' in tbinfo:
916910
servers.extend(tbinfo['servers'].values())
917911
elif 'server' in tbinfo:
918912
servers.append(tbinfo)
919913
else:
920914
logger.warning("Unknown testbed schema for setup nbrhosts")
921-
for server in servers:
922-
vm_base = int(server['vm_base'][2:])
923-
vm_name_fmt = 'VM%0{}d'.format(len(server['vm_base']) - 2)
924-
vms = MultiServersUtils.get_vms_by_dut_interfaces(
925-
tbinfo['topo']['properties']['topology']['VMs'],
926-
server['dut_interfaces']
927-
) if 'dut_interfaces' in server else tbinfo['topo']['properties']['topology']['VMs']
928-
for neighbor_name, neighbor in vms.items():
929-
vm_name = vm_name_fmt % (vm_base + neighbor['vm_offset'])
930-
futures.append(executor.submit(initial_neighbor, neighbor_name, vm_name))
931-
932-
for future in as_completed(futures):
933-
# if exception caught in the sub-thread, .result() will raise it in the main thread
934-
_ = future.result()
935-
executor.shutdown(wait=True)
915+
916+
with SafeThreadPoolExecutor(max_workers=8) as executor:
917+
for server in servers:
918+
vm_base = int(server['vm_base'][2:])
919+
vm_name_fmt = 'VM%0{}d'.format(len(server['vm_base']) - 2)
920+
vms = MultiServersUtils.get_vms_by_dut_interfaces(
921+
tbinfo['topo']['properties']['topology']['VMs'],
922+
server['dut_interfaces']
923+
) if 'dut_interfaces' in server else tbinfo['topo']['properties']['topology']['VMs']
924+
for neighbor_name, neighbor in vms.items():
925+
vm_name = vm_name_fmt % (vm_base + neighbor['vm_offset'])
926+
executor.submit(initial_neighbor, neighbor_name, vm_name)
927+
936928
logger.info("Fixture nbrhosts finished")
937929
return devices
938930

0 commit comments

Comments
 (0)