Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 7 additions & 20 deletions pkg/cmd/cmd.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package cmd

import (
"flag"
"fmt"
"log"
"net/http"
"time"

"github.com/go-logr/logr"
"github.com/k3s-io/helm-controller/pkg/controllers"
"github.com/k3s-io/helm-controller/pkg/controllers/common"
"github.com/k3s-io/helm-controller/pkg/crds"
Expand Down Expand Up @@ -39,25 +39,12 @@ type HelmController struct {
PprofPort int
}

func (hc *HelmController) SetupDebug() error {
logging := flag.NewFlagSet("", flag.PanicOnError)
klog.InitFlags(logging)
func (hc *HelmController) SetupLogging() (logr.Logger, error) {
klog.EnableContextualLogging(true)
if hc.Debug {
logrus.SetLevel(logrus.DebugLevel)
if err := logging.Parse([]string{
fmt.Sprintf("-v=%d", hc.DebugLevel),
}); err != nil {
return err
}
} else {
if err := logging.Parse([]string{
"-v=0",
}); err != nil {
return err
}
}

return nil
return common.NewLogrusSink(nil).AsLogr(), nil
}

func (hc *HelmController) Run(app *cli.Context) error {
Expand All @@ -69,9 +56,9 @@ func (hc *HelmController) Run(app *cli.Context) error {
log.Println(http.ListenAndServe(fmt.Sprintf("localhost:%d", hc.PprofPort), nil))
}()
}
err := hc.SetupDebug()
logger, err := hc.SetupLogging()
if err != nil {
panic("failed to setup debug logging: " + err.Error())
return err
}

cfg := hc.GetNonInteractiveClientConfig()
Expand All @@ -84,7 +71,7 @@ func (hc *HelmController) Run(app *cli.Context) error {
return err
}

ctx := app.Context
ctx := klog.NewContext(app.Context, logger)

crds, err := crds.List()
if err != nil {
Expand Down
66 changes: 51 additions & 15 deletions pkg/controllers/chart/chart.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,21 @@ import (
)

const (
Label = "helmcharts.helm.cattle.io/chart"
Annotation = "helmcharts.helm.cattle.io/configHash"
Unmanaged = "helmcharts.helm.cattle.io/unmanaged"
SecretType = "helmcharts.helm.cattle.io/values"
ManagedBy = "helmcharts.cattle.io/managed-by"
CRDName = "helmcharts.helm.cattle.io"
ConfigCRDName = "helmchartconfigs.helm.cattle.io"

TaintExternalCloudProvider = "node.cloudprovider.kubernetes.io/uninitialized"
LabelNodeRolePrefix = "node-role.kubernetes.io/"
LabelControlPlaneSuffix = "control-plane"
LabelEtcdSuffix = "etcd"

AnnotationChartURL = "helm.cattle.io/chart-url"
AnnotationConfigHash = "helmcharts.helm.cattle.io/configHash"
AnnotationManagedBy = "helmcharts.cattle.io/managed-by"
AnnotationUnmanaged = "helmcharts.helm.cattle.io/unmanaged"

LabelChartName = "helmcharts.helm.cattle.io/chart"
LabelNodeRolePrefix = "node-role.kubernetes.io/"
LabelControlPlaneSuffix = "control-plane"
LabelEtcdSuffix = "etcd"

FailurePolicyReinstall = "reinstall"
FailurePolicyAbort = "abort"
Expand Down Expand Up @@ -320,7 +323,8 @@ func (c *Controller) OnChange(chart *v1.HelmChart, chartStatus v1.HelmChartStatu
}

// emit an event to indicate that this Helm chart is being applied
c.recorder.Eventf(chart, corev1.EventTypeNormal, "ApplyJob", "Applying HelmChart using Job %s/%s", job.Namespace, job.Name)
annotations := map[string]string{AnnotationConfigHash: job.Spec.Template.ObjectMeta.Annotations[AnnotationConfigHash]}
c.recorder.AnnotatedEventf(chart, annotations, corev1.EventTypeNormal, "ApplyJob", "Applying HelmChart from %s using Job %s/%s ", chartSource(chart), job.Namespace, job.Name)

return append(objs, job), chartStatus, nil
}
Expand Down Expand Up @@ -430,10 +434,10 @@ func (c *Controller) shouldManage(chart *v1.HelmChart) (bool, error) {
return false, nil
}
if chart.Annotations != nil {
if _, ok := chart.Annotations[Unmanaged]; ok {
if _, ok := chart.Annotations[AnnotationUnmanaged]; ok {
return false, nil
}
managedBy, ok := chart.Annotations[ManagedBy]
managedBy, ok := chart.Annotations[AnnotationManagedBy]
if ok {
// if the annotation exists, only handle this chart if the managedBy value matches that of this controller
return managedBy == c.managedBy, nil
Expand All @@ -444,10 +448,10 @@ func (c *Controller) shouldManage(chart *v1.HelmChart) (bool, error) {
chartCopy := chart.DeepCopy()
if chartCopy.Annotations == nil {
chartCopy.SetAnnotations(map[string]string{
ManagedBy: c.managedBy,
AnnotationManagedBy: c.managedBy,
})
} else {
chartCopy.Annotations[ManagedBy] = c.managedBy
chartCopy.Annotations[AnnotationManagedBy] = c.managedBy
}
_, err := c.helms.Update(chartCopy)
return false, err
Expand Down Expand Up @@ -573,15 +577,15 @@ func job(chart *v1.HelmChart, apiServerPort string) (*batch.Job, *corev1.Secret,
Name: fmt.Sprintf("helm-%s-%s", action, chart.Name),
Namespace: chart.Namespace,
Labels: map[string]string{
Label: chart.Name,
LabelChartName: chart.Name,
},
},
Spec: batch.JobSpec{
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{},
Labels: map[string]string{
Label: chart.Name,
LabelChartName: chart.Name,
},
},
Spec: corev1.PodSpec{
Expand Down Expand Up @@ -1174,7 +1178,7 @@ func hashObjects(job *batch.Job, objs ...metav1.Object) {
}
}

job.Spec.Template.ObjectMeta.Annotations[Annotation] = fmt.Sprintf("SHA256=%X", hash.Sum(nil))
job.Spec.Template.ObjectMeta.Annotations[AnnotationConfigHash] = fmt.Sprintf("SHA256=%X", hash.Sum(nil))
}

func setBackOffLimit(job *batch.Job, backOffLimit *int32) {
Expand All @@ -1190,3 +1194,35 @@ func setSecurityContext(job *batch.Job, chart *v1.HelmChart) {
job.Spec.Template.Spec.Containers[0].SecurityContext = chart.Spec.SecurityContext
}
}

// chartSource returns a human-readable description of where the chart
// comes from: inline chartContent (optionally with its source URL),
// an explicit chart URL, an OCI registry, or a chart repository —
// including the version when one is pinned.
func chartSource(chart *v1.HelmChart) string {
	switch {
	case chart == nil:
		return "<unknown>"
	case chart.Spec.ChartContent != "":
		// inline content; surface the originating URL when recorded
		if url := chart.Annotations[AnnotationChartURL]; url != "" {
			return fmt.Sprintf("inline spec.chartContent from %s", url)
		}
		return "inline spec.chartContent"
	case strings.HasPrefix(chart.Spec.Chart, "oci://"):
		if chart.Spec.Version != "" {
			return fmt.Sprintf("version %s from OCI registry %s", chart.Spec.Version, chart.Spec.Chart)
		}
		return fmt.Sprintf("latest stable version from OCI registry %s", chart.Spec.Chart)
	case strings.Contains(chart.Spec.Chart, "://"):
		// any other explicit URL scheme: report it verbatim
		return chart.Spec.Chart
	case chart.Spec.Version != "":
		return fmt.Sprintf("%s version %s from chart repository %s", chart.Spec.Chart, chart.Spec.Version, chart.Spec.Repo)
	default:
		return fmt.Sprintf("latest stable version of %s from chart repository %s", chart.Spec.Chart, chart.Spec.Repo)
	}
}
2 changes: 1 addition & 1 deletion pkg/controllers/chart/chart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func TestHashObjects(t *testing.T) {
b, _ := yaml.ToBytes([]runtime.Object{job})
t.Logf("Generated Job:\n%s", b)

assert.Equalf(test.hash, job.Spec.Template.ObjectMeta.Annotations[Annotation], "%s annotation value does not match", Annotation)
assert.Equalf(test.hash, job.Spec.Template.ObjectMeta.Annotations[AnnotationConfigHash], "%s annotation value does not match", AnnotationConfigHash)
})
}
}
Expand Down
89 changes: 89 additions & 0 deletions pkg/controllers/common/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package common

import (
"fmt"

"github.com/go-logr/logr"
"github.com/sirupsen/logrus"
)

// compile-time assertion that *LogrusSink satisfies logr.LogSink;
// the typed-nil form avoids allocating a value at package init.
var _ logr.LogSink = (*LogrusSink)(nil)

// mapLevel translates a logr verbosity into a logrus log level.
// logr has no named levels: Info logs at verbosity 0 and higher
// verbosities are more detailed, while logrus.InfoLevel is uint32(4).
// The mapping is therefore:
//   - verbosity 0  -> info
//   - verbosity 1  -> debug
//   - verbosity >=2 -> trace
//
// (panic/fatal/warn have no logr equivalent and are never produced)
func mapLevel(level int) logrus.Level {
	if level < 2 {
		return logrus.Level(level + 4)
	}
	return logrus.TraceLevel
}

// mapKV converts an alternating key/value list into logrus Fields.
// Non-string keys are stringified with fmt.Sprint; a trailing key with
// no matching value is recorded with an empty-string value.
func mapKV(kvs []any) logrus.Fields {
	fields := make(logrus.Fields, len(kvs)/2)
	for i := 0; i < len(kvs); i += 2 {
		key, isString := kvs[i].(string)
		if !isString {
			key = fmt.Sprint(kvs[i])
		}
		var value any = ""
		if i+1 < len(kvs) {
			value = kvs[i+1]
		}
		fields[key] = value
	}
	return fields
}

// LogrusSink wraps the logrus Logger/Entry types for use as a logr LogSink.
type LogrusSink struct {
	e  *logrus.Entry    // entry all output is routed through; carries accumulated fields
	ri logr.RuntimeInfo // runtime info recorded by Init; not currently used when emitting
}

// NewLogrusSink returns a LogrusSink backed by l; when l is nil the
// logrus standard (global) logger is used instead.
func NewLogrusSink(l *logrus.Logger) *LogrusSink {
	target := l
	if target == nil {
		target = logrus.StandardLogger()
	}
	return &LogrusSink{e: logrus.NewEntry(target)}
}

// AsLogr adapts the sink into a ready-to-use logr.Logger.
func (ls *LogrusSink) AsLogr() logr.Logger {
	return logr.New(ls)
}

// Init implements logr.LogSink; it stores the runtime information
// (e.g. call depth) that logr supplies when the sink is attached.
func (ls *LogrusSink) Init(ri logr.RuntimeInfo) {
	ls.ri = ri
}

// Enabled reports whether a message at the given logr verbosity would
// actually be emitted by the underlying logrus logger.
func (ls *LogrusSink) Enabled(level int) bool {
	lvl := mapLevel(level)
	return ls.e.Logger.IsLevelEnabled(lvl)
}

// Info logs a non-error message, attaching the key/value pairs as
// logrus fields, at the logrus level mapped from the logr verbosity.
func (ls *LogrusSink) Info(level int, msg string, kvs ...any) {
	entry := ls.e.WithFields(mapKV(kvs))
	entry.Log(mapLevel(level), msg)
}

// Error logs msg at logrus error level, attaching err and the given
// key/value pairs as fields.
func (ls *LogrusSink) Error(err error, msg string, kvs ...any) {
	entry := ls.e.WithError(err)
	entry.WithFields(mapKV(kvs)).Error(msg)
}

// WithValues returns a new sink whose every entry carries the given
// key/value pairs as logrus fields; the receiver is left unchanged.
func (ls *LogrusSink) WithValues(kvs ...any) logr.LogSink {
	child := &LogrusSink{
		e:  ls.e.WithFields(mapKV(kvs)),
		ri: ls.ri,
	}
	return child
}

// WithName returns a sink tagged with the given name in its "logger"
// field; if the sink already has a name, the new one is appended after
// a "/" separator to build a hierarchical name.
func (ls *LogrusSink) WithName(name string) logr.LogSink {
	full := name
	if base, ok := ls.e.Data["logger"]; ok {
		full = fmt.Sprintf("%s/%s", base, name)
	}
	return ls.WithValues("logger", full)
}
39 changes: 23 additions & 16 deletions pkg/controllers/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controllers

import (
"context"
"os"
"time"

"github.com/k3s-io/helm-controller/pkg/controllers/chart"
Expand All @@ -23,7 +24,6 @@ import (
"github.com/rancher/wrangler/v3/pkg/ratelimit"
"github.com/rancher/wrangler/v3/pkg/schemes"
"github.com/rancher/wrangler/v3/pkg/start"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
Expand All @@ -35,6 +35,10 @@ import (
"k8s.io/klog/v2"
)

const (
eventLogLevel klog.Level = 0
)

type appContext struct {
helmcontroller.Interface

Expand All @@ -55,16 +59,17 @@ func (a *appContext) start(ctx context.Context) error {
}

func Register(ctx context.Context, systemNamespace, controllerName string, cfg clientcmd.ClientConfig, opts common.Options) error {
appCtx, err := newContext(cfg, systemNamespace, opts)
if err != nil {
return err
}

if len(controllerName) == 0 {
controllerName = "helm-controller"
}

appCtx.EventBroadcaster.StartLogging(logrus.Infof)
ctx = klog.NewContext(ctx, klog.FromContext(ctx).WithName(controllerName))
appCtx, err := newContext(ctx, cfg, systemNamespace, opts)
if err != nil {
return err
}

appCtx.EventBroadcaster.StartStructuredLogging(eventLogLevel)
appCtx.EventBroadcaster.StartRecordingToSink(&typedv1.EventSinkImpl{
Interface: appCtx.K8s.CoreV1().Events(systemNamespace),
})
Expand Down Expand Up @@ -99,21 +104,23 @@ func Register(ctx context.Context, systemNamespace, controllerName string, cfg c
appCtx.Core.Secret().Cache(),
)

klog.Infof("Starting helm controller with %d threads", opts.Threadiness)
klog.Infof("Using cluster role '%s' for jobs managing helm charts", opts.JobClusterRole)
klog.Infof("Using default image '%s' for jobs managing helm charts", chart.DefaultJobImage)
logger := klog.FromContext(ctx)
logger.Info("Starting helm controller", "threads", opts.Threadiness)
logger.Info("Using cluster role for jobs managing helm charts", "jobClusterRole", opts.JobClusterRole)
logger.Info("Using default image for jobs managing helm charts", "defaultJobImage", chart.DefaultJobImage)

if len(systemNamespace) == 0 {
systemNamespace = metav1.NamespaceSystem
klog.Infof("Starting %s for all namespaces with lock in %s", controllerName, systemNamespace)
logger.Info("Starting global controller", "leaseNamespace", systemNamespace)
} else {
klog.Infof("Starting %s for namespace %s", controllerName, systemNamespace)
logger.Info("Starting namespaced controller", "namespace", systemNamespace)
}

controllerLockName := controllerName + "-lock"
leader.RunOrDie(ctx, systemNamespace, controllerLockName, appCtx.K8s, func(ctx context.Context) {
if err := appCtx.start(ctx); err != nil {
klog.Fatal(err)
klog.Error(err, "failed to start controllers")
os.Exit(1)
}
klog.Info("All controllers have been started")
})
Expand All @@ -135,7 +142,7 @@ func controllerFactory(rest *rest.Config) (controller.SharedControllerFactory, e
}), nil
}

func newContext(cfg clientcmd.ClientConfig, systemNamespace string, opts common.Options) (*appContext, error) {
func newContext(ctx context.Context, cfg clientcmd.ClientConfig, systemNamespace string, opts common.Options) (*appContext, error) {
client, err := cfg.ClientConfig()
if err != nil {
return nil, err
Expand All @@ -146,7 +153,7 @@ func newContext(cfg clientcmd.ClientConfig, systemNamespace string, opts common.
if err != nil {
return nil, err
}
apply = apply.WithSetOwnerReference(false, false)
apply = apply.WithSetOwnerReference(false, false).WithContext(ctx)

k8s, err := kubernetes.NewForConfig(client)
if err != nil {
Expand Down Expand Up @@ -203,7 +210,7 @@ func newContext(cfg clientcmd.ClientConfig, systemNamespace string, opts common.
RBAC: rbacv,

Apply: apply,
EventBroadcaster: record.NewBroadcaster(),
EventBroadcaster: record.NewBroadcaster(record.WithContext(ctx)),

ClientConfig: cfg,
starters: []start.Starter{
Expand Down