Skip to content
Open
5 changes: 5 additions & 0 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,11 @@ func Start(conf *config.Config, eventHandler handlers.Handler) {
} else {
namespace = ciCfg.DefaultNamespace
}
clusterCfg := &ClusterConfig{}
err = env.Parse(clusterCfg)
Comment on lines +171 to +172
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's do this once for both ci and cd informer

if clusterCfg.ClusterType == ClusterTypeAll && !externalCD.External {
startSystemWorkflowInformer(logger)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is being called two times and why externalCD.External

}
stopCh := make(chan struct{})
defer close(stopCh)
startWorkflowInformer(namespace, logger, pubsub.WORKFLOW_STATUS_UPDATE_TOPIC, stopCh, dynamicClient, externalCD)
Expand Down
74 changes: 44 additions & 30 deletions pkg/informer/K8sInformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,19 @@ const (
INFORMER_ALREADY_EXIST_MESSAGE = "INFORMER_ALREADY_EXIST"
ADD = "add"
UPDATE = "update"
CI_WORKFLOW_NAME = "ci"
CD_WORKFLOW_NAME = "cd"
)

type K8sInformer interface {
startSystemWorkflowInformerForCluster(clusterInfo ClusterInfo) error
syncSystemWorkflowInformer(clusterId int) error
stopSystemWorkflowInformer(clusterId int)
startSystemWorkflowInformer(clusterId int) error
BuildInformerForAllClusters() error
}

type K8sInformerImpl struct {
logger *zap.SugaredLogger
mutex sync.Mutex
informerStopper map[int]chan struct{}
CdInformerStopper map[int]chan struct{}
CiInformerStopper map[int]chan struct{}
clusterRepository repository.ClusterRepository
DefaultK8sConfig *rest.Config
pubSubClient *pubsub.PubSubClientServiceImpl
Expand All @@ -65,7 +64,8 @@ func NewK8sInformerImpl(logger *zap.SugaredLogger, clusterRepository repository.
}
defaultK8sConfig, _ := utils.GetDefaultK8sConfig("kubeconfigK8s")
informerFactory.DefaultK8sConfig = defaultK8sConfig
informerFactory.informerStopper = make(map[int]chan struct{})
informerFactory.CdInformerStopper = make(map[int]chan struct{})
informerFactory.CiInformerStopper = make(map[int]chan struct{})
return informerFactory
}

Expand All @@ -78,7 +78,7 @@ func (impl *K8sInformerImpl) BuildInformerForAllClusters() error {
return err
}
for _, model := range models {
impl.startSystemWorkflowInformer(model.Id)
impl.startSystemWorkflowInformerForCiCd(model.Id)
}
return nil
}
Expand All @@ -100,7 +100,7 @@ func (impl *K8sInformerImpl) startSystemWorkflowInformerForCluster(clusterInfo C
restConfig.Insecure = true
}

err := impl.startSystemWorkflowInformer(clusterInfo.ClusterId)
err := impl.startSystemWorkflowInformerForCiCd(clusterInfo.ClusterId)
if err != nil && err != errors.New(INFORMER_ALREADY_EXIST_MESSAGE) {
impl.logger.Error("error in creating informer for new cluster", "err", err)
return err
Expand Down Expand Up @@ -190,11 +190,8 @@ func (impl *K8sInformerImpl) handleClusterDelete(clusterId int) bool {
impl.logger.Errorw("Error in fetching cluster by id", "cluster-id ", clusterId, "err", err)
return true
}
impl.stopSystemWorkflowInformer(deleteClusterInfo.Id)
if err != nil {
impl.logger.Errorw("error in updating informer for cluster", "id", clusterId, "err", err)
return true
}
impl.stopSystemWorkflowInformerForCiCd(deleteClusterInfo.Id)

return false
}

Expand All @@ -209,8 +206,8 @@ func (impl *K8sInformerImpl) handleClusterChangeEvent(secretObject *coreV1.Secre
var err error

if string(action) == ADD {
err = impl.startSystemWorkflowInformer(clusterId)
if err != nil && err != errors.New(INFORMER_ALREADY_EXIST_MESSAGE) {
err := impl.startSystemWorkflowInformerForCiCd(clusterId)
if err != nil {
impl.logger.Error("error in adding informer for cluster", "id", clusterId, "err", err)
return
}
Expand All @@ -233,36 +230,53 @@ func (impl *K8sInformerImpl) syncSystemWorkflowInformer(clusterId int) error {
}
//before creating new informer for cluster, close existing one
impl.logger.Debugw("stopping informer for cluster - ", "cluster-name", clusterInfo.ClusterName, "cluster-id", clusterInfo.Id)
impl.stopSystemWorkflowInformer(clusterInfo.Id)
impl.stopSystemWorkflowInformerForCiCd(clusterInfo.Id)
impl.logger.Debugw("informer stopped", "cluster-name", clusterInfo.ClusterName, "cluster-id", clusterInfo.Id)
//create new informer for cluster with new config
err = impl.startSystemWorkflowInformer(clusterId)
err = impl.startSystemWorkflowInformerForCiCd(clusterId)
if err != nil {
impl.logger.Errorw("error in starting informer for ", "cluster name", clusterInfo.ClusterName)
return err
}
return nil
}

func (impl *K8sInformerImpl) stopSystemWorkflowInformer(clusterId int) {
stopper := impl.informerStopper[clusterId]
func (impl *K8sInformerImpl) stopSystemWorkflowInformerForCiCd(clusterId int) {
stopper := impl.CdInformerStopper[clusterId]
if stopper != nil {
close(stopper)
delete(impl.CdInformerStopper, clusterId)
}
stopper = impl.CiInformerStopper[clusterId]
if stopper != nil {
close(stopper)
delete(impl.informerStopper, clusterId)
delete(impl.CiInformerStopper, clusterId)
}
return
}

func (impl *K8sInformerImpl) startSystemWorkflowInformer(clusterId int) error {
func (impl *K8sInformerImpl) startSystemWorkflowInformerForCiCd(clusterId int) error {
err := impl.startSystemWorkflowInformer(clusterId, impl.CdInformerStopper, pubsub.CD_WORKFLOW_STATUS_UPDATE, CD_WORKFLOW_NAME)
if err != nil && err != errors.New(INFORMER_ALREADY_EXIST_MESSAGE) {
return err
}
err = impl.startSystemWorkflowInformer(clusterId, impl.CiInformerStopper, pubsub.WORKFLOW_STATUS_UPDATE_TOPIC, CI_WORKFLOW_NAME)
if err != nil && err != errors.New(INFORMER_ALREADY_EXIST_MESSAGE) {
return err
}
return nil
}

func (impl *K8sInformerImpl) startSystemWorkflowInformer(clusterId int, informerStopper map[int]chan struct{}, eventType string, workflowName string) error {

clusterInfo, err := impl.clusterRepository.FindById(clusterId)
if err != nil {
impl.logger.Errorw("error in fetching cluster","clusterId",clusterId, "err", err)
impl.logger.Errorw("error in fetching cluster", "clusterId", clusterId, "err", err)
return err
}

if _, ok := impl.informerStopper[clusterId]; ok {
impl.logger.Debug(fmt.Sprintf("informer for %s already exist", clusterInfo.ClusterName))
if _, ok := informerStopper[clusterId]; ok {
impl.logger.Debug(fmt.Sprintf("%s informer for %s already exist", eventType, clusterInfo.ClusterName))
return errors.New(INFORMER_ALREADY_EXIST_MESSAGE)
}
impl.logger.Infow("starting informer for cluster", "clusterId", clusterInfo.Id, "clusterName", clusterInfo.ClusterName)
Expand All @@ -282,7 +296,7 @@ func (impl *K8sInformerImpl) startSystemWorkflowInformer(clusterId int) error {
if podObj, ok := newObj.(*coreV1.Pod); ok {
impl.logger.Debugw("Event received in Pods update informer", "time", time.Now(), "podObjStatus", podObj.Status)
nodeStatus := impl.assessNodeStatus(podObj)
workflowStatus := impl.getWorkflowStatus(podObj, nodeStatus)
workflowStatus := impl.getWorkflowStatus(podObj, nodeStatus, workflowName)
wfJson, err := json.Marshal(workflowStatus)
if err != nil {
impl.logger.Errorw("error occurred while marshalling workflowJson", "err", err)
Expand All @@ -294,7 +308,7 @@ func (impl *K8sInformerImpl) startSystemWorkflowInformer(clusterId int) error {
return
}

err = impl.pubSubClient.Publish(pubsub.CD_WORKFLOW_STATUS_UPDATE, string(wfJson))
err = impl.pubSubClient.Publish(eventType, string(wfJson))
if err != nil {
impl.logger.Errorw("Error while publishing Request", "err", err)
return
Expand All @@ -305,7 +319,7 @@ func (impl *K8sInformerImpl) startSystemWorkflowInformer(clusterId int) error {
})
informerFactory.Start(stopper)
impl.logger.Infow("informer started for cluster", "clusterId", clusterInfo.Id, "clusterName", clusterInfo.ClusterName)
impl.informerStopper[clusterId] = stopper
informerStopper[clusterId] = stopper
return nil
}

Expand Down Expand Up @@ -499,7 +513,7 @@ func (impl *K8sInformerImpl) inferFailedReason(pod *coreV1.Pod) (v1alpha1.NodePh
return v1alpha1.NodeSucceeded, ""
}

func (impl *K8sInformerImpl) getWorkflowStatus(podObj *coreV1.Pod, nodeStatus v1alpha1.NodeStatus) *v1alpha1.WorkflowStatus {
func (impl *K8sInformerImpl) getWorkflowStatus(podObj *coreV1.Pod, nodeStatus v1alpha1.NodeStatus, templateName string) *v1alpha1.WorkflowStatus {
workflowStatus := &v1alpha1.WorkflowStatus{}
workflowPhase := v1alpha1.WorkflowPhase(nodeStatus.Phase)
if workflowPhase == v1alpha1.WorkflowPending {
Expand All @@ -509,9 +523,9 @@ func (impl *K8sInformerImpl) getWorkflowStatus(podObj *coreV1.Pod, nodeStatus v1
workflowStatus.FinishedAt = nodeStatus.FinishedAt
}
workflowStatus.Phase = workflowPhase
nodeNameVsStatus := make(map[string]v1alpha1.NodeStatus,1)
nodeNameVsStatus := make(map[string]v1alpha1.NodeStatus, 1)
nodeStatus.ID = podObj.Name
nodeStatus.TemplateName = "cd"
nodeStatus.TemplateName = templateName
nodeStatus.Name = nodeStatus.ID
nodeStatus.BoundaryID = impl.getPodOwnerName(podObj)
nodeNameVsStatus[podObj.Name] = nodeStatus
Expand Down