Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
64a078f
poc: proof of concept for a server that detects configmap changes and…
Vafilor Jun 12, 2020
9c356df
update: workspace url is no longer stored in db but is auto generated…
Vafilor Jun 15, 2020
9f6a1be
fix: removed reference to workspace.url
Vafilor Jun 15, 2020
a983e26
fix: removed db reference to workspace url
Vafilor Jun 15, 2020
d56b728
update: added logic to inject runtime variables in Workspace templates
Vafilor Jun 16, 2020
ba72b14
clean: code cleanup and minor documentation
Vafilor Jun 16, 2020
3074622
Merge branch 'dev' into feat/onepanelio.core-348.system.updates
Vafilor Jun 16, 2020
56777b4
fix: parameter name
Vafilor Jun 16, 2020
504c4bf
workspaceSpec => spec
rushtehrani Jun 17, 2020
c91efc7
fix: added missing migration
Vafilor Jun 17, 2020
b6c67dd
fix: added missing volume
Vafilor Jun 17, 2020
63011e1
fix: issue where watch was started multiple times.
Vafilor Jun 17, 2020
7df9ac6
test: locks instead of channels
Vafilor Jun 18, 2020
fddf2d6
fix: fixed issue where channels were not working for restarting server.
Vafilor Jun 18, 2020
2434864
poc: proof of concept for a server that detects configmap changes and…
Vafilor Jun 12, 2020
72d6892
update: workspace url is no longer stored in db but is auto generated…
Vafilor Jun 15, 2020
0e2eb15
fix: removed reference to workspace.url
Vafilor Jun 15, 2020
8c8d4e2
fix: removed db reference to workspace url
Vafilor Jun 15, 2020
8684e44
update: added logic to inject runtime variables in Workspace templates
Vafilor Jun 16, 2020
6fe23c7
clean: code cleanup and minor documentation
Vafilor Jun 16, 2020
efffe1f
fix: added missing migration
Vafilor Jun 17, 2020
aaf6d31
fix: issue where watch was started multiple times.
Vafilor Jun 17, 2020
9e236aa
test: locks instead of channels
Vafilor Jun 18, 2020
d823a95
fix: fixed issue where channels were not working for restarting server.
Vafilor Jun 18, 2020
e1ade14
Merge branch 'feat/onepanelio.core-348.system.updates' of github.com:…
Vafilor Jun 18, 2020
ce9fd4b
update: config updates and related usage updates.
Vafilor Jun 18, 2020
49c1c6a
fix: issue where getting workspace did not have the runtime parameters
Vafilor Jun 18, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions db/20200617122509_remove_url_from_workspaces.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- +goose Up
ALTER TABLE workspaces DROP COLUMN url;

-- +goose Down
ALTER TABLE workspaces ADD COLUMN url TEXT;
UPDATE workspaces set url = '';
ALTER TABLE workspaces ALTER COLUMN url SET NOT NULL;
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,7 @@ k8s.io/apimachinery v0.0.0-20191219145857-f69eda767ee8/go.mod h1:mhhO3hoLkWO+2eC
k8s.io/apimachinery v0.16.4/go.mod h1:llRdnznGEAqC3DcNm6yEj472xaFVfLM7hnYofMb12tQ=
k8s.io/apimachinery v0.16.7-beta.0 h1:1cNiN7ZXJzlWq7dnWojG5UcrX1AIfQqpbyuzhu7Bhsc=
k8s.io/apimachinery v0.16.7-beta.0/go.mod h1:mhhO3hoLkWO+2eCvqjPtH2Ly92l9nJDwsswzWKpkN2w=
k8s.io/apimachinery v0.18.3 h1:pOGcbVAhxADgUYnjS08EFXs9QMl8qaH5U4fr5LGUrSk=
k8s.io/client-go v0.0.0-20191225075139-73fd2ddc9180/go.mod h1:ksVkYlACXo9hR9AV+cYyCkuWL1xnWcGtAFxsfqMcozg=
k8s.io/client-go v0.16.4 h1:sf+FEZXYhJNjpTZapQDLvvN+0kBeUTxCYxlXcVdhv2E=
k8s.io/client-go v0.16.4/go.mod h1:ZgxhFDxSnoKY0J0U2/Y1C8obKDdlhGPZwA7oHH863Ok=
Expand Down
131 changes: 108 additions & 23 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@ import (
"flag"
"fmt"
_ "github.com/onepanelio/core/db"
"net"
"net/http"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
corev1 "k8s.io/api/core/v1"
apiv1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
k8runtime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
"net"
"net/http"

"github.com/gorilla/handlers"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
Expand Down Expand Up @@ -37,29 +42,55 @@ var (
func main() {
flag.Parse()

kubeConfig := v1.NewConfig()
client, err := v1.NewClient(kubeConfig, nil, nil)
if err != nil {
log.Fatalf("Failed to connect to Kubernetes cluster: %v", err)
}
sysConfig, err := client.GetSystemConfig()
if err != nil {
log.Fatalf("Failed to get system config: %v", err)
}
// stopCh is used to indicate when the RPC server should reload.
// We do this when the configuration has been changed, so the server has the latest configuration
stopCh := make(chan struct{})

databaseDataSourceName := fmt.Sprintf("host=%v user=%v password=%v dbname=%v sslmode=disable",
sysConfig["databaseHost"], sysConfig["databaseUsername"], sysConfig["databasePassword"], sysConfig["databaseName"])
go func() {
kubeConfig := v1.NewConfig()
client, err := v1.NewClient(kubeConfig, nil, nil)
if err != nil {
log.Fatalf("Failed to connect to Kubernetes cluster: %v", err)
}

db := sqlx.MustConnect(sysConfig["databaseDriverName"], databaseDataSourceName)
if err := goose.Run("up", db.DB, "db"); err != nil {
log.Fatalf("Failed to run database migrations: %v", err)
}
go watchConfigmapChanges(client, "onepanel", stopCh, func(configMap *corev1.ConfigMap) error {
log.Printf("Configmap changed")
stopCh <- struct{}{}

return nil
})

for {
client.ClearSystemConfigCache()
sysConfig, err := client.GetSystemConfig()
if err != nil {
log.Fatalf("Failed to get system config: %v", err)
}

databaseDataSourceName := fmt.Sprintf("host=%v user=%v password=%v dbname=%v sslmode=disable",
sysConfig["databaseHost"], sysConfig["databaseUsername"], sysConfig["databasePassword"], sysConfig["databaseName"])

// sqlx.MustConnect will panic when it can't connect to DB. In that case, this whole application will crash.
// This is okay, as the pod will restart and try connecting to DB again.
// dbDriverName may be nil, but sqlx will then panic.
dbDriverName := sysConfig.DatabaseDriverName()
db := sqlx.MustConnect(*dbDriverName, databaseDataSourceName)
if err := goose.Run("up", db.DB, "db"); err != nil {
log.Fatalf("Failed to run database migrations: %v", err)
}

s := startRPCServer(db, kubeConfig, sysConfig, stopCh)

<-stopCh

s.Stop()
}
}()

go startRPCServer(db, kubeConfig, sysConfig)
startHTTPProxy()
}

func startRPCServer(db *v1.DB, kubeConfig *v1.Config, sysConfig v1.SystemConfig) {
func startRPCServer(db *v1.DB, kubeConfig *v1.Config, sysConfig v1.SystemConfig, stopCh chan struct{}) *grpc.Server {
log.Printf("Starting RPC server on port %v", *rpcPort)
lis, err := net.Listen("tcp", *rpcPort)
if err != nil {
Expand Down Expand Up @@ -103,9 +134,15 @@ func startRPCServer(db *v1.DB, kubeConfig *v1.Config, sysConfig v1.SystemConfig)
api.RegisterWorkspaceTemplateServiceServer(s, server.NewWorkspaceTemplateServer())
api.RegisterWorkspaceServiceServer(s, server.NewWorkspaceServer())

if err := s.Serve(lis); err != nil {
log.Fatalf("Failed to serve RPC server: %v", err)
}
go func() {
if err := s.Serve(lis); err != nil {
log.Fatalf("Failed to serve RPC server: %v", err)
}

log.Printf("Server finished")
}()

return s
}

func startHTTPProxy() {
Expand Down Expand Up @@ -159,3 +196,51 @@ func registerHandler(register registerFunc, ctx context.Context, mux *runtime.Se
log.Fatalf("Failed to register handler: %v", err)
}
}

// watchConfigmapChanges sets up a listener for configmap changes and calls the onChange function when it happens
func watchConfigmapChanges(client *v1.Client, namespace string, stopCh <-chan struct{}, onChange func(*corev1.ConfigMap) error) {
restClient := client.CoreV1().RESTClient()
resource := "configmaps"
fieldSelector := fields.ParseSelectorOrDie(fmt.Sprintf("metadata.name=%s", "onepanel"))
listFunc := func(options apiv1.ListOptions) (k8runtime.Object, error) {
options.FieldSelector = fieldSelector.String()
req := restClient.Get().
Namespace(namespace).
Resource(resource).
VersionedParams(&options, apiv1.ParameterCodec)
return req.Do().Get()
}
watchFunc := func(options apiv1.ListOptions) (watch.Interface, error) {
options.Watch = true
options.FieldSelector = fieldSelector.String()
req := restClient.Get().
Namespace(namespace).
Resource(resource).
VersionedParams(&options, apiv1.ParameterCodec)
return req.Watch()
}
source := &cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
_, controller := cache.NewInformer(
source,
&corev1.ConfigMap{},
0,
cache.ResourceEventHandlerFuncs{
UpdateFunc: func(old, new interface{}) {
oldCM := old.(*corev1.ConfigMap)
newCM := new.(*corev1.ConfigMap)
if oldCM.ResourceVersion == newCM.ResourceVersion {
return
}
if newCm, ok := new.(*corev1.ConfigMap); ok {
log.Infof("Detected ConfigMap update.")
if err := onChange(newCm); err != nil {
log.Errorf("Error on calling onChange callback: %v", err)
}
}
},
})

// We don't want the watcher to ever stop, so give it a channel that will never be hit.
neverStopCh := make(chan struct{})
controller.Run(neverStopCh)
}
85 changes: 83 additions & 2 deletions pkg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,91 @@ package v1

import (
"encoding/base64"
"fmt"
"github.com/onepanelio/core/pkg/util"
"github.com/onepanelio/core/pkg/util/ptr"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/yaml"
"strings"
)

// SystemConfig is configuration loaded from kubernetes config and secrets that includes information about the
// database, server, etc.
type SystemConfig = map[string]string
type SystemConfig map[string]string

// GetValue returns the value in the underlying map if it exists, otherwise nil is returned
// If the value does not exist, it is also logged.
func (s SystemConfig) GetValue(name string) *string {
value, ok := s[name]
if !ok {
log.WithFields(log.Fields{
"Method": "SystemConfig.GetValue",
"Name": name,
"Error": "does not exist",
})

return nil
}

return &value
}

// Domain gets the ONEPANEL_DOMAIN value, or nil.
func (s SystemConfig) Domain() *string {
return s.GetValue("ONEPANEL_DOMAIN")
}

// APIURL gets the ONEPANEL_API_URL, or nil.
func (s SystemConfig) APIURL() *string {
return s.GetValue("ONEPANEL_API_URL")
}

// APIProtocol returns either http:// or https:// or nil.
// It is based on the ONEPANEL_API_URL config value and checks if it has https or http
func (s SystemConfig) APIProtocol() *string {
url := s.APIURL()
if url == nil {
return nil
}

if strings.HasPrefix(*url, "https://") {
return ptr.String("https://")
}

return ptr.String("http://")
}

// FQDN gets the ONEPANEL_FQDN value or nil.
func (s SystemConfig) FQDN() *string {
return s.GetValue("ONEPANEL_FQDN")
}

// NodePoolOptions gets the applicationNodePoolOptions value, or nil.
func (s SystemConfig) NodePoolOptions() *string {
return s.GetValue("applicationNodePoolOptions")
}

// ParsedNodePoolOptions loads and parses the applicationNodePoolOptions from the config.
// If there is no data, an error is returned.
func (s SystemConfig) ParsedNodePoolOptions() (options []*ParameterOption, err error) {
data := s.NodePoolOptions()
if data == nil {
return nil, fmt.Errorf("no nodePoolOptions in config")
}

if err = yaml.Unmarshal([]byte(*data), &options); err != nil {
return
}

return
}

// DatabaseDriverName gets the databaseDriverName value, or nil.
func (s SystemConfig) DatabaseDriverName() *string {
return s.GetValue("databaseDriverName")
}

func (c *Client) getConfigMap(namespace, name string) (configMap *ConfigMap, err error) {
cm, err := c.CoreV1().ConfigMaps(namespace).Get(name, metav1.GetOptions{})
Expand All @@ -26,9 +101,15 @@ func (c *Client) getConfigMap(namespace, name string) (configMap *ConfigMap, err
return
}

// ClearSystemConfigCache wipes out the cached system configuration so that the next call to
// GetSystemConfig will pull it from the resources
func (c *Client) ClearSystemConfigCache() {
c.systemConfig = nil
}

// GetSystemConfig loads various system configurations and bundles them into a map.
// The configuration is cached once it is loaded, and that cached value is used from here on out.
func (c *Client) GetSystemConfig() (config map[string]string, err error) {
func (c *Client) GetSystemConfig() (config SystemConfig, err error) {
if c.systemConfig != nil {
return c.systemConfig, nil
}
Expand Down
33 changes: 30 additions & 3 deletions pkg/workflow_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ import (
"errors"
"fmt"
sq "github.com/Masterminds/squirrel"
"github.com/ghodss/yaml"
"github.com/onepanelio/core/pkg/util/label"
"github.com/onepanelio/core/pkg/util/pagination"
"github.com/onepanelio/core/pkg/util/ptr"
uid2 "github.com/onepanelio/core/pkg/util/uid"
"gopkg.in/yaml.v2"
"io"
"io/ioutil"
"k8s.io/apimachinery/pkg/watch"
Expand Down Expand Up @@ -328,7 +328,9 @@ func (c *Client) ValidateWorkflowExecution(namespace string, manifest []byte) (e
return
}

func (c *Client) CreateWorkflowExecution(namespace string, workflow *WorkflowExecution) (*WorkflowExecution, error) {
// CreateWorkflowExecution creates an argo workflow execution and related resources.
// Note that the workflow template is loaded from the database/k8s, so workflow.WorkflowTemplate.Manifest is not used.
func (c *Client) CreateWorkflowExecution(namespace string, workflow *WorkflowExecution, runtimeVars *RuntimeVars) (*WorkflowExecution, error) {
workflowTemplate, err := c.GetWorkflowTemplate(namespace, workflow.WorkflowTemplate.UID, workflow.WorkflowTemplate.Version)
if err != nil {
log.WithFields(log.Fields{
Expand All @@ -339,6 +341,31 @@ func (c *Client) CreateWorkflowExecution(namespace string, workflow *WorkflowExe
return nil, util.NewUserError(codes.NotFound, "Error with getting workflow template.")
}

if runtimeVars != nil && workflowTemplate.ArgoWorkflowTemplate != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this section should be here. It's unique to Workspaces only. Other calls to CreateWorkflowExecution may never have "stateful-set-resource" or VirtualService. Can we inject these into the template before it's passed in here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rushtehrani I agree, we need to separate the concerns here.

The code above gets the workflow template and that also gets the k8s argo template. At the moment, we can't modify it beforehand because it gets it in this function.

We could change the function so that we pass in the workflow template with the argo contents pre-loaded.
That way it does less work and is more configurable.

What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Vafilor parameters are showing and configuration reload is working. Is this the only change that remains?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rushtehrani Yes, I should have it ready by tomorrow morning.

argoTemplate := workflowTemplate.ArgoWorkflowTemplate
for _, param := range runtimeVars.AdditionalParameters {
argoTemplate.Spec.Arguments.Parameters = append(argoTemplate.Spec.Arguments.Parameters, wfv1.Parameter{
Name: param.Name,
Value: param.Value,
})
}

finalTemplates := make([]wfv1.Template, 0)
for i := range argoTemplate.Spec.Templates {
template := &argoTemplate.Spec.Templates[i]

if template.Name == "stateful-set-resource" {
template.Resource.Manifest = runtimeVars.StatefulSetManifest
}

if template.Name != runtimeVars.VirtualService.Name {
finalTemplates = append(finalTemplates, *template)
}
}
finalTemplates = append(finalTemplates, *runtimeVars.VirtualService)
workflowTemplate.ArgoWorkflowTemplate.Spec.Templates = finalTemplates
}

// TODO: Need to pull system parameters from k8s config/secret here, example: HOST
opts := &WorkflowExecutionOptions{
Labels: &map[string]string{},
Expand Down Expand Up @@ -416,7 +443,7 @@ func (c *Client) CloneWorkflowExecution(namespace, uid string) (*WorkflowExecuti
return nil, err
}

return c.CreateWorkflowExecution(namespace, workflowExecution)
return c.CreateWorkflowExecution(namespace, workflowExecution, nil)
}

func (c *Client) insertPreWorkflowExecutionStatistic(namespace, name string, workflowTemplateVersionId uint64, createdAt time.Time, uid string, parameters []Parameter) (newId uint64, err error) {
Expand Down
Loading