Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/pipeline/CiConfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ type CiConfig struct {
OrchestratorToken string `env:"ORCH_TOKEN" envDefault:""`
BuildxK8sDriverOptions string `env:"BUILDX_K8S_DRIVER_OPTIONS" envDefault:""`
BuildxProvenanceMode string `env:"BUILDX_PROVENANCE_MODE" envDefault:""` //provenance is set to false if this flag is not set
CIAutoTriggerBatchSize int `env:"CI_SUCCESS_AUTO_TRIGGER_BATCH_SIZE" envDefault:"1"`
}

type CiVolumeMount struct {
Expand Down
41 changes: 36 additions & 5 deletions pkg/pipeline/WebhookService.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"go.uber.org/zap"
"strconv"
"strings"
"sync"
"time"
)

Expand All @@ -58,6 +59,7 @@ type WebhookService interface {

type WebhookServiceImpl struct {
ciArtifactRepository repository.CiArtifactRepository
ciConfig *CiConfig
logger *zap.SugaredLogger
ciPipelineRepository pipelineConfig.CiPipelineRepository
ciWorkflowRepository pipelineConfig.CiWorkflowRepository
Expand All @@ -70,6 +72,7 @@ type WebhookServiceImpl struct {

func NewWebhookServiceImpl(
ciArtifactRepository repository.CiArtifactRepository,
ciConfig *CiConfig,
logger *zap.SugaredLogger,
ciPipelineRepository pipelineConfig.CiPipelineRepository,
appService app.AppService, eventClient client.EventClient,
Expand All @@ -78,6 +81,7 @@ func NewWebhookServiceImpl(
workflowDagExecutor WorkflowDagExecutor, ciHandler CiHandler) *WebhookServiceImpl {
return &WebhookServiceImpl{
ciArtifactRepository: ciArtifactRepository,
ciConfig: ciConfig,
logger: logger,
ciPipelineRepository: ciPipelineRepository,
appService: appService,
Expand Down Expand Up @@ -239,13 +243,40 @@ func (impl WebhookServiceImpl) HandleCiSuccessEvent(ciPipelineId int, request *C
impl.logger.Debugw("Trigger (manual) by user", "userId", request.UserId)
}
async := false
for _, ciArtifact := range ciArtifactArr {
err = impl.workflowDagExecutor.HandleCiSuccessEvent(ciArtifact, isCiManual, async, request.UserId)
if err != nil {
impl.logger.Errorw("error on handle ci success event", "err", err)
return 0, err

// execute auto trigger in batch on CI success event
totalCIArtifactCount := len(ciArtifactArr)
batchSize := impl.ciConfig.CIAutoTriggerBatchSize
// handling to avoid infinite loop
if batchSize <= 0 {
batchSize = 1
}
start := time.Now()
impl.logger.Infow("Started: auto trigger for children Stage/CD pipelines", "Artifact count", totalCIArtifactCount)
for i := 0; i < totalCIArtifactCount; {
//requests left to process
remainingBatch := totalCIArtifactCount - i
if remainingBatch < batchSize {
batchSize = remainingBatch
}
var wg sync.WaitGroup
for j := 0; j < batchSize; j++ {
wg.Add(1)
index := i + j
go func(index int) {
defer wg.Done()
ciArtifact := ciArtifactArr[index]
// handle individual CiArtifact success event
err = impl.workflowDagExecutor.HandleCiSuccessEvent(ciArtifact, isCiManual, async, request.UserId)
if err != nil {
impl.logger.Errorw("error on handle ci success event", "ciArtifactId", ciArtifact.Id, "err", err)
}
}(index)
}
wg.Wait()
i += batchSize
}
impl.logger.Debugw("Completed: auto trigger for children Stage/CD pipelines", "Time taken", time.Since(start).Seconds())
return artifact.Id, err
}

Expand Down
2 changes: 1 addition & 1 deletion wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.