Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 30 additions & 4 deletions pkg/workflow_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,21 +202,47 @@ func injectArtifactRepositoryConfig(artifact *wfv1.Artifact, namespaceConfig *Na
}
}

// injectHostPortToContainer adds a hostPort to the template container, if a nodeSelector is present.
// injectHostPortAndResourcesToContainer adds a hostPort to the template container, if a nodeSelector is present.
// Kubernetes will ensure that multiple containers with the same hostPort do not share the same node.
func (c *Client) injectHostPortToContainer(template *wfv1.Template) error {
// injectHostPortAndResourcesToContainer reserves host port 80 on the template's
// container and/or script container when the template has a nodeSelector, and
// copies the matching node pool's resources onto them when that node pool
// defines resource limits (e.g. for GPUs).
//
// The node pool is looked up via config.NodePoolOptionByValue using a value
// taken from template.NodeSelector. If that value contains a "{{workflow."
// placeholder, the text after the last "." (with "}}" removed) is treated as a
// parameter name and resolved against opts.Parameters.
//
// Parameters:
//   - template: the workflow template to mutate in place.
//   - opts: execution options whose Parameters resolve placeholder values; a
//     nil opts leaves the placeholder unresolved.
//   - config: system configuration used to look up the node pool option.
//
// The function currently always returns nil. Templates without a nodeSelector
// are left untouched.
func (c *Client) injectHostPortAndResourcesToContainer(template *wfv1.Template, opts *WorkflowExecutionOptions, config SystemConfig) error {
	if template.NodeSelector == nil {
		return nil
	}

	ports := []corev1.ContainerPort{
		{Name: "node-capturer", HostPort: 80, ContainerPort: 80},
	}

	// Take a single node selector value as the node pool identifier.
	// NOTE(review): map iteration order is random, so with more than one
	// nodeSelector entry the chosen value is arbitrary — confirm templates only
	// ever carry one entry.
	nodePoolVal := ""
	for _, v := range template.NodeSelector {
		nodePoolVal = v
		break
	}

	// Resolve "{{workflow.parameters.<name>}}"-style placeholders to the value
	// supplied for this execution.
	if opts != nil && strings.Contains(nodePoolVal, "{{workflow.") {
		parts := strings.Split(strings.Replace(nodePoolVal, "}}", "", -1), ".")
		paramName := parts[len(parts)-1]
		for _, parameter := range opts.Parameters {
			// Skip parameters without a value instead of dereferencing a nil pointer.
			if parameter.Name == paramName && parameter.Value != nil {
				nodePoolVal = *parameter.Value
			}
		}
	}

	n, err := config.NodePoolOptionByValue(nodePoolVal)
	if err != nil {
		// Best-effort: an unknown node pool leaves the template unchanged.
		// NOTE(review): this also skips the hostPort injection — confirm that is intended.
		return nil
	}

	// Only override resources when the node pool explicitly defines limits.
	applyResources := n != nil && n.Resources.Limits != nil

	if template.Container != nil {
		template.Container.Ports = ports
		if applyResources {
			template.Container.Resources = n.Resources
		}
	}
	if template.Script != nil {
		template.Script.Container.Ports = ports
		if applyResources {
			// Write to the script's own container; template.Container may be nil
			// for script templates.
			template.Script.Container.Resources = n.Resources
		}
	}
	return nil
}
Expand Down Expand Up @@ -297,15 +323,15 @@ func (c *Client) injectAutomatedFields(namespace string, wf *wfv1.Workflow, opts
Name: "sys-dshm",
MountPath: "/dev/shm",
})
err = c.injectHostPortToContainer(template)
err = c.injectHostPortAndResourcesToContainer(template, opts, systemConfig)
if err != nil {
return err
}
injectEnvironmentVariables(template.Container, systemConfig)
}

if template.Script != nil {
err = c.injectHostPortToContainer(template)
err = c.injectHostPortAndResourcesToContainer(template, opts, systemConfig)
if err != nil {
return err
}
Expand Down
30 changes: 23 additions & 7 deletions pkg/workspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func (c *Client) createWorkspace(namespace string, parameters []byte, workspace
templates := argoTemplate.Spec.Templates
for i, t := range templates {
if t.Name == WorkspaceStatefulSetResource {
resultManifest, err := c.addRuntimeFieldsToWorkspaceTemplate(t, systemConfig)
resultManifest, err := c.addRuntimeFieldsToWorkspaceTemplate(t, workspace, systemConfig)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -269,7 +269,7 @@ func (c *Client) createWorkspace(namespace string, parameters []byte, workspace

// addRuntimeFieldsToWorkspaceTemplate will take the workspace statefulset resource
// and attempt to figure out the resources it requests, based on the Node selected.
func (c *Client) addRuntimeFieldsToWorkspaceTemplate(t wfv1.Template, config SystemConfig) ([]byte, error) {
func (c *Client) addRuntimeFieldsToWorkspaceTemplate(t wfv1.Template, workspace *Workspace, config SystemConfig) ([]byte, error) {
//due to placeholders, we can't unmarshal into a k8s statefulset
statefulSet := map[string]interface{}{}
if err := yaml.Unmarshal([]byte(t.Resource.Manifest), &statefulSet); err != nil {
Expand All @@ -287,7 +287,7 @@ func (c *Client) addRuntimeFieldsToWorkspaceTemplate(t wfv1.Template, config Sys
if !ok {
return nil, errors.New("unable to type check statefulset manifest")
}
extraContainer := generateExtraContainerWithHostPortToSequesterNode()
extraContainer := generateNodeCaptureContainer(workspace, config)
if extraContainer != nil {
containers, ok := templateSpec["containers"].([]interface{})
if !ok {
Expand Down Expand Up @@ -326,11 +326,11 @@ func (c *Client) addRuntimeFieldsToWorkspaceTemplate(t wfv1.Template, config Sys
return resultManifest, nil
}

// generateExtraContainerWithHostPortToSequesterNode will add an extra container to a workspace.
// generateNodeCaptureContainer will add an extra container to a workspace.
// The extra container has a hostPort set. Kubernetes will ensure the hostPort does not conflict
// between containers, scheduling a new node as needed.
// The container will sleep once started, and generally consume negligible resources.
func generateExtraContainerWithHostPortToSequesterNode() map[string]interface{} {
func generateNodeCaptureContainer(workspace *Workspace, config SystemConfig) map[string]interface{} {
extraContainer := map[string]interface{}{
"image": "alpine:latest",
"name": "node-capturer",
Expand All @@ -344,6 +344,22 @@ func generateExtraContainerWithHostPortToSequesterNode() map[string]interface{}
},
},
}

// Add resource limits for GPUs
nodePoolVal := ""
for _, parameter := range workspace.Parameters {
if parameter.Name == "sys-node-pool" {
nodePoolVal = *parameter.Value
}
}
n, err := config.NodePoolOptionByValue(nodePoolVal)
if err != nil {
return nil
}
if n != nil && n.Resources.Limits != nil {
extraContainer["resources"] = n.Resources
}

return extraContainer
}

Expand Down Expand Up @@ -398,7 +414,7 @@ func (c *Client) startWorkspace(namespace string, parameters []byte, workspace *
templates := argoTemplate.Spec.Templates
for i, t := range templates {
if t.Name == WorkspaceStatefulSetResource {
resultManifest, err := c.addRuntimeFieldsToWorkspaceTemplate(t, systemConfig)
resultManifest, err := c.addRuntimeFieldsToWorkspaceTemplate(t, workspace, systemConfig)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -761,7 +777,7 @@ func (c *Client) updateWorkspace(namespace, uid, workspaceAction, resourceAction
templates := workspace.WorkspaceTemplate.WorkflowTemplate.ArgoWorkflowTemplate.Spec.Templates
for i, t := range templates {
if t.Name == WorkspaceStatefulSetResource {
resultManifest, err := c.addRuntimeFieldsToWorkspaceTemplate(t, config)
resultManifest, err := c.addRuntimeFieldsToWorkspaceTemplate(t, workspace, config)
if err != nil {
return err
}
Expand Down