Skip to content

Commit 8ef468a

Browse files
committed
fix: add envs to script op template
fix: add workflow actions: terminate, delete, resubmit, resume, retry, stop, suspend Signed-off-by: zjgemi <[email protected]>
1 parent 751fe88 commit 8ef468a

File tree

3 files changed

+88
-5
lines changed

3 files changed

+88
-5
lines changed

src/dflow/op_template.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
V1alpha1ResourceTemplate,
1010
V1alpha1ScriptTemplate,
1111
V1alpha1Template,
12-
V1ConfigMapKeySelector,
12+
V1ConfigMapKeySelector, V1EnvVar,
1313
V1ResourceRequirements, V1Volume,
1414
V1VolumeMount)
1515
from argo.workflows.client.configuration import Configuration
@@ -108,6 +108,7 @@ class ScriptOPTemplate(OPTemplate):
108108
annotations: annotations for the OP template
109109
requests: a dict of resource requests
110110
limits: a dict of resource limits
111+
envs: environment variables
111112
"""
112113

113114
def __init__(
@@ -130,6 +131,7 @@ def __init__(
130131
image_pull_policy: str = None,
131132
requests: Dict[str, str] = None,
132133
limits: Dict[str, str] = None,
134+
envs: Dict[str, str] = None,
133135
**kwargs,
134136
) -> None:
135137
super().__init__(name=name, inputs=inputs, outputs=outputs,
@@ -153,6 +155,7 @@ def __init__(
153155
self.image_pull_policy = image_pull_policy
154156
self.requests = requests
155157
self.limits = limits
158+
self.envs = envs
156159

157160
def convert_to_argo(self, memoize_prefix=None,
158161
memoize_configmap="dflow"):
@@ -170,6 +173,10 @@ def convert_to_argo(self, memoize_prefix=None,
170173
volumes=self.volumes,
171174
resource=self.resource)
172175
else:
176+
if self.envs is not None:
177+
env = [V1EnvVar(name=k, value=v) for k, v in self.envs.items()]
178+
else:
179+
env = None
173180
return \
174181
V1alpha1Template(name=self.name,
175182
metadata=V1alpha1Metadata(
@@ -187,7 +194,8 @@ def convert_to_argo(self, memoize_prefix=None,
187194
volume_mounts=self.mounts,
188195
resources=V1ResourceRequirements(
189196
limits=self.limits,
190-
requests=self.requests)))
197+
requests=self.requests),
198+
env=env))
191199

192200

193201
class ShellOPTemplate(ScriptOPTemplate):
@@ -212,6 +220,7 @@ class ShellOPTemplate(ScriptOPTemplate):
212220
annotations: annotations for the OP template
213221
requests: a dict of resource requests
214222
limits: a dict of resource limits
223+
envs: environment variables
215224
"""
216225

217226
def __init__(
@@ -233,6 +242,7 @@ def __init__(
233242
image_pull_policy: str = None,
234243
requests: Dict[str, str] = None,
235244
limits: Dict[str, str] = None,
245+
envs: Dict[str, str] = None,
236246
**kwargs,
237247
) -> None:
238248
if command is None:
@@ -243,7 +253,7 @@ def __init__(
243253
init_progress=init_progress, timeout=timeout,
244254
retry_strategy=retry_strategy, memoize_key=memoize_key, pvcs=pvcs,
245255
image_pull_policy=image_pull_policy, annotations=annotations,
246-
requests=requests, limits=limits)
256+
requests=requests, limits=limits, envs=envs)
247257

248258

249259
class PythonScriptOPTemplate(ScriptOPTemplate):
@@ -268,6 +278,7 @@ class PythonScriptOPTemplate(ScriptOPTemplate):
268278
annotations: annotations for the OP template
269279
requests: a dict of resource requests
270280
limits: a dict of resource limits
281+
envs: environment variables
271282
"""
272283

273284
def __init__(
@@ -289,6 +300,7 @@ def __init__(
289300
image_pull_policy: str = None,
290301
requests: Dict[str, str] = None,
291302
limits: Dict[str, str] = None,
303+
envs: Dict[str, str] = None,
292304
**kwargs,
293305
) -> None:
294306
if command is None:
@@ -299,4 +311,4 @@ def __init__(
299311
init_progress=init_progress, timeout=timeout,
300312
retry_strategy=retry_strategy, memoize_key=memoize_key, pvcs=pvcs,
301313
image_pull_policy=image_pull_policy, annotations=annotations,
302-
requests=requests, limits=limits)
314+
requests=requests, limits=limits, envs=envs)

src/dflow/python/python_op_template.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ class PythonOPTemplate(PythonScriptOPTemplate):
101101
image_pull_policy: Always, IfNotPresent, Never
102102
requests: a dict of resource requests
103103
limits: a dict of resource limits
104+
envs: environment variables
104105
"""
105106

106107
def __init__(self,
@@ -130,6 +131,7 @@ def __init__(self,
130131
requests: Dict[str, str] = None,
131132
limits: Dict[str, str] = None,
132133
upload_dflow: bool = True,
134+
envs: Dict[str, str] = None,
133135
**kwargs,
134136
) -> None:
135137
op = None
@@ -167,7 +169,7 @@ def __init__(self,
167169
super().__init__(name="%s-%s" % (class_name, "".join(random.sample(
168170
string.digits + string.ascii_lowercase, 5))), inputs=Inputs(),
169171
outputs=Outputs(), volumes=volumes, mounts=mounts,
170-
requests=requests, limits=limits)
172+
requests=requests, limits=limits, envs=envs)
171173
self.slices = slices
172174
if timeout is not None:
173175
self.timeout = "%ss" % timeout

src/dflow/workflow.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -355,3 +355,72 @@ def query_keys_of_steps(
355355
a list of keys
356356
"""
357357
return [step.key for step in self.query_step() if step.key is not None]
358+
359+
def terminate(self) -> None:
360+
"""
361+
Terminate the workflow
362+
"""
363+
if self.id is None:
364+
raise RuntimeError("Workflow ID is None")
365+
self.api_instance.api_client.call_api(
366+
'/api/v1/workflows/%s/%s/terminate' % (self.namespace, self.id),
367+
'PUT')
368+
369+
def delete(self) -> None:
370+
"""
371+
Delete the workflow
372+
"""
373+
if self.id is None:
374+
raise RuntimeError("Workflow ID is None")
375+
self.api_instance.api_client.call_api(
376+
'/api/v1/workflows/%s/%s' % (self.namespace, self.id), 'DELETE')
377+
378+
def resubmit(self) -> None:
379+
"""
380+
Resubmit the workflow
381+
"""
382+
if self.id is None:
383+
raise RuntimeError("Workflow ID is None")
384+
self.api_instance.api_client.call_api(
385+
'/api/v1/workflows/%s/%s/resubmit' % (self.namespace, self.id),
386+
'PUT')
387+
388+
def resume(self) -> None:
389+
"""
390+
Resume the workflow
391+
"""
392+
if self.id is None:
393+
raise RuntimeError("Workflow ID is None")
394+
self.api_instance.api_client.call_api(
395+
'/api/v1/workflows/%s/%s/resume' % (self.namespace, self.id),
396+
'PUT')
397+
398+
def retry(self) -> None:
399+
"""
400+
Retry the workflow
401+
"""
402+
if self.id is None:
403+
raise RuntimeError("Workflow ID is None")
404+
self.api_instance.api_client.call_api(
405+
'/api/v1/workflows/%s/%s/retry' % (self.namespace, self.id),
406+
'PUT')
407+
408+
def stop(self) -> None:
409+
"""
410+
Stop the workflow
411+
"""
412+
if self.id is None:
413+
raise RuntimeError("Workflow ID is None")
414+
self.api_instance.api_client.call_api(
415+
'/api/v1/workflows/%s/%s/stop' % (self.namespace, self.id),
416+
'PUT')
417+
418+
def suspend(self) -> None:
419+
"""
420+
Suspend the workflow
421+
"""
422+
if self.id is None:
423+
raise RuntimeError("Workflow ID is None")
424+
self.api_instance.api_client.call_api(
425+
'/api/v1/workflows/%s/%s/suspend' % (self.namespace, self.id),
426+
'PUT')

0 commit comments

Comments
 (0)