Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
64a078f
poc: proof of concept for a server that detects configmap changes and…
Vafilor Jun 12, 2020
9c356df
update: workspace url is no longer stored in db but is auto generated…
Vafilor Jun 15, 2020
9f6a1be
fix: removed reference to workspace.url
Vafilor Jun 15, 2020
a983e26
fix: removed db reference to workspace url
Vafilor Jun 15, 2020
d56b728
update: added logic to inject runtime variables in Workspace templates
Vafilor Jun 16, 2020
ba72b14
clean: code cleanup and minor documentation
Vafilor Jun 16, 2020
3074622
Merge branch 'dev' into feat/onepanelio.core-348.system.updates
Vafilor Jun 16, 2020
56777b4
fix: parameter name
Vafilor Jun 16, 2020
504c4bf
workspaceSpec => spec
rushtehrani Jun 17, 2020
c91efc7
fix: added missing migration
Vafilor Jun 17, 2020
b6c67dd
fix: added missing volume
Vafilor Jun 17, 2020
63011e1
fix: issue where watch was started multiple times.
Vafilor Jun 17, 2020
7df9ac6
test: locks instead of channels
Vafilor Jun 18, 2020
fddf2d6
fix: fixed issue where channels were not working for restarting server.
Vafilor Jun 18, 2020
2434864
poc: proof of concept for a server that detects configmap changes and…
Vafilor Jun 12, 2020
72d6892
update: workspace url is no longer stored in db but is auto generated…
Vafilor Jun 15, 2020
0e2eb15
fix: removed reference to workspace.url
Vafilor Jun 15, 2020
8c8d4e2
fix: removed db reference to workspace url
Vafilor Jun 15, 2020
8684e44
update: added logic to inject runtime variables in Workspace templates
Vafilor Jun 16, 2020
6fe23c7
clean: code cleanup and minor documentation
Vafilor Jun 16, 2020
efffe1f
fix: added missing migration
Vafilor Jun 17, 2020
aaf6d31
fix: issue where watch was started multiple times.
Vafilor Jun 17, 2020
9e236aa
test: locks instead of channels
Vafilor Jun 18, 2020
d823a95
fix: fixed issue where channels were not working for restarting server.
Vafilor Jun 18, 2020
e1ade14
Merge branch 'feat/onepanelio.core-348.system.updates' of github.com:…
Vafilor Jun 18, 2020
ce9fd4b
update: config updates and related usage updates.
Vafilor Jun 18, 2020
49c1c6a
fix: issue where getting workspace did not have the runtime parameters
Vafilor Jun 18, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,7 @@ k8s.io/apimachinery v0.0.0-20191219145857-f69eda767ee8/go.mod h1:mhhO3hoLkWO+2eC
k8s.io/apimachinery v0.16.4/go.mod h1:llRdnznGEAqC3DcNm6yEj472xaFVfLM7hnYofMb12tQ=
k8s.io/apimachinery v0.16.7-beta.0 h1:1cNiN7ZXJzlWq7dnWojG5UcrX1AIfQqpbyuzhu7Bhsc=
k8s.io/apimachinery v0.16.7-beta.0/go.mod h1:mhhO3hoLkWO+2eCvqjPtH2Ly92l9nJDwsswzWKpkN2w=
k8s.io/apimachinery v0.18.3 h1:pOGcbVAhxADgUYnjS08EFXs9QMl8qaH5U4fr5LGUrSk=
k8s.io/client-go v0.0.0-20191225075139-73fd2ddc9180/go.mod h1:ksVkYlACXo9hR9AV+cYyCkuWL1xnWcGtAFxsfqMcozg=
k8s.io/client-go v0.16.4 h1:sf+FEZXYhJNjpTZapQDLvvN+0kBeUTxCYxlXcVdhv2E=
k8s.io/client-go v0.16.4/go.mod h1:ZgxhFDxSnoKY0J0U2/Y1C8obKDdlhGPZwA7oHH863Ok=
Expand Down
121 changes: 102 additions & 19 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ import (
"flag"
"fmt"
_ "github.com/onepanelio/core/db"
corev1 "k8s.io/api/core/v1"
apiv1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
k8runtime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
"net"
"net/http"

Expand Down Expand Up @@ -37,29 +43,44 @@ var (
func main() {
flag.Parse()

kubeConfig := v1.NewConfig()
client, err := v1.NewClient(kubeConfig, nil, nil)
if err != nil {
log.Fatalf("Failed to connect to Kubernetes cluster: %v", err)
}
sysConfig, err := client.GetSystemConfig()
if err != nil {
log.Fatalf("Failed to get system config: %v", err)
}
// stopCh is used to indicate when the RPC server should reload.
// We do this when the configuration has been changed, so the server has the latest configuration
stopCh := make(chan struct{})

databaseDataSourceName := fmt.Sprintf("host=%v user=%v password=%v dbname=%v sslmode=disable",
sysConfig["databaseHost"], sysConfig["databaseUsername"], sysConfig["databasePassword"], sysConfig["databaseName"])
go func() {
for {
kubeConfig := v1.NewConfig()
client, err := v1.NewClient(kubeConfig, nil, nil)
if err != nil {
log.Fatalf("Failed to connect to Kubernetes cluster: %v", err)
}

db := sqlx.MustConnect(sysConfig["databaseDriverName"], databaseDataSourceName)
if err := goose.Run("up", db.DB, "db"); err != nil {
log.Fatalf("Failed to run database migrations: %v", err)
}
client.ClearSystemConfigCache()
sysConfig, err := client.GetSystemConfig()
if err != nil {
log.Fatalf("Failed to get system config: %v", err)
}

databaseDataSourceName := fmt.Sprintf("host=%v user=%v password=%v dbname=%v sslmode=disable",
sysConfig["databaseHost"], sysConfig["databaseUsername"], sysConfig["databasePassword"], sysConfig["databaseName"])

db := sqlx.MustConnect(sysConfig["databaseDriverName"], databaseDataSourceName)
if err := goose.Run("up", db.DB, "db"); err != nil {
log.Fatalf("Failed to run database migrations: %v", err)
}

s := startRPCServer(db, kubeConfig, sysConfig, stopCh)

<-stopCh

s.Stop()
}
}()

go startRPCServer(db, kubeConfig, sysConfig)
startHTTPProxy()
}

func startRPCServer(db *v1.DB, kubeConfig *v1.Config, sysConfig v1.SystemConfig) {
func startRPCServer(db *v1.DB, kubeConfig *v1.Config, sysConfig v1.SystemConfig, stopCh chan struct{}) *grpc.Server {
log.Printf("Starting RPC server on port %v", *rpcPort)
lis, err := net.Listen("tcp", *rpcPort)
if err != nil {
Expand Down Expand Up @@ -103,9 +124,25 @@ func startRPCServer(db *v1.DB, kubeConfig *v1.Config, sysConfig v1.SystemConfig)
api.RegisterWorkspaceTemplateServiceServer(s, server.NewWorkspaceTemplateServer())
api.RegisterWorkspaceServiceServer(s, server.NewWorkspaceServer())

if err := s.Serve(lis); err != nil {
log.Fatalf("Failed to serve RPC server: %v", err)
client, err := v1.NewClient(kubeConfig, db, sysConfig)
if err != nil {
log.Fatalf("Unable to create client: %v", err)
}

go watchConfigmapChanges(client, "onepanel", stopCh, func(configMap *corev1.ConfigMap) error {
log.Printf("Configmap changed")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can get rid of this, it's more for debugging.

stopCh <- struct{}{}

return nil
})

go func() {
if err := s.Serve(lis); err != nil {
log.Fatalf("Failed to serve RPC server: %v", err)
}
}()

return s
}

func startHTTPProxy() {
Expand Down Expand Up @@ -159,3 +196,49 @@ func registerHandler(register registerFunc, ctx context.Context, mux *runtime.Se
log.Fatalf("Failed to register handler: %v", err)
}
}

// watchConfigmapChanges sets up a listener for configmap changes and calls the onChange function when it happens
func watchConfigmapChanges(client *v1.Client, namespace string, stopCh <-chan struct{}, onChange func(*corev1.ConfigMap) error) {
restClient := client.CoreV1().RESTClient()
resource := "configmaps"
fieldSelector := fields.ParseSelectorOrDie(fmt.Sprintf("metadata.name=%s", "onepanel"))
listFunc := func(options apiv1.ListOptions) (k8runtime.Object, error) {
options.FieldSelector = fieldSelector.String()
req := restClient.Get().
Namespace(namespace).
Resource(resource).
VersionedParams(&options, apiv1.ParameterCodec)
return req.Do().Get()
}
watchFunc := func(options apiv1.ListOptions) (watch.Interface, error) {
options.Watch = true
options.FieldSelector = fieldSelector.String()
req := restClient.Get().
Namespace(namespace).
Resource(resource).
VersionedParams(&options, apiv1.ParameterCodec)
return req.Watch()
}
source := &cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
_, controller := cache.NewInformer(
source,
&corev1.ConfigMap{},
0,
cache.ResourceEventHandlerFuncs{
UpdateFunc: func(old, new interface{}) {
oldCM := old.(*corev1.ConfigMap)
newCM := new.(*corev1.ConfigMap)
if oldCM.ResourceVersion == newCM.ResourceVersion {
return
}
if newCm, ok := new.(*corev1.ConfigMap); ok {
log.Infof("Detected ConfigMap update.")
if err := onChange(newCm); err != nil {
log.Errorf("Error on calling onChange callback: %v", err)
}
}
},
})
controller.Run(stopCh)
log.Infof("Watching config map updates")
}
6 changes: 6 additions & 0 deletions pkg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ func (c *Client) getConfigMap(namespace, name string) (configMap *ConfigMap, err
return
}

// ClearSystemConfigCache wipes out the cached system configuration so that the next call to
// GetSystemConfig will pull it from the resources
func (c *Client) ClearSystemConfigCache() {
c.systemConfig = nil
}

// GetSystemConfig loads various system configurations and bundles them into a map.
// The configuration is cached once it is loaded, and that cached value is used from here on out.
func (c *Client) GetSystemConfig() (config map[string]string, err error) {
Expand Down
33 changes: 30 additions & 3 deletions pkg/workflow_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ import (
"errors"
"fmt"
sq "github.com/Masterminds/squirrel"
"github.com/ghodss/yaml"
"github.com/onepanelio/core/pkg/util/label"
"github.com/onepanelio/core/pkg/util/pagination"
"github.com/onepanelio/core/pkg/util/ptr"
uid2 "github.com/onepanelio/core/pkg/util/uid"
"gopkg.in/yaml.v2"
"io"
"io/ioutil"
"k8s.io/apimachinery/pkg/watch"
Expand Down Expand Up @@ -328,7 +328,9 @@ func (c *Client) ValidateWorkflowExecution(namespace string, manifest []byte) (e
return
}

func (c *Client) CreateWorkflowExecution(namespace string, workflow *WorkflowExecution) (*WorkflowExecution, error) {
// CreateWorkflowExecution creates an argo workflow execution and related resources.
// Note that the workflow template is loaded from the database/k8s, so workflow.WorkflowTemplate.Manifest is not used.
func (c *Client) CreateWorkflowExecution(namespace string, workflow *WorkflowExecution, runtimeVars *RuntimeVars) (*WorkflowExecution, error) {
workflowTemplate, err := c.GetWorkflowTemplate(namespace, workflow.WorkflowTemplate.UID, workflow.WorkflowTemplate.Version)
if err != nil {
log.WithFields(log.Fields{
Expand All @@ -339,6 +341,31 @@ func (c *Client) CreateWorkflowExecution(namespace string, workflow *WorkflowExe
return nil, util.NewUserError(codes.NotFound, "Error with getting workflow template.")
}

if runtimeVars != nil && workflowTemplate.ArgoWorkflowTemplate != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this section should be here. It's unique to Workspaces only. Other calls to CreateWorkflowExecution may never have "stateful-set-resource" or VirtualService. Can we inject these into the template before it's passed in here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rushtehrani I agree, we need to separate the concerns here.

The code above gets the workflow template and that also gets the k8s argo template. At the moment, we can't modify it beforehand because it gets it in this function.

We could change the function so that we pass in the workflow template with the argo contents pre-loaded.
That way it does less work and is more configurable.

What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Vafilor parameters are showing and configuration reload is working. Is this the only change that remains?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rushtehrani Yes, I should have it ready by tomorrow morning.

argoTemplate := workflowTemplate.ArgoWorkflowTemplate
for _, param := range runtimeVars.AdditionalParameters {
argoTemplate.Spec.Arguments.Parameters = append(argoTemplate.Spec.Arguments.Parameters, wfv1.Parameter{
Name: param.Name,
Value: param.Value,
})
}

finalTemplates := make([]wfv1.Template, 0)
for i := range argoTemplate.Spec.Templates {
template := &argoTemplate.Spec.Templates[i]

if template.Name == "stateful-set-resource" {
template.Resource.Manifest = runtimeVars.StatefulSetManifest
}

if template.Name != runtimeVars.VirtualService.Name {
finalTemplates = append(finalTemplates, *template)
}
}
finalTemplates = append(finalTemplates, *runtimeVars.VirtualService)
workflowTemplate.ArgoWorkflowTemplate.Spec.Templates = finalTemplates
}

// TODO: Need to pull system parameters from k8s config/secret here, example: HOST
opts := &WorkflowExecutionOptions{
Labels: &map[string]string{},
Expand Down Expand Up @@ -416,7 +443,7 @@ func (c *Client) CloneWorkflowExecution(namespace, uid string) (*WorkflowExecuti
return nil, err
}

return c.CreateWorkflowExecution(namespace, workflowExecution)
return c.CreateWorkflowExecution(namespace, workflowExecution, nil)
}

func (c *Client) insertPreWorkflowExecutionStatistic(namespace, name string, workflowTemplateVersionId uint64, createdAt time.Time, uid string, parameters []Parameter) (newId uint64, err error) {
Expand Down
19 changes: 14 additions & 5 deletions pkg/workspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,21 @@ func injectWorkspaceSystemParameters(namespace string, workspace *Workspace, wor
}

func (c *Client) createWorkspace(namespace string, parameters []byte, workspace *Workspace) (*Workspace, error) {
_, err := c.CreateWorkflowExecution(namespace, &WorkflowExecution{
systemConfig, err := c.GetSystemConfig()
if err != nil {
return nil, err
}

runtimeVars, err := workspace.WorkspaceTemplate.RuntimeVars(systemConfig)
if err != nil {
return nil, err
}

_, err = c.CreateWorkflowExecution(namespace, &WorkflowExecution{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Vafilor in reference to this comment. Ideally this we want to make the override before they're passed into CreateWorkflowExecution here.

Parameters: workspace.Parameters,
WorkflowTemplate: workspace.WorkspaceTemplate.WorkflowTemplate,
})
}, runtimeVars)

if err != nil {
return nil, err
}
Expand All @@ -116,7 +127,6 @@ func (c *Client) createWorkspace(namespace string, parameters []byte, workspace
"started_at": time.Now().UTC(),
"workspace_template_id": workspace.WorkspaceTemplate.ID,
"workspace_template_version": workspace.WorkspaceTemplate.Version,
"url": workspace.URL,
}).
Suffix("RETURNING id, created_at").
RunWith(c.DB).
Expand Down Expand Up @@ -154,7 +164,6 @@ func (c *Client) CreateWorkspace(namespace string, workspace *Workspace) (*Works
if sysHost == nil {
return nil, fmt.Errorf("sys-host parameter not found")
}
workspace.URL = *sysHost

existingWorkspace, err := c.GetWorkspace(namespace, workspace.UID)
if err != nil {
Expand Down Expand Up @@ -394,7 +403,7 @@ func (c *Client) updateWorkspace(namespace, uid, workspaceAction, resourceAction
_, err = c.CreateWorkflowExecution(namespace, &WorkflowExecution{
Parameters: workspace.Parameters,
WorkflowTemplate: workspace.WorkspaceTemplate.WorkflowTemplate,
})
}, nil)
if err != nil {
return
}
Expand Down
Loading