# NOTE: this module was reconstructed from a whitespace-collapsed diff of
# tests/common/helpers/multi_thread_utils.py. The same change set also edits azure-pipelines.yml,
# adding "--disable_sai_validation " to COMMON_EXTRA_PARAMS of the impacted-area Elastictest jobs
# (appended to the existing value for the jobs that already defined COMMON_EXTRA_PARAMS).
import multiprocessing.pool
from multiprocessing.pool import ThreadPool
from typing import List


class SafeThreadPoolExecutor:
    """
    A thread pool executor that collects all AsyncResult objects and waits for their completion.

    Example Usage:

        with SafeThreadPoolExecutor(max_workers=len(duthosts)) as executor:
            for duthost in duthosts:
                executor.submit(example_func, duthost, localhost)

    Behavior Summary:
        1. On instantiation, starts `max_workers` threads via ThreadPool.
        2. Each thread runs the submitted function (e.g., `example_func(arg1, arg2)`) in parallel.
        3. When the `with` block scope ends, execution moves to `__exit__`, where it blocks on each
           `AsyncResult.get()` in turn to wait for all tasks to finish.
        4. If all threads succeed without raising, the pool is shut down cleanly.
        5. If any thread raises an exception, `.get()` re-raises it (as a RuntimeError) in the main thread;
           the pool is still closed and joined before that exception leaves `__exit__`.
    """

    def __init__(self, max_workers=None, *args, **kwargs):
        """
        Create a ThreadPool with `max_workers` threads and initialize an empty list to collect results.

        Args:
            max_workers: number of worker threads (maps to ThreadPool's `processes` parameter).
                `None` (the default) lets ThreadPool pick its own default size.
            *args, **kwargs: ignored (only here to match ThreadPoolExecutor signature).
        """
        self._pool = ThreadPool(processes=max_workers)
        self._results: List["multiprocessing.pool.ApplyResult"] = []

    def submit(self, fn, *args, **kwargs):
        """
        Schedule fn(*args, **kwargs) to run in a worker thread.

        Returns:
            An ApplyResult object whose .get() returns the task's result, or raises a RuntimeError
            describing the exception raised by the task (the original is available as `__cause__`).
        """
        # Wrap the user-provided fn to catch any BaseException and convert it into a regular
        # RuntimeError, so ThreadPool's "except Exception" block will catch and enqueue it instead of
        # letting the worker thread die with the result never being delivered.
        def _wrapper(*fn_args, **fn_kwargs):
            try:
                return fn(*fn_args, **fn_kwargs)
            except BaseException as be:
                # Chain explicitly so the original exception object stays reachable for debugging.
                raise RuntimeError("Thread worker aborted: " + repr(be)) from be

        async_res = self._pool.apply_async(_wrapper, args, kwargs)
        self._results.append(async_res)
        return async_res

    def shutdown(self, wait=True):
        """
        Stop accepting new tasks and optionally wait for running ones to finish.

        Args:
            wait: when true, block until every worker thread has exited.
        """
        # Prevent new tasks
        self._pool.close()
        if wait:
            # Wait for all tasks to finish
            self._pool.join()

    def __enter__(self):
        """
        Support the "with" statement.
        """
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Wait for each submitted task to complete and surface exceptions.

        The pool is always closed and joined, even when a task failed, so no worker thread outlives
        the "with" block.
        """
        try:
            for async_res in self._results:
                # .get() will block until the task finishes, and re-raise any exception to the main thread.
                async_res.get()
        finally:
            # Shut down the pool by close + join, whether or not a task raised.
            self.shutdown(wait=True)

        # Returning False to ensure that any exception in the "with" statement is not suppressed.
        return False