Skip to content

Commit 31889c3

Browse files
cyw233 authored and gshemesh2 committed
feat: new SafeThreadPoolExecutor for Ubuntu 24.04 upgrade (sonic-net#19263)
Description of PR: We will soon upgrade the sonic-mgmt docker to Ubuntu 24.04, which comes with Python 3.12 and Ansible 2.18.6. Since Python 3.12 enforces more rigorous checks around fork() in multi-threaded programs, we will start getting the "ansible.errors.AnsibleError: A worker was found in a dead state" exception due to ThreadPoolExecutor from concurrent.futures.thread. To mitigate this issue, we re-implemented the SafeThreadPoolExecutor class with the traditional ThreadPool from multiprocessing.pool for multithreading operations. Summary: Fixes # (issue) Microsoft ADO 33039693. Signed-off-by: jianquanye@microsoft.com Signed-off-by: Guy Shemesh <gshemesh@nvidia.com>
1 parent bfdeaa4 commit 31889c3

2 files changed

Lines changed: 79 additions & 20 deletions

File tree

azure-pipelines.yml

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ stages:
130130
MAX_WORKER: $(INSTANCE_NUMBER)
131131
KVM_IMAGE_BRANCH: $(BUILD_BRANCH)
132132
MGMT_BRANCH: $(BUILD_BRANCH)
133+
COMMON_EXTRA_PARAMS: "--disable_sai_validation "
133134

134135
- job: impacted_area_t0_2vlans_elastictest
135136
displayName: "impacted-area-kvmtest-t0-2vlans by Elastictest"
@@ -156,6 +157,7 @@ stages:
156157
DEPLOY_MG_EXTRA_PARAMS: "-e vlan_config=two_vlan_a"
157158
KVM_IMAGE_BRANCH: $(BUILD_BRANCH)
158159
MGMT_BRANCH: $(BUILD_BRANCH)
160+
COMMON_EXTRA_PARAMS: "--disable_sai_validation "
159161

160162
- job: impacted_area_t1_lag_elastictest
161163
displayName: "impacted-area-kvmtest-t1-lag by Elastictest"
@@ -183,6 +185,7 @@ stages:
183185
MAX_WORKER: $(INSTANCE_NUMBER)
184186
KVM_IMAGE_BRANCH: $(BUILD_BRANCH)
185187
MGMT_BRANCH: $(BUILD_BRANCH)
188+
COMMON_EXTRA_PARAMS: "--disable_sai_validation "
186189

187190
- job: impacted_area_dualtor_elastictest
188191
displayName: "impacted-area-kvmtest-dualtor by Elastictest"
@@ -208,7 +211,7 @@ stages:
208211
SCRIPTS: $(SCRIPTS)
209212
MIN_WORKER: $(INSTANCE_NUMBER)
210213
MAX_WORKER: $(INSTANCE_NUMBER)
211-
COMMON_EXTRA_PARAMS: "--disable_loganalyzer "
214+
COMMON_EXTRA_PARAMS: "--disable_loganalyzer --disable_sai_validation "
212215
KVM_IMAGE_BRANCH: $(BUILD_BRANCH)
213216
MGMT_BRANCH: $(BUILD_BRANCH)
214217

@@ -237,6 +240,7 @@ stages:
237240
NUM_ASIC: 4
238241
KVM_IMAGE_BRANCH: $(BUILD_BRANCH)
239242
MGMT_BRANCH: $(BUILD_BRANCH)
243+
COMMON_EXTRA_PARAMS: "--disable_sai_validation "
240244

241245
- job: impacted_area_t0_sonic_elastictest
242246
displayName: "impacted-area-kvmtest-t0-sonic by Elastictest"
@@ -262,7 +266,7 @@ stages:
262266
MIN_WORKER: $(INSTANCE_NUMBER)
263267
MAX_WORKER: $(INSTANCE_NUMBER)
264268
KVM_IMAGE_BRANCH: $(BUILD_BRANCH)
265-
COMMON_EXTRA_PARAMS: "--neighbor_type=sonic "
269+
COMMON_EXTRA_PARAMS: "--neighbor_type=sonic --disable_sai_validation "
266270
VM_TYPE: vsonic
267271
MGMT_BRANCH: $(BUILD_BRANCH)
268272
SPECIFIC_PARAM: '[
@@ -294,6 +298,7 @@ stages:
294298
MAX_WORKER: $(INSTANCE_NUMBER)
295299
KVM_IMAGE_BRANCH: $(BUILD_BRANCH)
296300
MGMT_BRANCH: $(BUILD_BRANCH)
301+
COMMON_EXTRA_PARAMS: "--disable_sai_validation "
297302
SPECIFIC_PARAM: '[
298303
{"name": "dash/test_dash_vnet.py", "param": "--skip_dataplane_checking"}
299304
]'
@@ -325,3 +330,4 @@ stages:
325330
NUM_ASIC: 4
326331
KVM_IMAGE_BRANCH: $(BUILD_BRANCH)
327332
MGMT_BRANCH: $(BUILD_BRANCH)
333+
COMMON_EXTRA_PARAMS: "--disable_sai_validation "
Lines changed: 71 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,80 @@
1-
from concurrent.futures import Future, as_completed
2-
from concurrent.futures.thread import ThreadPoolExecutor
3-
from typing import Optional, List
1+
import multiprocessing.pool
2+
from multiprocessing.pool import ThreadPool
3+
from typing import List
44

55

6-
class SafeThreadPoolExecutor(ThreadPoolExecutor):
7-
"""An enhanced thread pool executor
6+
class SafeThreadPoolExecutor:
    """
    A thread pool executor that collects all AsyncResult objects and waits for their completion.

    Example Usage:

        with SafeThreadPoolExecutor(max_workers=len(duthosts)) as executor:
            for duthost in duthosts:
                executor.submit(example_func, duthost, localhost)

    Behavior Summary:
        1. On instantiation, starts `max_workers` threads via ThreadPool.
        2. Each thread runs the submitted function (e.g., `example_func(arg1, arg2)`) in parallel.
        3. When the `with` block scope ends, execution moves to `__exit__`, where it blocks on each
           `AsyncResult.get()` in submission order to wait for the tasks to finish.
        4. The pool is always closed and joined before `__exit__` returns or raises, so no worker is left
           running after the `with` block, even when a task failed.
        5. If any thread raises an exception, `.get()` re-raises it (wrapped in a RuntimeError) in the
           main thread; the first failure in submission order is the one that propagates.
    """

    def __init__(self, max_workers=None, *args, **kwargs):
        """
        Create a ThreadPool with `max_workers` threads and initialize an empty list to collect results.

        Args:
            max_workers: number of worker threads (maps to ThreadPool's `processes` parameter).
                `None` (the default, as in ThreadPoolExecutor) lets ThreadPool pick a size
                from the CPU count.
            *args, **kwargs: ignored (only here to match ThreadPoolExecutor signature).
        """
        self._pool = ThreadPool(processes=max_workers)
        self._results: List["multiprocessing.pool.ApplyResult"] = []

    def submit(self, fn, *args, **kwargs):
        """
        Schedule fn(*args, **kwargs) to run in a worker thread.

        Returns:
            An ApplyResult object whose .get() returns the result of `fn`, or raises a
            RuntimeError describing whatever `fn` raised.

        Raises:
            ValueError: raised by ThreadPool if the pool has already been shut down.
        """
        # ThreadPool workers only catch "Exception". Anything that derives directly from BaseException
        # would escape the worker, so convert every failure into a regular RuntimeError that the
        # pool's "except Exception" block will catch and enqueue for .get().
        def _wrapper(*fn_args, **fn_kwargs):
            try:
                return fn(*fn_args, **fn_kwargs)
            except BaseException as be:
                # Explicit chaining keeps the original exception reachable via __cause__.
                raise RuntimeError("Thread worker aborted: " + repr(be)) from be

        async_res = self._pool.apply_async(_wrapper, args, kwargs)
        self._results.append(async_res)
        return async_res

    def shutdown(self, wait=True):
        """
        Stop accepting new tasks and optionally wait for running ones to finish.

        Args:
            wait: when True, block until every worker thread has exited.
        """
        # Prevent new tasks
        self._pool.close()
        if wait:
            # Wait for all tasks to finish
            self._pool.join()

    def __enter__(self):
        """
        Support the "with" statement.
        """
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Wait for each submitted task to complete and surface exceptions.

        The pool is shut down in a `finally` clause: if a task failed, `.get()` raises here, and
        without the `finally` the pool would never be closed/joined and the remaining tasks would
        keep running in the background after the `with` block has already failed.
        """
        try:
            for async_res in self._results:
                # .get() will block until the task finishes, and re-raise any exception to the main thread.
                async_res.get()
        finally:
            # Shut down the pool by close + join, whether or not a task raised.
            self.shutdown(wait=True)
        # Returning False to ensure that any exception in the "with" statement is not suppressed.
        return False

0 commit comments

Comments
 (0)