Skip to content

Commit 179571e

Browse files
authored
feat(server): optimize pagination when counting workflows in archive. Fixes:#13948 (#14892)
Signed-off-by: shuangkun <[email protected]>
1 parent 9510408 commit 179571e

File tree

6 files changed

+257
-10
lines changed

6 files changed

+257
-10
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
Component: UI
2+
Issues: 13948
3+
Description: Optimize pagination performance when counting workflows in archive.
4+
Author: [Shuangkun Tian](https://github.com/shuangkun)
5+
6+
When listing archived workflows with pagination, the server no longer counts every matching row just to decide whether another page exists. Unless the remaining item count is explicitly requested, it instead runs a `LIMIT 1` query at position offset+limit to check whether any item lies beyond the current page, which avoids an expensive `COUNT(*)` over large archives.

persist/sqldb/mocks/WorkflowArchive.go

Lines changed: 66 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

persist/sqldb/null_workflow_archive.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ func (r *nullWorkflowArchive) CountWorkflows(ctx context.Context, options sutils
3131
return 0, nil
3232
}
3333

34+
func (r *nullWorkflowArchive) HasMoreWorkflows(ctx context.Context, options sutils.ListOptions) (bool, error) {
35+
return false, nil
36+
}
37+
3438
func (r *nullWorkflowArchive) GetWorkflow(ctx context.Context, uid string, namespace string, name string) (*wfv1.Workflow, error) {
3539
return nil, fmt.Errorf("getting archived workflows not supported")
3640
}

persist/sqldb/workflow_archive.go

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,9 @@ type WorkflowArchive interface {
7272
// list workflows, with the most recently started workflows at the beginning (i.e. index 0 is the most recent)
7373
ListWorkflows(ctx context.Context, options sutils.ListOptions) (wfv1.Workflows, error)
7474
CountWorkflows(ctx context.Context, options sutils.ListOptions) (int64, error)
75+
// HasMoreWorkflows efficiently checks if there are more workflows beyond the current offset+limit
76+
// This is much faster than counting all workflows for pagination purposes
77+
HasMoreWorkflows(ctx context.Context, options sutils.ListOptions) (bool, error)
7578
GetWorkflow(ctx context.Context, uid string, namespace string, name string) (*wfv1.Workflow, error)
7679
GetWorkflowForEstimator(ctx context.Context, namespace string, requirements []labels.Requirement) (*wfv1.Workflow, error)
7780
DeleteWorkflow(ctx context.Context, uid string) error
@@ -275,8 +278,47 @@ func (r *workflowArchive) ListWorkflows(ctx context.Context, options sutils.List
275278
}
276279

277280
func (r *workflowArchive) CountWorkflows(ctx context.Context, options sutils.ListOptions) (int64, error) {
281+
if options.Limit > 0 && options.Offset > 0 {
282+
return r.countWorkflowsOptimized(options)
283+
}
284+
278285
total := &archivedWorkflowCount{}
279286

287+
if len(options.LabelRequirements) == 0 {
288+
selector := r.session.SQL().
289+
Select(db.Raw("count(*) as total")).
290+
From(archiveTableName).
291+
Where(r.clusterManagedNamespaceAndInstanceID()).
292+
And(namespaceEqual(options.Namespace)).
293+
And(namePrefixClause(options.NamePrefix)).
294+
And(startedAtFromClause(options.MinStartedAt)).
295+
And(startedAtToClause(options.MaxStartedAt)).
296+
And(createdAfterClause(options.CreatedAfter)).
297+
And(finishedBeforeClause(options.FinishedBefore))
298+
299+
if options.Name != "" {
300+
nameFilter := options.NameFilter
301+
if nameFilter == "" {
302+
nameFilter = "Exact"
303+
}
304+
if nameFilter == "Exact" {
305+
selector = selector.And(nameEqual(options.Name))
306+
}
307+
if nameFilter == "Contains" {
308+
selector = selector.And(nameContainsClause(options.Name))
309+
}
310+
if nameFilter == "Prefix" {
311+
selector = selector.And(namePrefixClause(options.Name))
312+
}
313+
}
314+
315+
err := selector.One(total)
316+
if err != nil {
317+
return 0, err
318+
}
319+
return int64(total.Total), nil
320+
}
321+
280322
selector := r.session.SQL().
281323
Select(db.Raw("count(*) as total")).
282324
From(archiveTableName).
@@ -294,6 +336,107 @@ func (r *workflowArchive) CountWorkflows(ctx context.Context, options sutils.Lis
294336
return int64(total.Total), nil
295337
}
296338

339+
func (r *workflowArchive) countWorkflowsOptimized(options sutils.ListOptions) (int64, error) {
340+
sampleSelector := r.session.SQL().
341+
Select(db.Raw("count(*) as total")).
342+
From(archiveTableName).
343+
Where(r.clusterManagedNamespaceAndInstanceID()).
344+
And(namespaceEqual(options.Namespace)).
345+
And(namePrefixClause(options.NamePrefix)).
346+
And(startedAtFromClause(options.MinStartedAt)).
347+
And(startedAtToClause(options.MaxStartedAt)).
348+
And(createdAfterClause(options.CreatedAfter)).
349+
And(finishedBeforeClause(options.FinishedBefore))
350+
351+
if options.Name != "" {
352+
nameFilter := options.NameFilter
353+
if nameFilter == "" {
354+
nameFilter = "Exact"
355+
}
356+
if nameFilter == "Exact" {
357+
sampleSelector = sampleSelector.And(nameEqual(options.Name))
358+
}
359+
if nameFilter == "Contains" {
360+
sampleSelector = sampleSelector.And(nameContainsClause(options.Name))
361+
}
362+
if nameFilter == "Prefix" {
363+
sampleSelector = sampleSelector.And(namePrefixClause(options.Name))
364+
}
365+
}
366+
367+
if options.Offset < 1000 {
368+
total := &archivedWorkflowCount{}
369+
err := sampleSelector.One(total)
370+
if err != nil {
371+
return 0, err
372+
}
373+
return int64(total.Total), nil
374+
}
375+
376+
sampleSize := 1000
377+
sampleSelector = sampleSelector.Limit(sampleSize)
378+
379+
sampleTotal := &archivedWorkflowCount{}
380+
err := sampleSelector.One(sampleTotal)
381+
if err != nil {
382+
return 0, err
383+
}
384+
385+
if int64(sampleTotal.Total) < int64(sampleSize) {
386+
return int64(sampleTotal.Total), nil
387+
}
388+
389+
estimatedTotal := int64(options.Offset) + int64(sampleTotal.Total) + int64(options.Limit)
390+
return estimatedTotal, nil
391+
}
392+
393+
func (r *workflowArchive) HasMoreWorkflows(ctx context.Context, options sutils.ListOptions) (bool, error) {
394+
selector := r.session.SQL().
395+
Select("uid").
396+
From(archiveTableName).
397+
Where(r.clusterManagedNamespaceAndInstanceID()).
398+
And(namespaceEqual(options.Namespace)).
399+
And(namePrefixClause(options.NamePrefix)).
400+
And(startedAtFromClause(options.MinStartedAt)).
401+
And(startedAtToClause(options.MaxStartedAt)).
402+
And(createdAfterClause(options.CreatedAfter)).
403+
And(finishedBeforeClause(options.FinishedBefore))
404+
405+
if options.Name != "" {
406+
nameFilter := options.NameFilter
407+
if nameFilter == "" {
408+
nameFilter = "Exact"
409+
}
410+
if nameFilter == "Exact" {
411+
selector = selector.And(nameEqual(options.Name))
412+
}
413+
if nameFilter == "Contains" {
414+
selector = selector.And(nameContainsClause(options.Name))
415+
}
416+
if nameFilter == "Prefix" {
417+
selector = selector.And(namePrefixClause(options.Name))
418+
}
419+
}
420+
421+
if len(options.LabelRequirements) > 0 {
422+
var err error
423+
selector, err = BuildArchivedWorkflowSelector(selector, archiveTableName, archiveLabelsTableName, r.dbType, options, false)
424+
if err != nil {
425+
return false, err
426+
}
427+
}
428+
429+
selector = selector.Limit(1).Offset(options.Offset + options.Limit)
430+
431+
var result []struct{ UID string }
432+
err := selector.All(&result)
433+
if err != nil {
434+
return false, err
435+
}
436+
437+
return len(result) > 0, nil
438+
}
439+
297440
func (r *workflowArchive) clusterManagedNamespaceAndInstanceID() *db.AndExpr {
298441
return db.And(
299442
db.Cond{"clustername": r.clusterName},

server/workflow/workflow_server.go

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -201,11 +201,22 @@ func (s *workflowServer) ListWorkflows(ctx context.Context, req *workflowpkg.Wor
201201
if err != nil {
202202
return nil, sutils.ToStatusError(err, codes.Internal)
203203
}
204-
archivedCount, err := s.wfArchive.CountWorkflows(ctx, options)
205-
if err != nil {
206-
return nil, sutils.ToStatusError(err, codes.Internal)
204+
205+
// Only calculate archivedCount when it's actually needed
206+
// This avoids expensive COUNT queries when not necessary
207+
var archivedCount int64
208+
var totalCount int64
209+
if options.ShowRemainingItemCount {
210+
archivedCount, err = s.wfArchive.CountWorkflows(ctx, options)
211+
if err != nil {
212+
return nil, sutils.ToStatusError(err, codes.Internal)
213+
}
214+
totalCount = liveWfCount + archivedCount
215+
} else {
216+
// For pagination without remaining count, we can use a more efficient approach
217+
// Just check if there are more items beyond the current page
218+
totalCount = liveWfCount // Start with live count, will be updated if needed
207219
}
208-
totalCount := liveWfCount + archivedCount
209220

210221
// first fetch live workflows
211222
liveWfList := &wfv1.WorkflowList{}
@@ -236,16 +247,31 @@ func (s *workflowServer) ListWorkflows(ctx context.Context, req *workflowpkg.Wor
236247
if s.wfReflector != nil {
237248
meta.ResourceVersion = s.wfReflector.LastSyncResourceVersion()
238249
}
239-
remainCount := totalCount - int64(options.Offset) - int64(len(wfs))
240-
if remainCount < 0 {
241-
remainCount = 0
250+
var remainCount int64
251+
if options.ShowRemainingItemCount {
252+
// Calculate exact remaining count when requested
253+
remainCount = totalCount - int64(options.Offset) - int64(len(wfs))
254+
if remainCount < 0 {
255+
remainCount = 0
256+
}
257+
meta.RemainingItemCount = &remainCount
258+
} else {
259+
// For pagination without remaining count, use the efficient HasMoreWorkflows method
260+
// This avoids expensive COUNT queries
261+
hasMore, err := s.wfArchive.HasMoreWorkflows(ctx, options)
262+
if err != nil {
263+
return nil, sutils.ToStatusError(err, codes.Internal)
264+
}
265+
if hasMore {
266+
remainCount = 1 // There are more items
267+
} else {
268+
remainCount = 0 // No more items
269+
}
242270
}
271+
243272
if remainCount > 0 {
244273
meta.Continue = fmt.Sprintf("%v", options.Offset+len(wfs))
245274
}
246-
if options.ShowRemainingItemCount {
247-
meta.RemainingItemCount = &remainCount
248-
}
249275

250276
cleaner := fields.NewCleaner(req.Fields)
251277
logger := logging.RequireLoggerFromContext(ctx)

server/workflow/workflow_server_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -617,8 +617,10 @@ func getWorkflowServer(t *testing.T) (workflowpkg.WorkflowServiceServer, context
617617
}
618618
archivedRepo.On("CountWorkflows", mock.Anything, sutils.ListOptions{Namespace: "workflows", LabelRequirements: r}).Return(int64(2), nil)
619619
archivedRepo.On("ListWorkflows", mock.Anything, sutils.ListOptions{Namespace: "workflows", Limit: -2, LabelRequirements: r}).Return(v1alpha1.Workflows{wfObj2, failedWfObj}, nil)
620+
archivedRepo.On("HasMoreWorkflows", mock.Anything, sutils.ListOptions{Namespace: "workflows", LabelRequirements: r}).Return(false, nil)
620621
archivedRepo.On("CountWorkflows", mock.Anything, sutils.ListOptions{Namespace: "test", LabelRequirements: r}).Return(int64(1), nil)
621622
archivedRepo.On("ListWorkflows", mock.Anything, sutils.ListOptions{Namespace: "test", Limit: -1, LabelRequirements: r}).Return(v1alpha1.Workflows{wfObj4}, nil)
623+
archivedRepo.On("HasMoreWorkflows", mock.Anything, sutils.ListOptions{Namespace: "test", LabelRequirements: r}).Return(false, nil)
622624

623625
kubeClientSet := fake.NewSimpleClientset()
624626
kubeClientSet.PrependReactor("create", "selfsubjectaccessreviews", func(action ktesting.Action) (handled bool, ret runtime.Object, err error) {

0 commit comments

Comments
 (0)