Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
cb7bb79
Adding code to support sidecars and accessing them.
aleksandrmelnikov Oct 30, 2020
2d45b0a
Using the right yaml library to marshal.
aleksandrmelnikov Oct 30, 2020
d2dfd4c
Updating the host placeholder.
aleksandrmelnikov Oct 30, 2020
3e1a4dd
Refactoring code since it's empty initialization.
aleksandrmelnikov Oct 30, 2020
af361d5
Renaming the side-car check per feedback.
aleksandrmelnikov Oct 30, 2020
e5f2f3e
Adding k8s-service-resource and k8s-routes-resource as tasks
aleksandrmelnikov Nov 4, 2020
b46e7fd
Excluding istio side-car from resource creation.
aleksandrmelnikov Nov 5, 2020
2fef7d7
Fixing typo.
aleksandrmelnikov Nov 5, 2020
6d42a44
Removing condition tests.
aleksandrmelnikov Nov 5, 2020
f5fb44c
Port and TargetPort are assumed to be the same.
aleksandrmelnikov Nov 5, 2020
530e241
Simplifying prefix to "/"
aleksandrmelnikov Nov 5, 2020
d903bae
Adding uuid generating library
aleksandrmelnikov Nov 5, 2020
7b7142d
Generating names using uuid.
aleksandrmelnikov Nov 5, 2020
7698b70
Removing unused code.
aleksandrmelnikov Nov 5, 2020
e1c3dc3
Enabling Istio for workflows with sidecars.
aleksandrmelnikov Nov 6, 2020
0f226eb
Adding google uuid library to go.mod.
aleksandrmelnikov Nov 6, 2020
d8101bf
Checking for istio injection by checking if the sidecar has "tty"
aleksandrmelnikov Nov 7, 2020
4f28882
Adding go.sum changes.
aleksandrmelnikov Nov 7, 2020
af47ffb
Updating logic that checks if a sidecar needs routing and service.
aleksandrmelnikov Nov 7, 2020
70007d6
Codacy: Fixing variable names.
aleksandrmelnikov Nov 7, 2020
ccb9912
Adding code to clean-up the service and virtualservice after the
aleksandrmelnikov Nov 9, 2020
a4c40df
Removing jwt-go library since it's no longer used.
aleksandrmelnikov Nov 9, 2020
8057098
Fixing envoyfilter by adding "--" to the sub-domain url that gets
aleksandrmelnikov Nov 9, 2020
5449911
Changing the label that maps between the service and pod.
aleksandrmelnikov Nov 9, 2020
d4846b5
Putting the services and virtualservices for sidecars logic, into
aleksandrmelnikov Nov 10, 2020
04a1db0
Adding "sys-" prefix to onepanel injected services and virtualservices.
aleksandrmelnikov Nov 10, 2020
5bd7f22
Removing the workflow.uid, because it causes a doubly long uid string.
aleksandrmelnikov Nov 10, 2020
c699b5b
Changed the order of tasks per feedback.
aleksandrmelnikov Nov 10, 2020
5e4d5c0
Changed the order of tasks for the exit handler.
aleksandrmelnikov Nov 10, 2020
74752ef
Adding output parameters to the template with the sidecars.
aleksandrmelnikov Nov 10, 2020
806d15f
Adjusting the exit-handler task order.
aleksandrmelnikov Nov 10, 2020
a67012f
Tweaking the dependencies of the sys-exit task.
aleksandrmelnikov Nov 10, 2020
a5ec80e
If tty is set to true, mirror the main container volume mounts to
aleksandrmelnikov Nov 10, 2020
3f70f26
use stop instead of terminate
rushtehrani Nov 11, 2020
f7e48a0
don't update phase for already terminated workflow
rushtehrani Nov 11, 2020
4fec204
use correct where statement
rushtehrani Nov 12, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/ghodss/yaml v1.0.0
github.com/go-sql-driver/mysql v1.5.0 // indirect
github.com/golang/protobuf v1.4.1
github.com/google/uuid v1.1.2
github.com/gorilla/handlers v1.4.2
github.com/grpc-ecosystem/go-grpc-middleware v1.2.0
github.com/grpc-ecosystem/grpc-gateway v1.14.4
Expand Down Expand Up @@ -43,5 +44,4 @@ require (
k8s.io/apimachinery v0.16.7-beta.0
k8s.io/client-go v0.16.4
sigs.k8s.io/yaml v1.2.0
github.com/dgrijalva/jwt-go v3.2.0+incompatible
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,8 @@ github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm4
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5 h1:sjZBwGj9Jlw33ImPtvFviGYvseOtDM7hkSKB7+Tv3SM=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
Expand Down
281 changes: 278 additions & 3 deletions pkg/workflow_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"fmt"
sq "github.com/Masterminds/squirrel"
"github.com/google/uuid"
"github.com/onepanelio/core/pkg/util/gcs"
"github.com/onepanelio/core/pkg/util/label"
"github.com/onepanelio/core/pkg/util/ptr"
Expand All @@ -18,8 +19,11 @@ import (
"gopkg.in/yaml.v2"
"io"
"io/ioutil"
networking "istio.io/api/networking/v1alpha3"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/watch"
"net/http"
yaml2 "sigs.k8s.io/yaml"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -275,6 +279,15 @@ func (c *Client) injectAutomatedFields(namespace string, wf *wfv1.Workflow, opts
template.Metadata.Annotations = make(map[string]string)
}
template.Metadata.Annotations["sidecar.istio.io/inject"] = "false"
//For workflows with accessible sidecars, we need istio
//Istio does not prevent the main container from stopping
for _, s := range template.Sidecars {
if s.TTY == true {
template.Metadata.Annotations["sidecar.istio.io/inject"] = "true"
//Only need one instance to require istio injection
break
}
}

if template.Container != nil {
// Mount dev/shm
Expand Down Expand Up @@ -408,6 +421,11 @@ func (c *Client) createWorkflow(namespace string, workflowTemplateID uint64, wor
return nil, err
}

newTemplateOrder, err := c.injectAccessForSidecars(namespace, wf)
if err != nil {
return nil, err
}
wf.Spec.Templates = newTemplateOrder
createdArgoWorkflow, err := c.ArgoprojV1alpha1().Workflows(namespace).Create(wf)
if err != nil {
return nil, err
Expand Down Expand Up @@ -437,6 +455,260 @@ func (c *Client) createWorkflow(namespace string, workflowTemplateID uint64, wor
return
}

func (c *Client) injectAccessForSidecars(namespace string, wf *wfv1.Workflow) ([]wfv1.Template, error) {
var newTemplateOrder []wfv1.Template
taskSysSendStatusName := "sys-send-status"
taskSysSendExitStats := "sys-send-exit-stats"
for tIdx, t := range wf.Spec.Templates {
//Inject services, virtual routes
for si, s := range t.Sidecars {
//If TTY is true, sidecar needs to be accessible by HTTP
//Otherwise, we skip the sidecar
if s.TTY != true {
continue
}
if len(s.Ports) == 0 {
msg := fmt.Sprintf("sidecar %s must have at least one port.", s.Name)
return nil, util.NewUserError(codes.InvalidArgument, msg)
}

t.Sidecars[si].MirrorVolumeMounts = ptr.Bool(true)
serviceNameUID := "s" + uuid.New().String() + "--" + namespace
serviceNameUIDDNSCompliant, err := uid2.GenerateUID(serviceNameUID, 63)
if err != nil {
return nil, util.NewUserError(codes.InvalidArgument, err.Error())
}

serviceName := serviceNameUIDDNSCompliant + "." + *c.systemConfig.Domain()

serviceTemplateName := uuid.New().String()
serviceTemplateNameAdd := "sys-k8s-service-template-add-" + serviceTemplateName
serviceTemplateNameDelete := "sys-k8s-service-template-delete-" + serviceTemplateName
serviceTaskName := "service-" + uuid.New().String()
serviceAddTaskName := "sys-add-" + serviceTaskName
serviceDeleteTaskName := "sys-delete-" + serviceTaskName
virtualServiceTemplateName := uuid.New().String()
virtualServiceTemplateNameAdd := "sys-k8s-virtual-service-template-add-" + virtualServiceTemplateName
virtualServiceTemplateNameDelete := "sys-k8s-virtual-service-template-delete-" + virtualServiceTemplateName
virtualServiceTaskName := "virtual-service-" + uuid.New().String()
virtualServiceAddTaskName := "sys-add-" + virtualServiceTaskName
virtualServiceDeleteTaskName := "sys-delete-" + virtualServiceTaskName
var servicePorts []corev1.ServicePort
var routes []*networking.HTTPRoute
for _, port := range s.Ports {
servicePort := corev1.ServicePort{
Name: port.Name,
Protocol: port.Protocol,
Port: port.ContainerPort,
TargetPort: intstr.FromInt(int(port.ContainerPort)),
}
servicePorts = append(servicePorts, servicePort)
route := networking.HTTPRoute{
Match: []*networking.HTTPMatchRequest{
{
Uri: &networking.StringMatch{
MatchType: &networking.StringMatch_Prefix{
Prefix: "/"},
},
},
},
Route: []*networking.HTTPRouteDestination{
{
Destination: &networking.Destination{
Host: serviceNameUIDDNSCompliant,
Port: &networking.PortSelector{
Number: uint32(port.ContainerPort),
},
},
},
},
}
routes = append(routes, &route)
}
service := corev1.Service{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Service",
},
ObjectMeta: metav1.ObjectMeta{
Name: serviceNameUIDDNSCompliant,
},
Spec: corev1.ServiceSpec{
Ports: servicePorts,
Selector: map[string]string{
serviceTaskName: serviceNameUIDDNSCompliant,
},
},
}
//Istio needs to know which pod to setup the route to
if wf.Spec.Templates[tIdx].Metadata.Labels == nil {
wf.Spec.Templates[tIdx].Metadata.Labels = make(map[string]string)
}
wf.Spec.Templates[tIdx].Metadata.Labels[serviceTaskName] = serviceNameUIDDNSCompliant
serviceManifestBytes, err := yaml2.Marshal(service)
if err != nil {
return nil, err
}
serviceManifest := string(serviceManifestBytes)
templateServiceResource := wfv1.Template{
Name: serviceTemplateNameAdd,
Metadata: wfv1.Metadata{
Annotations: map[string]string{
"sidecar.istio.io/inject": "false",
},
},
Resource: &wfv1.ResourceTemplate{
Action: "create",
Manifest: serviceManifest,
},
}
newTemplateOrder = append(newTemplateOrder, templateServiceResource)
//routes
virtualServiceNameUUID := "vs-" + uuid.New().String()
hosts := []string{serviceName}
wf.Spec.Templates[tIdx].Outputs.Parameters = append(wf.Spec.Templates[tIdx].Outputs.Parameters,
wfv1.Parameter{Name: "sys-sidecar-url--" + s.Name, Value: &serviceName},
)
virtualService := map[string]interface{}{
"apiVersion": "networking.istio.io/v1alpha3",
"kind": "VirtualService",
"metadata": metav1.ObjectMeta{
Name: virtualServiceNameUUID,
},
"spec": networking.VirtualService{
Http: routes,
Gateways: []string{"istio-system/ingressgateway"},
Hosts: hosts,
},
}

virtualServiceManifestBytes, err := yaml2.Marshal(virtualService)
if err != nil {
return nil, err
}
virtualServiceManifest := string(virtualServiceManifestBytes)

templateRouteResource := wfv1.Template{
Name: virtualServiceTemplateNameAdd,
Metadata: wfv1.Metadata{
Annotations: map[string]string{
"sidecar.istio.io/inject": "false",
},
},
Resource: &wfv1.ResourceTemplate{
Action: "create",
Manifest: virtualServiceManifest,
},
}
newTemplateOrder = append(newTemplateOrder, templateRouteResource)

for i2, t2 := range wf.Spec.Templates {
if t2.Name == wf.Spec.Entrypoint {
if t2.DAG != nil {
tasks := wf.Spec.Templates[i2].DAG.Tasks
t := tasks[0]
sysDepFound := false
for _, d := range t.Dependencies {
if d == taskSysSendStatusName {
sysDepFound = true
wf.Spec.Templates[i2].DAG.Tasks[0].Dependencies =
[]string{virtualServiceAddTaskName}
}
}
if sysDepFound == false {
wf.Spec.Templates[i2].DAG.Tasks[0].Dependencies = append(wf.Spec.Templates[i2].DAG.Tasks[0].Dependencies, virtualServiceAddTaskName)
}

wf.Spec.Templates[i2].DAG.Tasks = append(tasks, []wfv1.DAGTask{
{
Name: serviceAddTaskName,
Template: serviceTemplateNameAdd,
Dependencies: []string{taskSysSendStatusName},
},
{
Name: virtualServiceAddTaskName,
Template: virtualServiceTemplateNameAdd,
Dependencies: []string{serviceAddTaskName},
},
}...)
}
}
}
//Inject clean-up for service and virtualservice
templateServiceDeleteResource := wfv1.Template{
Name: serviceTemplateNameDelete,
Metadata: wfv1.Metadata{
Annotations: map[string]string{
"sidecar.istio.io/inject": "false",
},
},
Resource: &wfv1.ResourceTemplate{
Action: "delete",
Manifest: serviceManifest,
},
}
newTemplateOrder = append(newTemplateOrder, templateServiceDeleteResource)

templateRouteDeleteResource := wfv1.Template{
Name: virtualServiceTemplateNameDelete,
Metadata: wfv1.Metadata{
Annotations: map[string]string{
"sidecar.istio.io/inject": "false",
},
},
Resource: &wfv1.ResourceTemplate{
Action: "delete",
Manifest: virtualServiceManifest,
},
}

newTemplateOrder = append(newTemplateOrder, templateRouteDeleteResource)

dagTasks := []wfv1.DAGTask{
{
Name: serviceDeleteTaskName,
Template: serviceTemplateNameDelete,
},
{
Name: virtualServiceDeleteTaskName,
Template: virtualServiceTemplateNameDelete,
Dependencies: []string{serviceDeleteTaskName},
},
}
if wf.Spec.OnExit != "" {
for _, t := range wf.Spec.Templates {
if t.Name == wf.Spec.OnExit {
t.DAG.Tasks = append(t.DAG.Tasks, dagTasks...)
sysExitDepFound := false
for dti, dt := range t.DAG.Tasks {
if dt.Name == taskSysSendExitStats {
sysExitDepFound = true
t.DAG.Tasks[dti].Dependencies = append(t.DAG.Tasks[dti].Dependencies, virtualServiceDeleteTaskName)
}
}
if sysExitDepFound == false {
t.DAG.Tasks[0].Dependencies = append(t.DAG.Tasks[0].Dependencies, virtualServiceDeleteTaskName)
}
break
}
}
} else {
exitHandlerDAG := wfv1.Template{
Name: "exit-handler",
DAG: &wfv1.DAGTemplate{
Tasks: dagTasks,
},
}
wf.Spec.OnExit = "exit-handler"
wf.Spec.Templates = append(wf.Spec.Templates, exitHandlerDAG)
}
}
newTemplateOrder = append(newTemplateOrder, wf.Spec.Templates[tIdx])

}
return newTemplateOrder, nil
}

func ensureWorkflowRunsOnDedicatedNode(wf *wfv1.Workflow, config SystemConfig) (*wfv1.Workflow, error) {
antiAffinityLabelKey := "onepanel.io/reserves-instance-type"
nodeSelectorVal := ""
Expand Down Expand Up @@ -633,7 +905,7 @@ func (c *Client) createWorkflowExecutionDB(namespace string, workflowExecution *
return
}

func (c *Client) FinishWorkflowExecutionStatisticViaExitHandler(namespace, name string, workflowTemplateID int64, phase wfv1.NodePhase, startedAt time.Time) (err error) {
func (c *Client) FinishWorkflowExecutionStatisticViaExitHandler(namespace, name string, phase wfv1.NodePhase, startedAt time.Time) (err error) {
_, err = sb.Update("workflow_executions").
SetMap(sq.Eq{
"started_at": startedAt.UTC(),
Expand All @@ -642,7 +914,10 @@ func (c *Client) FinishWorkflowExecutionStatisticViaExitHandler(namespace, name
"finished_at": time.Now().UTC(),
"phase": phase,
}).
Where(sq.Eq{"name": name}).
Where(sq.And{
sq.Eq{"name": name},
sq.NotEq{"phase": "Terminated"},
}).
RunWith(c.DB).
Exec()

Expand Down Expand Up @@ -1252,7 +1527,7 @@ func (c *Client) TerminateWorkflowExecution(namespace, uid string) (err error) {
return err
}

err = argoutil.TerminateWorkflow(c.ArgoprojV1alpha1().Workflows(namespace), uid)
err = argoutil.StopWorkflow(c.ArgoprojV1alpha1().Workflows(namespace), uid, "", "")

return
}
Expand Down
3 changes: 1 addition & 2 deletions server/workflow_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,7 @@ func (s *WorkflowServer) AddWorkflowExecutionStatistics(ctx context.Context, req
return &empty.Empty{}, err
}

err = client.FinishWorkflowExecutionStatisticViaExitHandler(req.Namespace, req.Uid,
req.Statistics.WorkflowTemplateId, phase, workflow.Status.StartedAt.UTC())
err = client.FinishWorkflowExecutionStatisticViaExitHandler(req.Namespace, req.Uid, phase, workflow.Status.StartedAt.UTC())

if err != nil {
return &empty.Empty{}, err
Expand Down