Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ stages:
MAX_WORKER: $(INSTANCE_NUMBER)
KVM_IMAGE_BRANCH: $(BUILD_BRANCH)
MGMT_BRANCH: $(BUILD_BRANCH)
COMMON_EXTRA_PARAMS: "--disable_sai_validation "
Copy link
Copy Markdown
Contributor Author

@cyw233 cyw233 Jul 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Temporarily disable SAI validation because it is not compatible with Ubuntu 24.04 due to its use of concurrent.futures. We will refactor the SAI validation and re-enable it later. Progress is tracked in Microsoft ADO work item 33758029.


- job: impacted_area_t0_2vlans_elastictest
displayName: "impacted-area-kvmtest-t0-2vlans by Elastictest"
Expand All @@ -156,6 +157,7 @@ stages:
DEPLOY_MG_EXTRA_PARAMS: "-e vlan_config=two_vlan_a"
KVM_IMAGE_BRANCH: $(BUILD_BRANCH)
MGMT_BRANCH: $(BUILD_BRANCH)
COMMON_EXTRA_PARAMS: "--disable_sai_validation "

- job: impacted_area_t1_lag_elastictest
displayName: "impacted-area-kvmtest-t1-lag by Elastictest"
Expand Down Expand Up @@ -183,6 +185,7 @@ stages:
MAX_WORKER: $(INSTANCE_NUMBER)
KVM_IMAGE_BRANCH: $(BUILD_BRANCH)
MGMT_BRANCH: $(BUILD_BRANCH)
COMMON_EXTRA_PARAMS: "--disable_sai_validation "

- job: impacted_area_dualtor_elastictest
displayName: "impacted-area-kvmtest-dualtor by Elastictest"
Expand All @@ -208,7 +211,7 @@ stages:
SCRIPTS: $(SCRIPTS)
MIN_WORKER: $(INSTANCE_NUMBER)
MAX_WORKER: $(INSTANCE_NUMBER)
COMMON_EXTRA_PARAMS: "--disable_loganalyzer "
COMMON_EXTRA_PARAMS: "--disable_loganalyzer --disable_sai_validation "
KVM_IMAGE_BRANCH: $(BUILD_BRANCH)
MGMT_BRANCH: $(BUILD_BRANCH)

Expand Down Expand Up @@ -237,6 +240,7 @@ stages:
NUM_ASIC: 4
KVM_IMAGE_BRANCH: $(BUILD_BRANCH)
MGMT_BRANCH: $(BUILD_BRANCH)
COMMON_EXTRA_PARAMS: "--disable_sai_validation "

- job: impacted_area_t0_sonic_elastictest
displayName: "impacted-area-kvmtest-t0-sonic by Elastictest"
Expand All @@ -262,7 +266,7 @@ stages:
MIN_WORKER: $(INSTANCE_NUMBER)
MAX_WORKER: $(INSTANCE_NUMBER)
KVM_IMAGE_BRANCH: $(BUILD_BRANCH)
COMMON_EXTRA_PARAMS: "--neighbor_type=sonic "
COMMON_EXTRA_PARAMS: "--neighbor_type=sonic --disable_sai_validation "
VM_TYPE: vsonic
MGMT_BRANCH: $(BUILD_BRANCH)
SPECIFIC_PARAM: '[
Expand Down Expand Up @@ -294,6 +298,7 @@ stages:
MAX_WORKER: $(INSTANCE_NUMBER)
KVM_IMAGE_BRANCH: $(BUILD_BRANCH)
MGMT_BRANCH: $(BUILD_BRANCH)
COMMON_EXTRA_PARAMS: "--disable_sai_validation "
SPECIFIC_PARAM: '[
{"name": "dash/test_dash_vnet.py", "param": "--skip_dataplane_checking"}
]'
Expand Down Expand Up @@ -325,3 +330,4 @@ stages:
NUM_ASIC: 4
KVM_IMAGE_BRANCH: $(BUILD_BRANCH)
MGMT_BRANCH: $(BUILD_BRANCH)
COMMON_EXTRA_PARAMS: "--disable_sai_validation "
89 changes: 71 additions & 18 deletions tests/common/helpers/multi_thread_utils.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,80 @@
from concurrent.futures import Future, as_completed
from concurrent.futures.thread import ThreadPoolExecutor
from typing import Optional, List
import multiprocessing.pool
from multiprocessing.pool import ThreadPool
from typing import List


class SafeThreadPoolExecutor:
    """
    A thread pool executor that collects all AsyncResult objects and waits for their completion.

    Example Usage:

        with SafeThreadPoolExecutor(max_workers=len(duthosts)) as executor:
            for duthost in duthosts:
                executor.submit(example_func, duthost, localhost)

    Behavior Summary:
        1. On instantiation, a ThreadPool with `max_workers` threads is created.
        2. Each call to `submit` schedules the function on the pool and records the returned result handle.
        3. When the `with` block scope ends, `__exit__` blocks on each recorded handle's `.get()` in
           submission order to wait for the tasks to finish.
        4. If any task raised, `.get()` re-raises it in the calling thread as a RuntimeError whose
           `__cause__` is the original exception.
        5. The pool is always closed and joined on exit, whether or not a task failed.
    """

    def __init__(self, max_workers=None, *args, **kwargs):
        """
        Create a ThreadPool with `max_workers` threads and initialize an empty list to collect results.

        Args:
            max_workers: number of worker threads (maps to ThreadPool's `processes` parameter).
                When None, ThreadPool picks its own default.
            *args, **kwargs: ignored (only here to match ThreadPoolExecutor signature).
        """
        self._pool = ThreadPool(processes=max_workers)
        self._results: List["multiprocessing.pool.ApplyResult"] = []

    def submit(self, fn, *args, **kwargs):
        """
        Schedule fn(*args, **kwargs) to run in a worker thread.

        Returns:
            The ApplyResult object for the task; its .get() returns the task's result, or raises a
            RuntimeError (chained from the original exception) if the task raised.
        """
        # Wrap the user-provided fn so that any BaseException is converted into a regular RuntimeError;
        # the intent is that ThreadPool's "except Exception" handling can then catch and enqueue it.
        def _wrapper(*fn_args, **fn_kwargs):
            try:
                return fn(*fn_args, **fn_kwargs)
            except BaseException as be:
                # Chain explicitly so the original exception stays reachable via __cause__.
                raise RuntimeError("Thread worker aborted: " + repr(be)) from be

        async_res = self._pool.apply_async(_wrapper, args, kwargs)
        self._results.append(async_res)
        return async_res

    def shutdown(self, wait=True):
        """
        Stop accepting new tasks and optionally wait for running ones to finish.

        Args:
            wait: when True, block until the worker threads have exited.
        """
        # Prevent new tasks
        self._pool.close()
        if wait:
            # Wait for all tasks to finish
            self._pool.join()

    def __enter__(self):
        """
        Support the "with" statement.
        """
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Wait for each submitted task to complete and surface exceptions.

        The pool is shut down even when a task failed, so worker threads are not leaked.
        """
        try:
            for async_res in self._results:
                # .get() will block until the task finishes, and re-raise any exception to the main thread.
                async_res.get()
        finally:
            # Shut down the pool by close + join, regardless of task failures.
            self.shutdown(wait=True)
        # Returning False to ensure that any exception in the "with" statement is not suppressed.
        return False
Loading