From 8dec15a044076b2a5c3f04d7db4db0e08c174c46 Mon Sep 17 00:00:00 2001 From: W Chan Date: Tue, 2 Oct 2018 03:32:52 +0000 Subject: [PATCH] Fix race when parallel inquiries set the execution status When a workflow is paused and there are multiple requests to resume the workflow, there is a race and one of the request fails because the workflow is already in an active state. Instead of erroring, log the situation and there is no need to resume since the workflow is already active. --- st2common/st2common/services/workflows.py | 8 ++++++-- st2tests/integration/orquesta/base.py | 3 ++- st2tests/integration/orquesta/test_wiring_inquiry.py | 2 +- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/st2common/st2common/services/workflows.py b/st2common/st2common/services/workflows.py index b1f4c728fd..c191ae99e9 100644 --- a/st2common/st2common/services/workflows.py +++ b/st2common/st2common/services/workflows.py @@ -304,7 +304,9 @@ def request_resume(ac_ex_db): raise wf_exc.WorkflowExecutionIsCompletedException(str(wf_ex_db.id)) if wf_ex_db.status in states.RUNNING_STATES: - raise wf_exc.WorkflowExecutionIsRunningException(str(wf_ex_db.id)) + msg = '[%s] Workflow execution "%s" is not resumed because it is already active.' + LOG.info(msg, wf_ac_ex_id, str(wf_ex_db.id)) + return conductor = deserialize_conductor(wf_ex_db) @@ -312,7 +314,9 @@ def request_resume(ac_ex_db): raise wf_exc.WorkflowExecutionIsCompletedException(str(wf_ex_db.id)) if conductor.get_workflow_state() in states.RUNNING_STATES: - raise wf_exc.WorkflowExecutionIsRunningException(str(wf_ex_db.id)) + msg = '[%s] Workflow execution "%s" is not resumed because it is already active.' + LOG.info(msg, wf_ac_ex_id, str(wf_ex_db.id)) + return conductor.request_workflow_state(states.RESUMING) diff --git a/st2tests/integration/orquesta/base.py b/st2tests/integration/orquesta/base.py index b6cdeff3da..89edd2f360 100644 --- a/st2tests/integration/orquesta/base.py +++ b/st2tests/integration/orquesta/base.py @@ -101,7 +101,8 @@ def _wait_for_state(self, ex, states): if ex.status in action_constants.LIVEACTION_COMPLETED_STATES: raise Exception( 'Execution is in completed state "%s" and ' - 'does not match expected state(s).' % ex.status + 'does not match expected state(s). %s' % + (ex.status, ex.result) ) else: raise diff --git a/st2tests/integration/orquesta/test_wiring_inquiry.py b/st2tests/integration/orquesta/test_wiring_inquiry.py index 2575357dc7..0b78765c7d 100644 --- a/st2tests/integration/orquesta/test_wiring_inquiry.py +++ b/st2tests/integration/orquesta/test_wiring_inquiry.py @@ -66,7 +66,7 @@ def test_parallel_inquiries(self): t1_ac_exs = self._wait_for_task(ex, 'ask_jack', ac_const.LIVEACTION_STATUS_SUCCEEDED) # Allow some time for the first inquiry to get processed. - eventlet.sleep(3) + eventlet.sleep(1) # Respond to the second inquiry. t2_ac_exs = self._wait_for_task(ex, 'ask_jill', ac_const.LIVEACTION_STATUS_PENDING)