-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathworkflow_state.py
More file actions
120 lines (97 loc) · 5 KB
/
workflow_state.py
File metadata and controls
120 lines (97 loc) · 5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
from abc import ABC
from typing import Generic, TypeVar, get_args
from iwf.command_request import CommandRequest
from iwf.command_results import CommandResults
from iwf.communication import Communication
from iwf.persistence import Persistence
from iwf.state_decision import StateDecision
from iwf.workflow_context import WorkflowContext
from iwf.workflow_state_options import WorkflowStateOptions
T = TypeVar("T")
not_implemented_error_msg = "This implementation shouldn't be invoked"
class WorkflowState(ABC, Generic[T]):
"""WorkflowState is the interface to define a workflow state."""
def wait_until(
self,
ctx: WorkflowContext,
input: T,
persistence: Persistence,
communication: Communication,
) -> CommandRequest:
"""
WaitUntil is the method to set up commands set up to wait for, before `execute` API is invoked.
It's optional -- execute will be invoked instead if this is not implemented.
Args:
ctx: the context info of this API invocation, like workflow start time, workflowId, etc
input: input: the state input
persistence: the API for
1) data attributes: defined by ObjectWorkflow interface
2) search attributes: defined by ObjectWorkflow interface
3) stateExecutionLocals: for passing data within the state execution
4) recordEvent: for storing some tracking info(e.g. RPC call input/output) when executing the API.
Note that any write API will be recorded to server after the whole waitUntil API response is accepted
communication: the API right now only for publishing value to internalChannel
Note that any write API will be recorded to server after the whole waitUntil API response is accepted.
Returns: the requested command
"""
raise NotImplementedError(not_implemented_error_msg)
def execute(
self,
ctx: WorkflowContext,
input: T,
command_results: CommandResults,
persistence: Persistence,
communication: Communication,
) -> StateDecision:
"""
Execute is the method to execute and decide what to do next. Invoke after commands from WaitUntil are completed, or there is WaitUntil is not implemented for the state.
Args:
ctx: the context info of this API invocation, like workflow start time, workflowId, etc
input: the state input
command_results: the results of the command that executed by WaitUntil
persistence: the API for
1) data attributes: defined by ObjectWorkflow interface
2) search attributes: defined by ObjectWorkflow interface
3) stateExecutionLocals: for passing data within the state execution
4) recordEvent: for storing some tracking info(e.g. RPC call input/output) when executing the API.
Note that any write API will be recorded to server after the whole waitUntil API response is accepted
communication: the API right now only for publishing value to internalChannel.
Note that any write API will be recorded to server after the whole execute API response is accepted.
Returns: the decision of what to do next(e.g. transition to next states or closing workflow)
"""
raise NotImplementedError(not_implemented_error_msg)
def get_state_options(self) -> WorkflowStateOptions:
"""GetStateOptions can just return nil to use the default Options
StateOptions is optional configuration to adjust the state behaviors. Default values:
StateId: name of the implementation class
waitUntilApiFailurePolicy: FAIL_WORKFLOW_ON_FAILURE
PersistenceLoadingPolicy for dataAttributes/searchAttributes: LOAD_ALL_WITHOUT_LOCKING
waitUntil/execute API:
timeout: 30s
retryPolicy:
InitialIntervalSeconds: 1
MaxInternalSeconds:100
MaximumAttempts: 0
BackoffCoefficient: 2
Returns: WorkflowStateOptions
"""
return WorkflowStateOptions()
def get_state_id(state: WorkflowState) -> str:
options = state.get_state_options()
if options is None or options.state_id is None:
return state.__class__.__name__
return options.state_id
def get_state_id_by_class(state: type[WorkflowState]) -> str:
return state.__name__
def should_skip_wait_until(state: WorkflowState) -> bool:
func_name = state.wait_until.__name__
parent_method = getattr(super(type(state), state), func_name)
return parent_method == state.wait_until
def get_input_type(state):
bases = state.__orig_bases__
for b in bases:
if b.__origin__ == WorkflowState:
return get_args(b)[0]
return None
def get_state_execution_id(state: type[WorkflowState], number: int):
return f"{get_state_id_by_class(state)}-{number}"