Skip to content

Commit 0d672b0

Browse files
committed
Add check to ensure with items task has action property specified
Normal task is allowed to leave action property empty to perform action less task transition. With items task should not leave the action property empty since it will just consume unnecessary system resources to iterate thru the items list.
1 parent e69e806 commit 0d672b0

File tree

2 files changed

+33
-38
lines changed

2 files changed

+33
-38
lines changed

orquesta/specs/native/v1/models.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,20 @@ def has_cycles(self):
378378

379379
return False
380380

381+
def detect_actionless_with_items(self, parent=None):
382+
result = []
383+
384+
# Identify with items task with no action defined.
385+
for task_name, task_spec in six.iteritems(self):
386+
if task_spec.has_items() and not task_spec.action:
387+
message = 'The action property is required for with items task.'
388+
spec_path = parent.get('spec_path') + '.' + task_name
389+
schema_path = parent.get('schema_path') + '.patternProperties.^\\w+$'
390+
entry = {'message': message, 'spec_path': spec_path, 'schema_path': schema_path}
391+
result.append(entry)
392+
393+
return result
394+
381395
def detect_reserved_names(self, parent=None):
382396
result = []
383397

@@ -515,6 +529,7 @@ def inspect_semantics(self, parent=None):
515529
result = self.detect_reserved_names(parent=parent)
516530
result.extend(self.detect_undefined_tasks(parent=parent))
517531
result.extend(self.detect_unreachable_tasks(parent=parent))
532+
result.extend(self.detect_actionless_with_items(parent=parent))
518533

519534
return result
520535

orquesta/tests/unit/conducting/test_workflow_conductor_with_items.py

Lines changed: 18 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,13 @@ def test_bad_with_items_syntax(self):
103103
"""
104104

105105
expected_errors = {
106+
'semantics': [
107+
{
108+
'message': 'The action property is required for with items task.',
109+
'schema_path': 'properties.tasks.patternProperties.^\\w+$',
110+
'spec_path': 'tasks.task1'
111+
}
112+
],
106113
'syntax': [
107114
{
108115
'message': 'Additional properties are not allowed (\'action\' was unexpected)',
@@ -141,45 +148,18 @@ def test_with_items_that_is_action_less(self):
141148
- items: <% ctx(items) %>
142149
"""
143150

144-
spec = native_specs.WorkflowSpec(wf_def)
145-
self.assertDictEqual(spec.inspect(), {})
146-
147-
conductor = conducting.WorkflowConductor(spec)
148-
conductor.request_workflow_status(statuses.RUNNING)
149-
150-
# Mock the action execution for each item and assert expected task statuses.
151-
task_route = 0
152-
task_name = 'task1'
153-
task_ctx = {'xs': ['fee', 'fi', 'fo', 'fum']}
154-
task_action_specs = [{'action': None, 'input': None, 'item_id': i} for i in range(0, 4)]
155-
156-
mock_ac_ex_statuses = [statuses.SUCCEEDED] * 4
157-
expected_task_statuses = [statuses.RUNNING] * 3 + [statuses.SUCCEEDED]
158-
expected_workflow_statuses = [statuses.RUNNING] * 3 + [statuses.SUCCEEDED]
159-
160-
self.assert_task_items(
161-
conductor,
162-
task_name,
163-
task_route,
164-
task_ctx,
165-
task_ctx['xs'],
166-
task_action_specs,
167-
mock_ac_ex_statuses,
168-
expected_task_statuses,
169-
expected_workflow_statuses,
170-
mock_ac_ex_results=[None] * 4
171-
)
172-
173-
# Assert the task is removed from staging.
174-
self.assertIsNone(conductor.workflow_state.get_staged_task(task_name, task_route))
175-
176-
# Assert the workflow succeeded.
177-
self.assertEqual(conductor.get_workflow_status(), statuses.SUCCEEDED)
151+
expected_errors = {
152+
'semantics': [
153+
{
154+
'message': 'The action property is required for with items task.',
155+
'schema_path': 'properties.tasks.patternProperties.^\\w+$',
156+
'spec_path': 'tasks.task1'
157+
}
158+
]
159+
}
178160

179-
# Assert the workflow output is correct.
180-
conductor.render_workflow_output()
181-
expected_output = {'items': [None] * 4}
182-
self.assertDictEqual(conductor.get_workflow_output(), expected_output)
161+
spec = native_specs.WorkflowSpec(wf_def)
162+
self.assertDictEqual(spec.inspect(), expected_errors)
183163

184164
def test_basic_items_list(self):
185165
wf_def = """

0 commit comments

Comments
 (0)