Skip to content

Commit 7079635

Browse files
authored
Merge pull request #4699 from StackStorm/orquesta-manual-fail
Refactor workflow service to handle tasks that run on fail
2 parents 7c275b6 + 7f6697f commit 7079635

File tree

15 files changed

+238
-19
lines changed

15 files changed

+238
-19
lines changed

CHANGELOG.rst

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,17 @@ Changelog
44
in development
55
--------------
66

7+
Changed
8+
~~~~~~~
9+
10+
* Allow the orquesta st2kv function to return default for nonexistent key. (improvement) #4678
11+
* Update requests library to latest version (2.22.0) in requirements. (improvement) #4680
12+
713
Fixed
814
~~~~~
915

1016
* Fix orquesta st2kv to return empty string and null values. (bug fix) #4678
11-
* Allow the orquesta st2kv function to return default for nonexistent key. (improvement) #4678
12-
* Update requests library to latest version (2.22.0) in requirements. (improvement) #4680
17+
* Allow tasks defined in the same task transition with ``fail`` to run for orquesta. (bug fix)
1318

1419
3.0.1 - May 24, 2019
1520
--------------------
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
name: orquesta-fail-manually
3+
description: A workflow that demonstrates how to fail manually.
4+
runner_type: orquesta
5+
entry_point: workflows/orquesta-fail-manually.yaml
6+
enabled: true
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
version: 1.0
2+
3+
description: A workflow that demonstrates how to fail manually.
4+
5+
tasks:
6+
# On task failure, we want to run a task that
7+
# logs the error before failing the workflow.
8+
task1:
9+
action: core.local cmd="exit 1"
10+
next:
11+
- when: <% failed() %>
12+
publish:
13+
- task_name: <% task().task_name %>
14+
- task_exit_code: <% task().result.stdout %>
15+
do:
16+
- log
17+
- fail
18+
19+
log:
20+
action: core.echo
21+
input:
22+
message: "<% ctx().task_name %> failed with exit code: <% ctx().task_exit_code %>"
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
git+https://github.com/StackStorm/orquesta.git@v0.5#egg=orquesta
1+
git+https://github.com/StackStorm/orquesta.git@67fb92fb652073b37fe9678f9a9b9934ba8ac739#egg=orquesta
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
# Don't edit this file. It's generated automatically!
2-
git+https://github.com/StackStorm/orquesta.git@v0.5#egg=orquesta
2+
git+https://github.com/StackStorm/orquesta.git@67fb92fb652073b37fe9678f9a9b9934ba8ac739#egg=orquesta

contrib/runners/orquesta_runner/tests/unit/test_error_handling.py

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -722,3 +722,115 @@ def test_output_on_error(self):
722722
ac_ex_db = ex_db_access.ActionExecution.get_by_id(str(ac_ex_db.id))
723723
self.assertEqual(ac_ex_db.status, ac_const.LIVEACTION_STATUS_FAILED)
724724
self.assertDictEqual(ac_ex_db.result, expected_result)
725+
726+
def test_fail_manually(self):
727+
wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, 'fail-manually.yaml')
728+
lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta['name'])
729+
lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db)
730+
wf_ex_db = wf_db_access.WorkflowExecution.query(action_execution=str(ac_ex_db.id))[0]
731+
732+
# Assert task1 and workflow execution failed due to fail in the task transition.
733+
query_filters = {'workflow_execution': str(wf_ex_db.id), 'task_id': 'task1'}
734+
tk1_ex_db = wf_db_access.TaskExecution.query(**query_filters)[0]
735+
tk1_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(tk1_ex_db.id))[0]
736+
tk1_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk1_ac_ex_db.liveaction['id'])
737+
self.assertEqual(tk1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_FAILED)
738+
wf_svc.handle_action_execution_completion(tk1_ac_ex_db)
739+
wf_ex_db = wf_db_access.WorkflowExecution.get_by_id(wf_ex_db.id)
740+
self.assertEqual(wf_ex_db.status, wf_statuses.FAILED)
741+
742+
# Assert log task is scheduled even though the workflow execution failed manually.
743+
query_filters = {'workflow_execution': str(wf_ex_db.id), 'task_id': 'log'}
744+
tk2_ex_db = wf_db_access.TaskExecution.query(**query_filters)[0]
745+
tk2_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(tk2_ex_db.id))[0]
746+
tk2_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk2_ac_ex_db.liveaction['id'])
747+
self.assertEqual(tk2_lv_ac_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)
748+
wf_svc.handle_action_execution_completion(tk2_ac_ex_db)
749+
wf_ex_db = wf_db_access.WorkflowExecution.get_by_id(wf_ex_db.id)
750+
self.assertEqual(wf_ex_db.status, wf_statuses.FAILED)
751+
752+
# Check errors and output.
753+
expected_errors = [
754+
{
755+
'task_id': 'fail',
756+
'type': 'error',
757+
'message': 'Execution failed. See result for details.'
758+
},
759+
{
760+
'task_id': 'task1',
761+
'type': 'error',
762+
'message': 'Execution failed. See result for details.',
763+
'result': {
764+
'failed': True,
765+
'return_code': 1,
766+
'stderr': '',
767+
'stdout': '',
768+
'succeeded': False
769+
}
770+
}
771+
]
772+
773+
self.assertListEqual(self.sort_wf_runtime_errors(wf_ex_db.errors), expected_errors)
774+
775+
def test_fail_manually_with_recovery_failure(self):
776+
wf_file = 'fail-manually-with-recovery-failure.yaml'
777+
wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, wf_file)
778+
lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta['name'])
779+
lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db)
780+
wf_ex_db = wf_db_access.WorkflowExecution.query(action_execution=str(ac_ex_db.id))[0]
781+
782+
# Assert task1 and workflow execution failed due to fail in the task transition.
783+
query_filters = {'workflow_execution': str(wf_ex_db.id), 'task_id': 'task1'}
784+
tk1_ex_db = wf_db_access.TaskExecution.query(**query_filters)[0]
785+
tk1_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(tk1_ex_db.id))[0]
786+
tk1_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk1_ac_ex_db.liveaction['id'])
787+
self.assertEqual(tk1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_FAILED)
788+
wf_svc.handle_action_execution_completion(tk1_ac_ex_db)
789+
wf_ex_db = wf_db_access.WorkflowExecution.get_by_id(wf_ex_db.id)
790+
self.assertEqual(wf_ex_db.status, wf_statuses.FAILED)
791+
792+
# Assert recover task is scheduled even though the workflow execution failed manually.
793+
# The recover task in the workflow is setup to fail.
794+
query_filters = {'workflow_execution': str(wf_ex_db.id), 'task_id': 'recover'}
795+
tk2_ex_db = wf_db_access.TaskExecution.query(**query_filters)[0]
796+
tk2_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(tk2_ex_db.id))[0]
797+
tk2_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk2_ac_ex_db.liveaction['id'])
798+
self.assertEqual(tk2_lv_ac_db.status, ac_const.LIVEACTION_STATUS_FAILED)
799+
wf_svc.handle_action_execution_completion(tk2_ac_ex_db)
800+
wf_ex_db = wf_db_access.WorkflowExecution.get_by_id(wf_ex_db.id)
801+
self.assertEqual(wf_ex_db.status, wf_statuses.FAILED)
802+
803+
# Check errors and output.
804+
expected_errors = [
805+
{
806+
'task_id': 'fail',
807+
'type': 'error',
808+
'message': 'Execution failed. See result for details.'
809+
},
810+
{
811+
'task_id': 'recover',
812+
'type': 'error',
813+
'message': 'Execution failed. See result for details.',
814+
'result': {
815+
'failed': True,
816+
'return_code': 1,
817+
'stderr': '',
818+
'stdout': '',
819+
'succeeded': False
820+
}
821+
},
822+
{
823+
'task_id': 'task1',
824+
'type': 'error',
825+
'message': 'Execution failed. See result for details.',
826+
'result': {
827+
'failed': True,
828+
'return_code': 1,
829+
'stderr': '',
830+
'stdout': '',
831+
'succeeded': False
832+
}
833+
}
834+
]
835+
836+
self.assertListEqual(self.sort_wf_runtime_errors(wf_ex_db.errors), expected_errors)

requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ cryptography==2.6.1
88
eventlet==0.24.1
99
flex==6.14.0
1010
git+https://github.com/Kami/logshipper.git@stackstorm_patched#egg=logshipper
11-
git+https://github.com/StackStorm/orquesta.git@v0.5#egg=orquesta
11+
git+https://github.com/StackStorm/orquesta.git@67fb92fb652073b37fe9678f9a9b9934ba8ac739#egg=orquesta
1212
git+https://github.com/StackStorm/python-mistralclient.git#egg=python-mistralclient
1313
git+https://github.com/StackStorm/st2-auth-backend-flat-file.git@master#egg=st2-auth-backend-flat-file
1414
gitpython==2.1.11

st2common/in-requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ jsonschema
99
kombu
1010
mongoengine
1111
networkx
12-
git+https://github.com/StackStorm/orquesta.git@v0.5#egg=orquesta
12+
git+https://github.com/StackStorm/orquesta.git@67fb92fb652073b37fe9678f9a9b9934ba8ac739#egg=orquesta
1313
oslo.config
1414
paramiko
1515
pyyaml

st2common/requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ apscheduler==3.6.0
44
cryptography==2.6.1
55
eventlet==0.24.1
66
flex==6.14.0
7-
git+https://github.com/StackStorm/orquesta.git@v0.5#egg=orquesta
7+
git+https://github.com/StackStorm/orquesta.git@67fb92fb652073b37fe9678f9a9b9934ba8ac739#egg=orquesta
88
greenlet==0.4.15
99
ipaddr
1010
jinja2==2.10.1

st2common/st2common/services/workflows.py

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -849,12 +849,6 @@ def request_next_tasks(wf_ex_db, task_ex_id=None):
849849
LOG.info('[%s] No tasks identified to execute next.', wf_ac_ex_id)
850850
update_execution_records(wf_ex_db, conductor)
851851

852-
# If workflow execution is no longer active, then stop processing here.
853-
if wf_ex_db.status in statuses.COMPLETED_STATUSES:
854-
msg = '[%s] Workflow execution is in completed status "%s".'
855-
LOG.info(msg, wf_ac_ex_id, wf_ex_db.status)
856-
return
857-
858852
# Iterate while there are next tasks identified for processing. In the case for
859853
# task with no action execution defined, the task execution will complete
860854
# immediately with a new set of tasks available.
@@ -893,12 +887,6 @@ def request_next_tasks(wf_ex_db, task_ex_id=None):
893887
LOG.debug('[%s] %s', wf_ac_ex_id, conductor.serialize())
894888
update_execution_records(wf_ex_db, conductor)
895889

896-
# If workflow execution is no longer active, then stop processing here.
897-
if wf_ex_db.status in statuses.COMPLETED_STATUSES:
898-
msg = '[%s] Workflow execution is in completed status "%s".'
899-
LOG.info(msg, wf_ac_ex_id, wf_ex_db.status)
900-
break
901-
902890
# Request task execution for the tasks.
903891
for task in next_tasks:
904892
try:

0 commit comments

Comments
 (0)