Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .features/pending/pagination-when-counting-workflows.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Component: UI
Issues: 13948
Description: Optimize pagination performance when counting workflows in archive.
Author: [Shuangkun Tian](https://github.com/shuangkun)

When querying archived workflows with pagination, the server now uses a cheaper method to check whether more items are available. Instead of performing expensive full table scans to count every matching row, the new implementation issues a `LIMIT 1` query positioned at `offset + limit` to see whether at least one more item exists, significantly improving performance for large datasets.
66 changes: 66 additions & 0 deletions persist/sqldb/mocks/WorkflowArchive.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions persist/sqldb/null_workflow_archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ func (r *nullWorkflowArchive) CountWorkflows(ctx context.Context, options sutils
return 0, nil
}

// HasMoreWorkflows always reports that no further workflows are available and
// never returns an error.
func (r *nullWorkflowArchive) HasMoreWorkflows(ctx context.Context, options sutils.ListOptions) (bool, error) {
	const hasMore = false
	return hasMore, nil
}

// GetWorkflow always fails: it returns a nil workflow together with an error
// stating that getting archived workflows is not supported.
func (r *nullWorkflowArchive) GetWorkflow(ctx context.Context, uid string, namespace string, name string) (*wfv1.Workflow, error) {
	unsupported := fmt.Errorf("getting archived workflows not supported")
	return nil, unsupported
}
Expand Down
143 changes: 143 additions & 0 deletions persist/sqldb/workflow_archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ type WorkflowArchive interface {
// list workflows, with the most recently started workflows at the beginning (i.e. index 0 is the most recent)
ListWorkflows(ctx context.Context, options sutils.ListOptions) (wfv1.Workflows, error)
CountWorkflows(ctx context.Context, options sutils.ListOptions) (int64, error)
// HasMoreWorkflows efficiently checks if there are more workflows beyond the current offset+limit
// This is much faster than counting all workflows for pagination purposes
HasMoreWorkflows(ctx context.Context, options sutils.ListOptions) (bool, error)
GetWorkflow(ctx context.Context, uid string, namespace string, name string) (*wfv1.Workflow, error)
GetWorkflowForEstimator(ctx context.Context, namespace string, requirements []labels.Requirement) (*wfv1.Workflow, error)
DeleteWorkflow(ctx context.Context, uid string) error
Expand Down Expand Up @@ -275,8 +278,47 @@ func (r *workflowArchive) ListWorkflows(ctx context.Context, options sutils.List
}

func (r *workflowArchive) CountWorkflows(ctx context.Context, options sutils.ListOptions) (int64, error) {
if options.Limit > 0 && options.Offset > 0 {
return r.countWorkflowsOptimized(options)
}

total := &archivedWorkflowCount{}

if len(options.LabelRequirements) == 0 {
selector := r.session.SQL().
Select(db.Raw("count(*) as total")).
From(archiveTableName).
Where(r.clusterManagedNamespaceAndInstanceID()).
And(namespaceEqual(options.Namespace)).
And(namePrefixClause(options.NamePrefix)).
And(startedAtFromClause(options.MinStartedAt)).
And(startedAtToClause(options.MaxStartedAt)).
And(createdAfterClause(options.CreatedAfter)).
And(finishedBeforeClause(options.FinishedBefore))

if options.Name != "" {
nameFilter := options.NameFilter
if nameFilter == "" {
nameFilter = "Exact"
}
if nameFilter == "Exact" {
selector = selector.And(nameEqual(options.Name))
}
if nameFilter == "Contains" {
selector = selector.And(nameContainsClause(options.Name))
}
if nameFilter == "Prefix" {
selector = selector.And(namePrefixClause(options.Name))
}
}

err := selector.One(total)
if err != nil {
return 0, err
}
return int64(total.Total), nil
}

selector := r.session.SQL().
Select(db.Raw("count(*) as total")).
From(archiveTableName).
Expand All @@ -294,6 +336,107 @@ func (r *workflowArchive) CountWorkflows(ctx context.Context, options sutils.Lis
return int64(total.Total), nil
}

// countWorkflowsOptimized returns the number of archived workflows matching
// the cluster/instance scope plus the namespace, name, and time filters carried
// in options.
//
// No LIMIT is attached to the count query, and the result is returned
// unmodified. LIMIT applies to the rows a query returns, and count(*) without
// GROUP BY returns exactly one row, so a limit cannot bound the number of rows
// counted. The value read back is therefore already the exact total; adding
// options.Offset and options.Limit to it would only inflate the count.
//
// NOTE(review): options.LabelRequirements is not applied here, so a request
// carrying label requirements gets a count that ignores them. Confirm whether
// CountWorkflows should send such requests through its label-aware path.
func (r *workflowArchive) countWorkflowsOptimized(options sutils.ListOptions) (int64, error) {
	selector := r.session.SQL().
		Select(db.Raw("count(*) as total")).
		From(archiveTableName).
		Where(r.clusterManagedNamespaceAndInstanceID()).
		And(namespaceEqual(options.Namespace)).
		And(namePrefixClause(options.NamePrefix)).
		And(startedAtFromClause(options.MinStartedAt)).
		And(startedAtToClause(options.MaxStartedAt)).
		And(createdAfterClause(options.CreatedAfter)).
		And(finishedBeforeClause(options.FinishedBefore))

	if options.Name != "" {
		// An empty NameFilter means an exact match; unrecognised values add
		// no name condition.
		switch options.NameFilter {
		case "", "Exact":
			selector = selector.And(nameEqual(options.Name))
		case "Contains":
			selector = selector.And(nameContainsClause(options.Name))
		case "Prefix":
			selector = selector.And(namePrefixClause(options.Name))
		}
	}

	total := &archivedWorkflowCount{}
	if err := selector.One(total); err != nil {
		return 0, err
	}
	return int64(total.Total), nil
}

// HasMoreWorkflows reports whether at least one archived workflow matching
// options exists beyond the page described by options.Offset and
// options.Limit. It fetches at most one uid positioned at Offset+Limit instead
// of counting every matching row.
//
// A non-positive options.Limit is treated as an unpaginated listing: there is
// no page boundary to look past, so the method returns false without querying.
// Without this guard a Limit of 0 would probe at Offset itself and report
// "more" whenever any matching row exists.
//
// Any error from building or running the query is returned unchanged together
// with false.
func (r *workflowArchive) HasMoreWorkflows(ctx context.Context, options sutils.ListOptions) (bool, error) {
	if options.Limit <= 0 {
		return false, nil
	}

	selector := r.session.SQL().
		Select("uid").
		From(archiveTableName).
		Where(r.clusterManagedNamespaceAndInstanceID()).
		And(namespaceEqual(options.Namespace)).
		And(namePrefixClause(options.NamePrefix)).
		And(startedAtFromClause(options.MinStartedAt)).
		And(startedAtToClause(options.MaxStartedAt)).
		And(createdAfterClause(options.CreatedAfter)).
		And(finishedBeforeClause(options.FinishedBefore))

	if options.Name != "" {
		// An empty NameFilter means an exact match; unrecognised values add
		// no name condition.
		switch options.NameFilter {
		case "", "Exact":
			selector = selector.And(nameEqual(options.Name))
		case "Contains":
			selector = selector.And(nameContainsClause(options.Name))
		case "Prefix":
			selector = selector.And(namePrefixClause(options.Name))
		}
	}

	if len(options.LabelRequirements) > 0 {
		var err error
		selector, err = BuildArchivedWorkflowSelector(selector, archiveTableName, archiveLabelsTableName, r.dbType, options, false)
		if err != nil {
			return false, err
		}
	}

	// Set the paging last so it overrides any limit/offset the helper above
	// may have applied: a single row just past the current page is enough.
	selector = selector.Limit(1).Offset(options.Offset + options.Limit)

	var rows []struct{ UID string }
	if err := selector.All(&rows); err != nil {
		return false, err
	}

	return len(rows) > 0, nil
}

func (r *workflowArchive) clusterManagedNamespaceAndInstanceID() *db.AndExpr {
return db.And(
db.Cond{"clustername": r.clusterName},
Expand Down
46 changes: 36 additions & 10 deletions server/workflow/workflow_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,22 @@ func (s *workflowServer) ListWorkflows(ctx context.Context, req *workflowpkg.Wor
if err != nil {
return nil, sutils.ToStatusError(err, codes.Internal)
}
archivedCount, err := s.wfArchive.CountWorkflows(ctx, options)
if err != nil {
return nil, sutils.ToStatusError(err, codes.Internal)

// Only calculate archivedCount when it's actually needed
// This avoids expensive COUNT queries when not necessary
var archivedCount int64
var totalCount int64
if options.ShowRemainingItemCount {
archivedCount, err = s.wfArchive.CountWorkflows(ctx, options)
if err != nil {
return nil, sutils.ToStatusError(err, codes.Internal)
}
totalCount = liveWfCount + archivedCount
} else {
// For pagination without remaining count, we can use a more efficient approach
// Just check if there are more items beyond the current page
totalCount = liveWfCount // Start with live count, will be updated if needed
}
totalCount := liveWfCount + archivedCount

// first fetch live workflows
liveWfList := &wfv1.WorkflowList{}
Expand Down Expand Up @@ -236,16 +247,31 @@ func (s *workflowServer) ListWorkflows(ctx context.Context, req *workflowpkg.Wor
if s.wfReflector != nil {
meta.ResourceVersion = s.wfReflector.LastSyncResourceVersion()
}
remainCount := totalCount - int64(options.Offset) - int64(len(wfs))
if remainCount < 0 {
remainCount = 0
var remainCount int64
if options.ShowRemainingItemCount {
// Calculate exact remaining count when requested
remainCount = totalCount - int64(options.Offset) - int64(len(wfs))
if remainCount < 0 {
remainCount = 0
}
meta.RemainingItemCount = &remainCount
} else {
// For pagination without remaining count, use the efficient HasMoreWorkflows method
// This avoids expensive COUNT queries
hasMore, err := s.wfArchive.HasMoreWorkflows(ctx, options)
if err != nil {
return nil, sutils.ToStatusError(err, codes.Internal)
}
if hasMore {
remainCount = 1 // There are more items
} else {
remainCount = 0 // No more items
}
}

if remainCount > 0 {
meta.Continue = fmt.Sprintf("%v", options.Offset+len(wfs))
}
if options.ShowRemainingItemCount {
meta.RemainingItemCount = &remainCount
}

cleaner := fields.NewCleaner(req.Fields)
logger := logging.RequireLoggerFromContext(ctx)
Expand Down
2 changes: 2 additions & 0 deletions server/workflow/workflow_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,8 +617,10 @@ func getWorkflowServer(t *testing.T) (workflowpkg.WorkflowServiceServer, context
}
archivedRepo.On("CountWorkflows", mock.Anything, sutils.ListOptions{Namespace: "workflows", LabelRequirements: r}).Return(int64(2), nil)
archivedRepo.On("ListWorkflows", mock.Anything, sutils.ListOptions{Namespace: "workflows", Limit: -2, LabelRequirements: r}).Return(v1alpha1.Workflows{wfObj2, failedWfObj}, nil)
archivedRepo.On("HasMoreWorkflows", mock.Anything, sutils.ListOptions{Namespace: "workflows", LabelRequirements: r}).Return(false, nil)
archivedRepo.On("CountWorkflows", mock.Anything, sutils.ListOptions{Namespace: "test", LabelRequirements: r}).Return(int64(1), nil)
archivedRepo.On("ListWorkflows", mock.Anything, sutils.ListOptions{Namespace: "test", Limit: -1, LabelRequirements: r}).Return(v1alpha1.Workflows{wfObj4}, nil)
archivedRepo.On("HasMoreWorkflows", mock.Anything, sutils.ListOptions{Namespace: "test", LabelRequirements: r}).Return(false, nil)

kubeClientSet := fake.NewSimpleClientset()
kubeClientSet.PrependReactor("create", "selfsubjectaccessreviews", func(action ktesting.Action) (handled bool, ret runtime.Object, err error) {
Expand Down
Loading