Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions pkg/cache/index/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ func NewIndexedClient(
cancel: cancel,
forceFlush: make(chan bool),
hasFlushed: make(chan bool, 1),
forceReap: make(chan bool),
hasReaped: make(chan bool, 1),
}
indexExpiry := o.IndexExpiry
idx.options.Store(o)
Expand Down Expand Up @@ -130,10 +132,14 @@ type IndexedClient struct {
lastWrite atomicx.Time `msg:"-"`
isClosing atomic.Bool
cancel context.CancelFunc
forceFlush chan bool
hasFlushed chan bool
flusherExited atomic.Bool
reaperExited atomic.Bool

// used only in tests: fields to interact with client goroutines
forceFlush chan bool
hasFlushed chan bool
forceReap chan bool
hasReaped chan bool
}

// Clear the index from its currently tracked cache objects
Expand Down Expand Up @@ -357,7 +363,14 @@ REAPER:
case <-ctx.Done():
break REAPER
case <-time.After(ri):
idx.reap()
case <-idx.forceReap:
}
idx.reap()
select {
case idx.hasReaped <- true:
// signal that a reap has occurred
default:
// drop message if no listener
}
}
idx.reaperExited.Store(true)
Expand Down
48 changes: 48 additions & 0 deletions pkg/cache/index/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,54 @@ func TestIndexedClient(t *testing.T) {
require.NoError(t, err)
require.Equal(t, status.LookupStatusHit, s)
})

t.Run("reap loop", func(t *testing.T) {
// test the actual reap loop by forcing it to reap, this utilizes goroutines
// and should detect more potential race conditions vs the existing reap tests that use
// reap internal methods
ic2 := NewIndexedClient("reapTest", provider, &options.Options{
ReapInterval: time.Second * 60 * 60 * 24,
FlushInterval: time.Second * 60 * 60 * 24,
MaxSizeObjects: 5,
MaxSizeBackoffObjects: 5,
MaxSizeBytes: 10000,
MaxSizeBackoffBytes: 300,
IndexExpiry: 1 * time.Hour,
}, mc)
defer ic2.Close()

// write 5 objects, expect 5 objects
for i := range 5 {
index := fmt.Sprintf("%d", i)
key := "key." + index
require.NoError(t, ic2.Store(key, []byte("value1."+index), 0))
}
state := getIndexedClientState(ic2)
require.Equal(t, int64(5), state.ObjectCount)
require.Len(t, state.Objects, 5)
// force reap, expect 5 objects still
ic2.forceReap <- true
<-ic2.hasReaped
state = getIndexedClientState(ic2)
require.Equal(t, int64(5), state.ObjectCount)
require.Len(t, state.Objects, 5)

// write more objects, then force reap to trigger (count based) eviction
for i := range 5 {
index := fmt.Sprintf("%d", i)
key := "another.key." + index
require.NoError(t, ic2.Store(key, []byte("value1."+index), 0))
}
state = getIndexedClientState(ic2)
require.Equal(t, int64(10), state.ObjectCount)
require.Equal(t, len(state.Objects), 10)
// force reap, expect some evictions (back to the MaxSizeObjects count)
ic2.forceReap <- true
<-ic2.hasReaped
state = getIndexedClientState(ic2)
require.Equal(t, int64(5), state.ObjectCount)
require.Equal(t, len(state.Objects), 5)
})
})

/* converting */
Expand Down
Loading