Skip to content

Commit 2e99ef4

Browse files
committed
fix: safely clone index during the index flush
Signed-off-by: Chris Randles <[email protected]>
1 parent 408d53b commit 2e99ef4

File tree

2 files changed

+64
-4
lines changed

2 files changed

+64
-4
lines changed

pkg/cache/index/client.go

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ func NewIndexedClient(
6363
name: cacheName,
6464
cacheProvider: cacheProvider,
6565
cancel: cancel,
66+
forceFlush: make(chan bool),
67+
hasFlushed: make(chan bool, 1),
6668
}
6769
indexExpiry := o.IndexExpiry
6870
idx.options.Store(o)
@@ -128,6 +130,8 @@ type IndexedClient struct {
128130
lastWrite atomicx.Time `msg:"-"`
129131
isClosing atomic.Bool
130132
cancel context.CancelFunc
133+
forceFlush chan bool
134+
hasFlushed chan bool
131135
flusherExited atomic.Bool
132136
reaperExited atomic.Bool
133137
}
@@ -294,25 +298,50 @@ func (idx *IndexedClient) Close() error {
294298

295299
// flusher periodically calls the cache's index flush func that writes the cache index to disk
296300
func (idx *IndexedClient) flusher(ctx context.Context) {
301+
fi := idx.options.Load().(*options.Options).FlushInterval
302+
ticker := time.NewTicker(fi)
303+
defer ticker.Stop()
297304
FLUSHER:
298305
for {
299-
fi := idx.options.Load().(*options.Options).FlushInterval
300306
select {
301307
case <-ctx.Done():
302308
break FLUSHER
303-
case <-time.After(fi):
309+
case <-ticker.C:
304310
if idx.lastWrite.Load().Before(idx.LastFlush.Load()) {
305311
continue
306312
}
307-
idx.flushOnce()
313+
case <-idx.forceFlush:
314+
}
315+
idx.flushOnce()
316+
select {
317+
case idx.hasFlushed <- true:
318+
// signal that a flush has occurred
319+
default:
320+
// drop message if no listener
308321
}
309322
}
310323
idx.flusherExited.Store(true)
311324
}
312325

326+
// clone the msgpack encoded fields of the IndexedClient structure
327+
// not meant to be a deep clone / usable client, just a snapshot of the index state
328+
func (idx *IndexedClient) clone() *IndexedClient {
329+
clone := &IndexedClient{
330+
CacheSize: atomic.LoadInt64(&idx.CacheSize),
331+
ObjectCount: atomic.LoadInt64(&idx.ObjectCount),
332+
}
333+
clone.LastFlush.Store(idx.LastFlush.Load())
334+
idx.Objects.Range(func(key, value any) bool {
335+
clone.Objects.Store(key, value)
336+
return true
337+
})
338+
return clone
339+
}
340+
313341
func (idx *IndexedClient) flushOnce() {
314342
idx.LastFlush.Store(time.Now()) // update flush time, so that it is marshalled / stored
315-
bytes, err := idx.MarshalMsg(nil)
343+
clone := idx.clone()
344+
bytes, err := clone.MarshalMsg(nil)
316345
if err != nil {
317346
logger.Warn("unable to serialize index for flushing",
318347
logging.Pairs{"cacheName": idx.name, "detail": err.Error()})

pkg/cache/index/client_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@
1717
package index
1818

1919
import (
20+
"fmt"
2021
"sync/atomic"
2122
"testing"
2223
"time"
2324

2425
"github.com/stretchr/testify/require"
26+
"github.com/trickstercache/trickster/v2/pkg/cache"
2527
"github.com/trickstercache/trickster/v2/pkg/cache/filesystem"
2628
fso "github.com/trickstercache/trickster/v2/pkg/cache/filesystem/options"
2729
"github.com/trickstercache/trickster/v2/pkg/cache/index/options"
@@ -196,6 +198,35 @@ func TestIndexedClient(t *testing.T) {
196198
_, s, err = ic.Retrieve("test.1")
197199
require.NoError(t, err)
198200
require.Equal(t, status.LookupStatusHit, s)
201+
202+
t.Run("flush loop", func(t *testing.T) {
203+
// test the actual flush loop by forcing it to flush, this utilizes goroutines
204+
// and should detect more potential race conditions vs the existing flush tests that use
205+
// flush internal methods
206+
ic1 := NewIndexedClient("flushTest", provider, &options.Options{
207+
ReapInterval: time.Second * 60 * 60 * 24,
208+
FlushInterval: time.Second * 60 * 60 * 24,
209+
MaxSizeObjects: 5,
210+
MaxSizeBackoffObjects: 3,
211+
MaxSizeBytes: 100,
212+
MaxSizeBackoffBytes: 30,
213+
IndexExpiry: 1 * time.Hour,
214+
}, mc)
215+
defer ic1.Close()
216+
for i := range 5 {
217+
index := fmt.Sprintf("%d", i)
218+
key := "key." + index
219+
require.NoError(t, ic1.Store(key, []byte("value1."+index), 0))
220+
}
221+
_, s, err := ic1.Client.Retrieve(IndexKey)
222+
require.Equal(t, cache.ErrKNF, err)
223+
require.Equal(t, status.LookupStatusKeyMiss, s)
224+
ic1.forceFlush <- true
225+
<-ic1.hasFlushed
226+
_, s, err = ic1.Client.Retrieve(IndexKey)
227+
require.NoError(t, err)
228+
require.Equal(t, status.LookupStatusHit, s)
229+
})
199230
})
200231

201232
/* converting */

0 commit comments

Comments
 (0)