Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
332b668
fixed gitops methods args
Sep 26, 2022
7f567e2
removed redundant import
Sep 26, 2022
cb33217
restructured gitlab & github gitops methods location, added GetCommit…
Sep 27, 2022
4184154
update statusTime for timeline status
Sep 27, 2022
eb62e25
Merge branch 'main' into pipeline-status
Oct 4, 2022
574effa
Merge branch 'main' into pipeline-status
Oct 4, 2022
f18e872
added commitTime in pipelineOverride
Oct 5, 2022
0b2bd46
updated app status update event handling logic
Oct 5, 2022
6d42d24
updated cron service
Oct 6, 2022
055ba1a
updated notification event bug in cron
Oct 6, 2022
2f1545d
updated query
Oct 6, 2022
3f11267
handled deployment success event in cron
Oct 8, 2022
10c8978
updated handling of healthy status handling in first app update
Oct 8, 2022
b90ee98
Merge branch 'main' into pipeline-status
Oct 9, 2022
461c511
updated cron for updating pipeline status for stucked timelines
kartik-579 Oct 12, 2022
94af909
added condition to stop duplicate timeline update
kartik-579 Oct 12, 2022
51fac3a
updated sync call for resource tree fetch call
kartik-579 Oct 12, 2022
3dd1771
Merge branch 'main' into pipeline-status
kartik-579 Oct 13, 2022
3143439
fixed stream for argo pipeline timeline update event
kartik-579 Oct 13, 2022
1e4a8a0
added dummy event for ARGO_PIPELINE_STATUS_UPDATE_TOPIC
kartik-579 Oct 13, 2022
f762cfe
wip
kartik-579 Oct 13, 2022
1357d29
updated AddStream method to support update stream
kartik-579 Oct 13, 2022
26119ea
updated spec
kartik-579 Oct 13, 2022
c0651f5
fixed method to get pipelines
kartik-579 Oct 13, 2022
d3b55ce
fixed query
kartik-579 Oct 14, 2022
406f4d2
wip
kartik-579 Oct 14, 2022
58e7b44
added stream before subscribe in CdApplicationStatusUpdateHandler
kartik-579 Oct 14, 2022
069c2fa
wip
kartik-579 Oct 14, 2022
d1f1fc5
fix
kartik-579 Oct 14, 2022
3de6496
added logs
kartik-579 Oct 14, 2022
da4478d
added logs
kartik-579 Oct 14, 2022
aff0e93
updated query
kartik-579 Oct 17, 2022
5fe02d6
Merge branch 'main' into pipeline-status
kartik-579 Oct 17, 2022
ae6768d
updated queries
kartik-579 Oct 19, 2022
a54c35a
minor optimisations
kartik-579 Oct 20, 2022
5920458
Merge branch 'main' into pipeline-status
kartik-579 Oct 20, 2022
3e131ec
minor fix
kartik-579 Oct 21, 2022
8d3bd78
Merge branch 'main' into pipeline-status
kartik-579 Oct 21, 2022
613c6a6
updated sql script no.
kartik-579 Oct 21, 2022
d5ad778
updated condition for syncing timeline in resource tree call
kartik-579 Oct 21, 2022
6fe26b9
removed redundant method
kartik-579 Oct 21, 2022
2dac26e
review changes
kartik-579 Oct 21, 2022
084e4fa
Merge branch 'main' into pipeline-status
kartik-579 Oct 21, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,8 +402,8 @@ func InitializeApp() (*App, error) {
rbac.NewEnforcerUtilImpl,
wire.Bind(new(rbac.EnforcerUtil), new(*rbac.EnforcerUtilImpl)),

app.NewDeploymentFailureHandlerImpl,
wire.Bind(new(app.DeploymentFailureHandler), new(*app.DeploymentFailureHandlerImpl)),
app.NewDeploymentEventHandlerImpl,
wire.Bind(new(app.DeploymentEventHandler), new(*app.DeploymentEventHandlerImpl)),
chartConfig.NewPipelineConfigRepository,
wire.Bind(new(chartConfig.PipelineConfigRepository), new(*chartConfig.PipelineConfigRepositoryImpl)),

Expand Down
72 changes: 41 additions & 31 deletions api/restHandler/AppListingRestHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
client "github.com/devtron-labs/devtron/api/helm-app"
"github.com/devtron-labs/devtron/api/restHandler/common"
"github.com/devtron-labs/devtron/client/argocdServer/application"
"github.com/devtron-labs/devtron/client/cron"
"github.com/devtron-labs/devtron/internal/constants"
"github.com/devtron-labs/devtron/internal/util"
"github.com/devtron-labs/devtron/pkg/app"
Expand Down Expand Up @@ -63,21 +64,22 @@ type AppListingRestHandler interface {
}

type AppListingRestHandlerImpl struct {
application application.ServiceClient
appListingService app.AppListingService
teamService team.TeamService
enforcer casbin.Enforcer
pipeline pipeline.PipelineBuilder
logger *zap.SugaredLogger
enforcerUtil rbac.EnforcerUtil
deploymentGroupService deploymentGroup.DeploymentGroupService
userService user.UserService
helmAppClient client.HelmAppClient
clusterService cluster.ClusterService
helmAppService client.HelmAppService
argoUserService argo.ArgoUserService
k8sApplicationService k8s.K8sApplicationService
installedAppService service1.InstalledAppService
application application.ServiceClient
appListingService app.AppListingService
teamService team.TeamService
enforcer casbin.Enforcer
pipeline pipeline.PipelineBuilder
logger *zap.SugaredLogger
enforcerUtil rbac.EnforcerUtil
deploymentGroupService deploymentGroup.DeploymentGroupService
userService user.UserService
helmAppClient client.HelmAppClient
clusterService cluster.ClusterService
helmAppService client.HelmAppService
argoUserService argo.ArgoUserService
k8sApplicationService k8s.K8sApplicationService
installedAppService service1.InstalledAppService
cdApplicationStatusUpdateHandler cron.CdApplicationStatusUpdateHandler
}

type AppStatus struct {
Expand All @@ -96,23 +98,25 @@ func NewAppListingRestHandlerImpl(application application.ServiceClient,
logger *zap.SugaredLogger, enforcerUtil rbac.EnforcerUtil,
deploymentGroupService deploymentGroup.DeploymentGroupService, userService user.UserService,
helmAppClient client.HelmAppClient, clusterService cluster.ClusterService, helmAppService client.HelmAppService,
argoUserService argo.ArgoUserService, k8sApplicationService k8s.K8sApplicationService, installedAppService service1.InstalledAppService) *AppListingRestHandlerImpl {
argoUserService argo.ArgoUserService, k8sApplicationService k8s.K8sApplicationService, installedAppService service1.InstalledAppService,
cdApplicationStatusUpdateHandler cron.CdApplicationStatusUpdateHandler) *AppListingRestHandlerImpl {
appListingHandler := &AppListingRestHandlerImpl{
application: application,
appListingService: appListingService,
logger: logger,
teamService: teamService,
pipeline: pipeline,
enforcer: enforcer,
enforcerUtil: enforcerUtil,
deploymentGroupService: deploymentGroupService,
userService: userService,
helmAppClient: helmAppClient,
clusterService: clusterService,
helmAppService: helmAppService,
argoUserService: argoUserService,
k8sApplicationService: k8sApplicationService,
installedAppService: installedAppService,
application: application,
appListingService: appListingService,
logger: logger,
teamService: teamService,
pipeline: pipeline,
enforcer: enforcer,
enforcerUtil: enforcerUtil,
deploymentGroupService: deploymentGroupService,
userService: userService,
helmAppClient: helmAppClient,
clusterService: clusterService,
helmAppService: helmAppService,
argoUserService: argoUserService,
k8sApplicationService: k8sApplicationService,
installedAppService: installedAppService,
cdApplicationStatusUpdateHandler: cdApplicationStatusUpdateHandler,
}
return appListingHandler
}
Expand Down Expand Up @@ -722,6 +726,12 @@ func (handler AppListingRestHandlerImpl) fetchResourceTree(w http.ResponseWriter
}
appDetail.ResourceTree = util2.InterfaceToMapAdapter(resp)
handler.logger.Debugw("application environment status", "appId", appId, "envId", envId, "resp", resp)
if resp.Status == string(health.HealthStatusHealthy) {
err = handler.cdApplicationStatusUpdateHandler.SyncPipelineStatusForResourceTreeCall(acdAppName, appId, envId)
if err != nil {
handler.logger.Errorw("error in syncing pipeline status", "err", err)
}
}
} else if len(appDetail.AppName) > 0 && len(appDetail.EnvironmentName) > 0 && util.IsHelmApp(appDetail.DeploymentAppType) {
config, err := handler.helmAppService.GetClusterConf(appDetail.ClusterId)
if err != nil {
Expand Down
8 changes: 6 additions & 2 deletions api/router/pubsub/ApplicationStatusUpdateHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package pubsub

import (
"encoding/json"
"time"

v1alpha12 "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
"github.com/devtron-labs/devtron/client/pubsub"
Expand Down Expand Up @@ -68,6 +69,7 @@ func NewApplicationStatusUpdateHandlerImpl(logger *zap.SugaredLogger, pubsubClie
type ApplicationDetail struct {
Application *v1alpha12.Application `json:"application"`
OldApplication *v1alpha12.Application `json:"oldApplication"`
StatusTime time.Time `json:"statusTime"`
}

func (impl *ApplicationStatusUpdateHandlerImpl) Subscribe() error {
Expand All @@ -87,8 +89,10 @@ func (impl *ApplicationStatusUpdateHandlerImpl) Subscribe() error {
return
}
//impl.logger.Infow("app update request", "application", newApp)

isHealthy, err := impl.appService.UpdateApplicationStatusAndCheckIsHealthy(newApp, oldApp)
if applicationDetail.StatusTime.IsZero() {
applicationDetail.StatusTime = time.Now()
}
isHealthy, err := impl.appService.UpdateApplicationStatusAndCheckIsHealthy(newApp, oldApp, applicationDetail.StatusTime)
if err != nil {
impl.logger.Errorw("error on application status update", "err", err, "msg", string(msg.Data))

Expand Down
9 changes: 6 additions & 3 deletions client/argocdServer/application/Application.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ type ResourceTreeResponse struct {
*v1alpha1.ApplicationTree
NewGenerationReplicaSets []string `json:"newGenerationReplicaSets"`
Status string `json:"status"`
RevisionHash string `json:"revisionHash"`
PodMetadata []*PodMetadata `json:"podMetadata"`
Conditions []v1alpha1.ApplicationCondition `json:"conditions"`
}
Expand Down Expand Up @@ -389,10 +390,12 @@ func (c ServiceClientImpl) ResourceTree(ctxt context.Context, query *application
app, err := asc.Watch(ctxt, &appQuery)
var conditions = make([]v1alpha1.ApplicationCondition, 0)
status := "Unknown"
hash := ""
if app != nil {
appResp, err := app.Recv()
if err == nil {
status = string(appResp.Application.Status.Health.Status)
hash = appResp.Application.Status.Sync.Revision
conditions = appResp.Application.Status.Conditions
for _, condition := range conditions {
if condition.Type != v1alpha1.ApplicationConditionSharedResourceWarning {
Expand All @@ -404,7 +407,7 @@ func (c ServiceClientImpl) ResourceTree(ctxt context.Context, query *application
}
}
}
return &ResourceTreeResponse{resp, newReplicaSets, status, podMetadata, conditions}, err
return &ResourceTreeResponse{resp, newReplicaSets, status, hash, podMetadata, conditions}, err
}

func (c ServiceClientImpl) buildPodMetadata(resp *v1alpha1.ApplicationTree, responses []*Result) (podMetaData []*PodMetadata, newReplicaSets []string) {
Expand All @@ -426,15 +429,15 @@ func (c ServiceClientImpl) buildPodMetadata(resp *v1alpha1.ApplicationTree, resp
err := json.Unmarshal([]byte(manifestFromResponse), &manifest)
if err != nil {
c.logger.Error(err)
}else{
} else {
rolloutManifests = append(rolloutManifests, manifest)
}
} else if kind == "Deployment" {
manifest := make(map[string]interface{})
err := json.Unmarshal([]byte(manifestFromResponse), &manifest)
if err != nil {
c.logger.Error(err)
}else{
} else {
deploymentManifests = append(deploymentManifests, manifest)
}
} else if kind == "StatefulSet" {
Expand Down
135 changes: 118 additions & 17 deletions client/cron/CdApplicationStatusUpdateHandler.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
package cron

import (
"encoding/json"
"fmt"
"github.com/caarlos0/env"
client2 "github.com/devtron-labs/devtron/client/events"
"github.com/devtron-labs/devtron/client/pubsub"
"github.com/devtron-labs/devtron/internal/sql/repository/pipelineConfig"
"github.com/devtron-labs/devtron/pkg/app"
"github.com/devtron-labs/devtron/pkg/appStore/deployment/service"
"github.com/devtron-labs/devtron/pkg/pipeline"
"github.com/devtron-labs/devtron/util"
"github.com/nats-io/nats.go"
"github.com/robfig/cron/v3"
"go.uber.org/zap"
"strconv"
Expand All @@ -14,16 +20,22 @@ import (
type CdApplicationStatusUpdateHandler interface {
HelmApplicationStatusUpdate()
ArgoApplicationStatusUpdate()
ArgoPipelineTimelineUpdate()
Subscribe() error
SyncPipelineStatusForResourceTreeCall(acdAppName string, appId, envId int) error
}

type CdApplicationStatusUpdateHandlerImpl struct {
logger *zap.SugaredLogger
cron *cron.Cron
appService app.AppService
workflowDagExecutor pipeline.WorkflowDagExecutor
installedAppService service.InstalledAppService
CdHandler pipeline.CdHandler
AppStatusConfig *AppStatusConfig
logger *zap.SugaredLogger
cron *cron.Cron
appService app.AppService
workflowDagExecutor pipeline.WorkflowDagExecutor
installedAppService service.InstalledAppService
CdHandler pipeline.CdHandler
AppStatusConfig *AppStatusConfig
pubsubClient *pubsub.PubSubClient
pipelineStatusTimelineRepository pipelineConfig.PipelineStatusTimelineRepository
eventClient client2.EventClient
}

type AppStatusConfig struct {
Expand All @@ -43,20 +55,34 @@ func GetAppStatusConfig() (*AppStatusConfig, error) {

func NewCdApplicationStatusUpdateHandlerImpl(logger *zap.SugaredLogger, appService app.AppService,
workflowDagExecutor pipeline.WorkflowDagExecutor, installedAppService service.InstalledAppService,
CdHandler pipeline.CdHandler, AppStatusConfig *AppStatusConfig) *CdApplicationStatusUpdateHandlerImpl {
CdHandler pipeline.CdHandler, AppStatusConfig *AppStatusConfig, pubsubClient *pubsub.PubSubClient,
pipelineStatusTimelineRepository pipelineConfig.PipelineStatusTimelineRepository,
eventClient client2.EventClient) *CdApplicationStatusUpdateHandlerImpl {
cron := cron.New(
cron.WithChain())
cron.Start()
impl := &CdApplicationStatusUpdateHandlerImpl{
logger: logger,
cron: cron,
appService: appService,
workflowDagExecutor: workflowDagExecutor,
installedAppService: installedAppService,
CdHandler: CdHandler,
AppStatusConfig: AppStatusConfig,
}
_, err := cron.AddFunc(AppStatusConfig.CdPipelineStatusCronTime, impl.HelmApplicationStatusUpdate)
logger: logger,
cron: cron,
appService: appService,
workflowDagExecutor: workflowDagExecutor,
installedAppService: installedAppService,
CdHandler: CdHandler,
AppStatusConfig: AppStatusConfig,
pubsubClient: pubsubClient,
pipelineStatusTimelineRepository: pipelineStatusTimelineRepository,
eventClient: eventClient,
}
err := util.AddStream(pubsubClient.JetStrCtxt, util.ORCHESTRATOR_STREAM)
if err != nil {
return nil
}
err = impl.Subscribe()
if err != nil {
logger.Errorw("error on subscribe", "err", err)
return nil
}
_, err = cron.AddFunc(AppStatusConfig.CdPipelineStatusCronTime, impl.HelmApplicationStatusUpdate)
if err != nil {
logger.Errorw("error in starting helm application status update cron job", "err", err)
return nil
Expand All @@ -66,9 +92,38 @@ func NewCdApplicationStatusUpdateHandlerImpl(logger *zap.SugaredLogger, appServi
logger.Errorw("error in starting argo application status update cron job", "err", err)
return nil
}
_, err = cron.AddFunc("@every 1m", impl.ArgoPipelineTimelineUpdate)
if err != nil {
logger.Errorw("error in starting argo application status update cron job", "err", err)
return nil
}
return impl
}

func (impl *CdApplicationStatusUpdateHandlerImpl) Subscribe() error {
_, err := impl.pubsubClient.JetStrCtxt.QueueSubscribe(util.ARGO_PIPELINE_STATUS_UPDATE_TOPIC, util.ARGO_PIPELINE_STATUS_UPDATE_GROUP, func(msg *nats.Msg) {
impl.logger.Debug("received argo pipeline status update request")
defer msg.Ack()
statusUpdateEvent := pipeline.ArgoPipelineStatusEvent{}
err := json.Unmarshal([]byte(string(msg.Data)), &statusUpdateEvent)
if err != nil {
impl.logger.Errorw("unmarshal error on argo pipeline status update event", "err", err)
return
}
impl.logger.Infow("ARGO_PIPELINE_STATUS_UPDATE_REQ", "stage", "subscribeDataUnmarshal", "data", statusUpdateEvent)
err = impl.CdHandler.UpdatePipelineTimelineAndStatusByLiveResourceTreeFetch(statusUpdateEvent.ArgoAppName, statusUpdateEvent.AppId, statusUpdateEvent.EnvId, statusUpdateEvent.IgnoreFailedWorkflowStatus)
if err != nil {
impl.logger.Errorw("error on argo pipeline status update", "err", err, "msg", string(msg.Data))
return
}
}, nats.Durable(util.ARGO_PIPELINE_STATUS_UPDATE_DURABLE), nats.DeliverLast(), nats.ManualAck(), nats.BindStream(util.ORCHESTRATOR_STREAM))
if err != nil {
impl.logger.Error("error in subscribing to argo application status update topic", "err", err)
return err
}
return nil
}

func (impl *CdApplicationStatusUpdateHandlerImpl) HelmApplicationStatusUpdate() {
degradedTime, err := strconv.Atoi(impl.AppStatusConfig.PipelineDegradedTime)
if err != nil {
Expand Down Expand Up @@ -97,3 +152,49 @@ func (impl *CdApplicationStatusUpdateHandlerImpl) ArgoApplicationStatusUpdate()
}
return
}

func (impl *CdApplicationStatusUpdateHandlerImpl) ArgoPipelineTimelineUpdate() {
err := impl.CdHandler.CheckArgoPipelineTimelineStatusPeriodicallyAndUpdateInDb(30)
if err != nil {
impl.logger.Errorw("error argo app status update - cron job", "err", err)
return
}
return
}

func (impl *CdApplicationStatusUpdateHandlerImpl) SyncPipelineStatusForResourceTreeCall(acdAppName string, appId, envId int) error {
timeline, err := impl.pipelineStatusTimelineRepository.FetchLatestTimelineByAppIdAndEnvId(appId, envId)
if err != nil {
impl.logger.Errorw("error in getting timeline", "err", err)
return err
}

if !IsTerminalTimelineStatus(timeline.Status) {
//create new nats event
statusUpdateEvent := pipeline.ArgoPipelineStatusEvent{
ArgoAppName: acdAppName,
AppId: appId,
EnvId: envId,
IgnoreFailedWorkflowStatus: true,
}
//write event
err := impl.eventClient.WriteNatsEvent(util.ARGO_PIPELINE_STATUS_UPDATE_TOPIC, statusUpdateEvent)
if err != nil {
impl.logger.Errorw("error in writing nats event", "topic", util.ARGO_PIPELINE_STATUS_UPDATE_TOPIC, "payload", statusUpdateEvent)
return err
}
}
return nil
}

func IsTerminalTimelineStatus(timeline pipelineConfig.TimelineStatus) bool {
switch timeline {
case
pipelineConfig.TIMELINE_STATUS_APP_HEALTHY,
pipelineConfig.TIMELINE_STATUS_APP_DEGRADED,
pipelineConfig.TIMELINE_STATUS_DEPLOYMENT_FAILED,
pipelineConfig.TIMELINE_STATUS_GIT_COMMIT_FAILED:
return true
}
return false
}
Loading