Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions api/api.swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -3165,6 +3165,17 @@
"items": {
"$ref": "#/definitions/KeyValue"
}
},
"metadata": {
"$ref": "#/definitions/WorkflowExecutionMetadata"
}
}
},
"WorkflowExecutionMetadata": {
"type": "object",
"properties": {
"url": {
"type": "string"
}
}
},
Expand Down
728 changes: 402 additions & 326 deletions api/workflow.pb.go

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions api/workflow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,10 @@ message LogEntry {
string content = 2;
}

message WorkflowExecutionMetadata {
string url = 1;
}

message WorkflowExecution {
string createdAt = 1;
string uid = 2;
Expand All @@ -198,6 +202,8 @@ message WorkflowExecution {
WorkflowTemplate workflowTemplate = 9;

repeated KeyValue labels = 10;

WorkflowExecutionMetadata metadata = 11;
}

message ArtifactResponse {
Expand Down
24 changes: 24 additions & 0 deletions pkg/client.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package v1

import (
"fmt"
sq "github.com/Masterminds/squirrel"
argoprojv1alpha1 "github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
"github.com/onepanelio/core/pkg/util/gcs"
"github.com/onepanelio/core/pkg/util/router"
"github.com/onepanelio/core/pkg/util/s3"
log "github.com/sirupsen/logrus"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -89,3 +91,25 @@ func (c *Client) GetS3Client(namespace string, config *ArtifactRepositoryS3Provi
func (c *Client) GetGCSClient(namespace string, config *ArtifactRepositoryGCSProvider) (gcsClient *gcs.Client, err error) {
return gcs.NewClient(namespace, config.ServiceAccountJSON)
}

// GetWebRouter creates a new web router using the system configuration
func (c *Client) GetWebRouter() (router.Web, error) {
sysConfig, err := c.GetSystemConfig()
if err != nil {
return nil, err
}

fqdn := sysConfig.FQDN()
if fqdn == nil {
return nil, fmt.Errorf("unable to get fqdn")
}

protocol := sysConfig.APIProtocol()
if protocol == nil {
return nil, fmt.Errorf("unable to get protcol")
}

webRouter, err := router.NewWebRouter(*protocol, *fqdn)

return webRouter, err
}
31 changes: 31 additions & 0 deletions pkg/util/router/router.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package router

import (
"fmt"
)

// Web provides methods to generate urls for the web client
// this can be used to generate urls for workspaces or workflows when they are ready.
type Web interface {
WorkflowExecution(namespace, uid string) string
}

// web is a basic implementation of WebRouter
type web struct {
protocol string
fqdn string
}

// WorkflowExecution generates a url to view a specific workflow
func (w *web) WorkflowExecution(namespace, uid string) string {
// <protocol>:<fqdn>/<namespace>/workflows/<uid>
return fmt.Sprintf("%v%v/%v/workflows/%v", w.protocol, w.fqdn, namespace, uid)
}

// NewWebRouter creates a new web router used to generate urls for the web client
func NewWebRouter(protocol, fqdn string) (Web, error) {
return &web{
protocol: protocol,
fqdn: fqdn,
}, nil
}
2 changes: 1 addition & 1 deletion pkg/workflow_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -805,7 +805,7 @@ func (c *Client) WatchWorkflowExecution(namespace, uid string) (<-chan *Workflow
CreatedAt: workflow.CreationTimestamp.UTC(),
StartedAt: ptr.Time(workflow.Status.StartedAt.UTC()),
FinishedAt: ptr.Time(workflow.Status.FinishedAt.UTC()),
UID: string(workflow.UID),
UID: workflow.Name,
Name: workflow.Name,
Manifest: string(manifest),
}
Expand Down
1 change: 1 addition & 0 deletions pkg/workflow_execution_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type WorkflowExecution struct {
CreatedAt time.Time `db:"created_at"`
UID string
Name string
Namespace string
GenerateName string
Parameters []Parameter
ParametersBytes []byte `db:"parameters"` // to load from database
Expand Down
2 changes: 1 addition & 1 deletion server/cron_workflow_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func apiCronWorkflow(cwf *v1.CronWorkflow) (cronWorkflow *api.CronWorkflow) {
}

if cwf.WorkflowExecution != nil {
cronWorkflow.WorkflowExecution = GenApiWorkflowExecution(cwf.WorkflowExecution)
cronWorkflow.WorkflowExecution = apiWorkflowExecution(cwf.WorkflowExecution, nil)
for _, param := range cwf.WorkflowExecution.Parameters {
convertedParam := &api.Parameter{
Name: param.Name,
Expand Down
64 changes: 51 additions & 13 deletions server/workflow_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/onepanelio/core/pkg/util"
"github.com/onepanelio/core/pkg/util/pagination"
"github.com/onepanelio/core/pkg/util/router"
"github.com/onepanelio/core/server/converter"
"google.golang.org/grpc/codes"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -26,11 +27,9 @@ func NewWorkflowServer() *WorkflowServer {
return &WorkflowServer{}
}

func GenApiWorkflowExecution(wf *v1.WorkflowExecution) (workflow *api.WorkflowExecution) {
return apiWorkflowExecution(wf)
}

func apiWorkflowExecution(wf *v1.WorkflowExecution) (workflow *api.WorkflowExecution) {
// apiWorkflowExecution converts a package workflow execution to the api version
// router is optional
func apiWorkflowExecution(wf *v1.WorkflowExecution, router router.Web) (workflow *api.WorkflowExecution) {
workflow = &api.WorkflowExecution{
CreatedAt: wf.CreatedAt.Format(time.RFC3339),
Uid: wf.UID,
Expand All @@ -46,11 +45,9 @@ func apiWorkflowExecution(wf *v1.WorkflowExecution) (workflow *api.WorkflowExecu
if wf.FinishedAt != nil && !wf.FinishedAt.IsZero() {
workflow.FinishedAt = wf.FinishedAt.Format(time.RFC3339)
}

if wf.WorkflowTemplate != nil {
workflow.WorkflowTemplate = apiWorkflowTemplate(wf.WorkflowTemplate)
}

if wf.ParametersBytes != nil {
parameters, err := wf.LoadParametersFromBytes()
if err != nil {
Expand All @@ -60,6 +57,12 @@ func apiWorkflowExecution(wf *v1.WorkflowExecution) (workflow *api.WorkflowExecu
workflow.Parameters = converter.ParametersToAPI(parameters)
}

if router != nil {
workflow.Metadata = &api.WorkflowExecutionMetadata{
Url: router.WorkflowExecution(wf.Namespace, wf.UID),
}
}

return
}

Expand Down Expand Up @@ -93,8 +96,14 @@ func (s *WorkflowServer) CreateWorkflowExecution(ctx context.Context, req *api.C
if err != nil {
return nil, err
}
wf.Namespace = req.Namespace

return apiWorkflowExecution(wf), nil
webRouter, err := client.GetWebRouter()
if err != nil {
return nil, err
}

return apiWorkflowExecution(wf, webRouter), nil
}

func (s *WorkflowServer) CloneWorkflowExecution(ctx context.Context, req *api.CloneWorkflowExecutionRequest) (*api.WorkflowExecution, error) {
Expand All @@ -108,8 +117,14 @@ func (s *WorkflowServer) CloneWorkflowExecution(ctx context.Context, req *api.Cl
if err != nil {
return nil, err
}
wf.Namespace = req.Namespace

webRouter, err := client.GetWebRouter()
if err != nil {
return nil, err
}

return apiWorkflowExecution(wf), nil
return apiWorkflowExecution(wf, webRouter), nil
}

func (s *WorkflowServer) AddWorkflowExecutionStatistics(ctx context.Context, req *api.AddWorkflowExecutionStatisticRequest) (*empty.Empty, error) {
Expand Down Expand Up @@ -171,8 +186,13 @@ func (s *WorkflowServer) GetWorkflowExecution(ctx context.Context, req *api.GetW
if labels, ok := mappedLabels[wf.ID]; ok {
wf.Labels = labels
}
wf.Namespace = req.Namespace

return apiWorkflowExecution(wf), nil
webRouter, err := client.GetWebRouter()
if err != nil {
return nil, err
}
return apiWorkflowExecution(wf, webRouter), nil
}

func (s *WorkflowServer) WatchWorkflowExecution(req *api.WatchWorkflowExecutionRequest, stream api.WorkflowService_WatchWorkflowExecutionServer) error {
Expand All @@ -187,11 +207,17 @@ func (s *WorkflowServer) WatchWorkflowExecution(req *api.WatchWorkflowExecutionR
return err
}

webRouter, err := client.GetWebRouter()
if err != nil {
return err
}

for wf := range watcher {
if wf == nil {
break
}
if err := stream.Send(apiWorkflowExecution(wf)); err != nil {
wf.Namespace = req.Namespace
if err := stream.Send(apiWorkflowExecution(wf, webRouter)); err != nil {
return err
}
}
Expand Down Expand Up @@ -269,9 +295,15 @@ func (s *WorkflowServer) ListWorkflowExecutions(ctx context.Context, req *api.Li
return nil, err
}

webRouter, err := client.GetWebRouter()
if err != nil {
return nil, err
}

var apiWorkflowExecutions []*api.WorkflowExecution
for _, wf := range workflows {
apiWorkflowExecutions = append(apiWorkflowExecutions, apiWorkflowExecution(wf))
wf.Namespace = req.Namespace
apiWorkflowExecutions = append(apiWorkflowExecutions, apiWorkflowExecution(wf, webRouter))
}

count, err := client.CountWorkflowExecutions(req.Namespace, req.WorkflowTemplateUid, req.WorkflowTemplateVersion)
Expand Down Expand Up @@ -300,7 +332,13 @@ func (s *WorkflowServer) ResubmitWorkflowExecution(ctx context.Context, req *api
return nil, err
}

return apiWorkflowExecution(wf), nil
wf.Namespace = req.Namespace
webRouter, err := client.GetWebRouter()
if err != nil {
return nil, err
}

return apiWorkflowExecution(wf, webRouter), nil
}

func (s *WorkflowServer) TerminateWorkflowExecution(ctx context.Context, req *api.TerminateWorkflowExecutionRequest) (*empty.Empty, error) {
Expand Down