Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 57 additions & 99 deletions pkg/providers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,16 @@ package providers

import (
"context"
"errors"
"fmt"
"os"
"sync"
"time"

"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
networkingv1 "k8s.io/client-go/listers/networking/v1"
Expand Down Expand Up @@ -98,7 +97,11 @@ func NewController(cfg *config.Config) (*Controller, error) {
podName = os.Getenv("HOSTNAME")
}
if podName == "" {
podName = "apisix-ingress-controller"
var err error
podName, err = os.Hostname()
if err != nil {
return nil, err
}
}
podNamespace := os.Getenv("POD_NAMESPACE")
if podNamespace == "" {
Expand Down Expand Up @@ -148,49 +151,20 @@ func (c *Controller) Run(ctx context.Context) error {
rootCtx, rootCancel := context.WithCancel(ctx)
defer rootCancel()

c.MetricsCollector.ResetLeader(false)

go func() {
log.Info("start api server")
// todo: propagate context instead
if err := c.apiServer.Run(rootCtx.Done()); err != nil {
log.Errorf("failed to launch API Server: %s", err)
}
}()

if err := c.setupLeaderElection(); err != nil {
log.Errorw("failed to setup leader election")
return err
}

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
log.Info("start leader election")
c.runLeaderElection(rootCtx)
}()

// ensure that the leader has been elected
if err := wait.PollUntilContextTimeout(rootCtx, 200*time.Millisecond, 10*time.Second, false, func(ctx context.Context) (bool, error) {
if c.elector.GetLeader() != "" {
return true, nil
}
return false, nil
}); err != nil {
if err == context.Canceled {
return nil
}
return err
}

c.run(rootCtx)
// warm up informers
c.informers = c.initSharedInformers()

wg.Wait()
return nil
}
c.MetricsCollector.ResetLeader(false)

func (c *Controller) setupLeaderElection() error {
lock := &resourcelock.LeaseLock{
leaderElectionLeaseLock := &resourcelock.LeaseLock{
LeaseMeta: metav1.ObjectMeta{
Namespace: c.namespace,
Name: c.cfg.Kubernetes.ElectionID,
Expand All @@ -201,23 +175,30 @@ func (c *Controller) setupLeaderElection() error {
EventRecorder: c,
},
}
cfg := leaderelection.LeaderElectionConfig{
Lock: lock,
LeaseDuration: 15 * time.Second,
RenewDeadline: 5 * time.Second,
RetryPeriod: 2 * time.Second,

leaderElectionConfig := leaderelection.LeaderElectionConfig{
ReleaseOnCancel: true,
Name: "ingress-apisix",
Lock: leaderElectionLeaseLock,
LeaseDuration: 15 * time.Second,
RenewDeadline: 5 * time.Second,
RetryPeriod: 2 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
c.MetricsCollector.ResetLeader(true)
log.Infow("controller now is running as leader",
zap.String("namespace", c.namespace),
zap.String("pod", c.name),
)

c.MetricsCollector.ResetLeader(true)
err := c.run(rootCtx)
if err != nil {
log.Errorf("controller run failed: %v", err)
}
rootCancel()
},
OnNewLeader: func(identity string) {
log.Warnf("found a new leader %s", identity)
if identity != c.name {
if identity != leaderElectionLeaseLock.LockConfig.Identity {
log.Infow("controller now is running as a candidate",
zap.String("namespace", c.namespace),
zap.String("pod", c.name),
Expand All @@ -227,39 +208,30 @@ func (c *Controller) setupLeaderElection() error {
},
OnStoppedLeading: func() {
c.MetricsCollector.ResetLeader(false)
log.Infow("controller now is running as a candidate",
log.Infow("controller lost leader, exiting",
zap.String("namespace", c.namespace),
zap.String("pod", c.name),
)
// rootCancel might be to slow, and controllers may have bugs that cause them to not yield
// the safest way to step down is to simply cause a pod restart
os.Exit(0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense

},
},
ReleaseOnCancel: true,
Name: "ingress-apisix",
}
elector, err := leaderelection.NewLeaderElector(cfg)

leaderElector, err := leaderelection.NewLeaderElector(leaderElectionConfig)
if err != nil {
log.Errorf("failed to create leader elector: %s", err.Error())
return err
panic(err)
}
c.elector = elector
return nil
}

func (c *Controller) runLeaderElection(ctx context.Context) {
election:
leaderCtx, leaderCancel := context.WithCancel(ctx)
c.elector.Run(leaderCtx)
leaderCancel()

select {
case <-ctx.Done():
log.Infow("controller will exit the leader election",
zap.String("namespace", c.namespace),
zap.String("pod", c.name),
)
default:
goto election
if leaderElectionConfig.WatchDog != nil {
leaderElectionConfig.WatchDog.SetLeaderElection(leaderElector)
}
// todo: this should never be necessary if only one controller runs
c.elector = leaderElector

leaderElector.Run(rootCtx)

return nil
}

func (c *Controller) initSharedInformers() *providertypes.ListerInformer {
Expand Down Expand Up @@ -382,7 +354,7 @@ func (c *Controller) initSharedInformers() *providertypes.ListerInformer {
return listerInformer
}

func (c *Controller) run(ctx context.Context) {
func (c *Controller) run(ctx context.Context) error {
log.Infow("controller tries to leading ...",
zap.String("namespace", c.namespace),
zap.String("pod", c.name),
Expand All @@ -406,30 +378,23 @@ func (c *Controller) run(ctx context.Context) {
CacheSynced: !c.cfg.EtcdServer.Enabled,
SSLKeyEncryptSalt: c.cfg.EtcdServer.SSLKeyEncryptSalt,
}

// TODO: needs retry logic
err := c.apisix.AddCluster(ctx, clusterOpts)
if err != nil && err != apisix.ErrDuplicatedCluster {
// TODO give up the leader role
log.Errorf("failed to add default cluster: %s", err)
return
return err
}

if err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).HasSynced(ctx); err != nil {
// TODO give up the leader role
log.Errorf("failed to wait the default cluster to be ready: %s", err)

// re-create apisix cluster, used in next c.run
if err = c.apisix.UpdateCluster(ctx, clusterOpts); err != nil {
log.Errorf("failed to update default cluster: %s", err)
return
}
return
return err
}

// Creation Phase

log.Info("creating controller")

c.informers = c.initSharedInformers()
common := &providertypes.Common{
ControllerNamespace: c.namespace,
ListerInformer: c.informers,
Expand All @@ -443,14 +408,12 @@ func (c *Controller) run(ctx context.Context) {

c.namespaceProvider, err = namespace.NewWatchingNamespaceProvider(ctx, c.kubeClient, c.cfg, c.resourceSyncCh)
if err != nil {
ctx.Done()
return
return err
}

c.podProvider, err = pod.NewProvider(common, c.namespaceProvider)
if err != nil {
ctx.Done()
return
return err
}

c.translator = translation.NewTranslator(&translation.TranslatorOptions{
Expand All @@ -466,20 +429,17 @@ func (c *Controller) run(ctx context.Context) {

c.apisixProvider, c.apisixTranslator, err = apisixprovider.NewProvider(common, c.namespaceProvider, c.translator)
if err != nil {
ctx.Done()
return
return err
}

c.ingressProvider, err = ingressprovider.NewProvider(common, c.namespaceProvider, c.translator, c.apisixTranslator)
if err != nil {
ctx.Done()
return
return err
}

c.kubeProvider, err = k8s.NewProvider(common, c.translator, c.namespaceProvider, c.apisixProvider, c.ingressProvider)
if err != nil {
ctx.Done()
return
return err
}

if c.cfg.Kubernetes.EnableGatewayAPI {
Expand All @@ -495,8 +455,7 @@ func (c *Controller) run(ctx context.Context) {
ListerInformer: common.ListerInformer,
})
if err != nil {
ctx.Done()
return
return err
}
}

Expand All @@ -505,25 +464,22 @@ func (c *Controller) run(ctx context.Context) {
log.Info("init namespaces")

if err = c.namespaceProvider.Init(ctx); err != nil {
ctx.Done()
return
return err
}

log.Info("wait for resource sync")

// Wait for resource sync
if ok := c.informers.StartAndWaitForCacheSync(ctx); !ok {
ctx.Done()
return
return errors.New("StartAndWaitForCacheSync failed")
}

log.Info("init providers")

// Compare resource
if !c.cfg.EtcdServer.Enabled {
if err = c.apisixProvider.Init(ctx); err != nil {
ctx.Done()
return
return err
}
}

Expand Down Expand Up @@ -573,6 +529,8 @@ func (c *Controller) run(ctx context.Context) {
log.Error("Start failed, abort...")
cancelFunc()
}

return nil
}

func (c *Controller) checkClusterHealth(ctx context.Context, cancelFunc context.CancelFunc) {
Expand Down