Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 6 additions & 13 deletions pkg/epp/framework/plugins/scheduling/scorer/prefix/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ func newIndexer(ctx context.Context, defaultLRUSize int) *indexer {
// Add adds a list of prefix hashes to the cache, tied to the server.
func (i *indexer) Add(hashes []BlockHash, pod Server) {
i.mu.Lock()
defer i.mu.Unlock()

// Check if the LRU pod exist
lruForPod, exists := i.podToLRU[pod.ServerID]
if !exists {
Expand All @@ -64,15 +66,12 @@ func (i *indexer) Add(hashes []BlockHash, pod Server) {
lruForPod = newLRU
}

i.mu.Unlock()

// Add to LRU (may evict)
for _, hash := range hashes {
lruForPod.Add(hash, struct{}{})
}

// Update hashToPods once under lock
i.mu.Lock()
// Update hashToPods
for _, hash := range hashes {
podIDs := i.hashToPods[hash]
if podIDs == nil {
Expand All @@ -81,8 +80,6 @@ func (i *indexer) Add(hashes []BlockHash, pod Server) {
podIDs[pod.ServerID] = struct{}{}
i.hashToPods[hash] = podIDs
}

i.mu.Unlock()
}

// Get returns a set of servers that have the given prefix hash cached.
Expand All @@ -103,8 +100,6 @@ func (i *indexer) Get(hash BlockHash) podSet {
// makeEvictionFn returns a per-pod LRU eviction callback that removes the pod from hashToPods on eviction.
func (i *indexer) makeEvictionFn(pod ServerID) func(BlockHash, struct{}) {
return func(hash BlockHash, _ struct{}) {
i.mu.Lock()
defer i.mu.Unlock()
// Remove the pod from the hash→pods map
if podSet, ok := i.hashToPods[hash]; ok {
delete(podSet, pod)
Expand Down Expand Up @@ -156,10 +151,10 @@ func (i *indexer) reportLRUSize(ctx context.Context, interval time.Duration) {

// RemovePod removes a pod and its associated entries from the indexer.
func (i *indexer) RemovePod(pod ServerID) {
i.mu.RLock()
lruCache, exists := i.podToLRU[pod]
i.mu.RUnlock()
i.mu.Lock()
defer i.mu.Unlock()

lruCache, exists := i.podToLRU[pod]
if !exists {
return
}
Expand All @@ -169,9 +164,7 @@ func (i *indexer) RemovePod(pod ServerID) {
lruCache.Remove(hash)
}

i.mu.Lock()
delete(i.podToLRU, pod)
i.mu.Unlock()
}

// Pods returns the list of all pods currently tracked in the indexer.
Expand Down
21 changes: 21 additions & 0 deletions pkg/epp/framework/plugins/scheduling/scorer/prefix/indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package prefix

import (
"context"
"sync"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -110,3 +111,23 @@ func TestIndexer_RemovePodAndEviction(t *testing.T) {
// Ensure hashToPods contains exactly indexerSize hashes (post-eviction and server2 removal)
assert.Len(t, i.hashToPods, indexerSize, "hashToPods should contain %d hashes after cleanup", indexerSize)
}

func TestIndexer_ConcurrentAddRemovePod(t *testing.T) {
lruSize := 10
for iter := range 100 {
i := newIndexer(context.Background(), lruSize)
pod := Server{ServerID: ServerID{Namespace: "default", Name: "pod1"}}

var wg sync.WaitGroup
wg.Add(2)
go func() { defer wg.Done(); i.Add([]BlockHash{1, 2, 3}, pod) }()
go func() { defer wg.Done(); i.RemovePod(pod.ServerID) }()
wg.Wait()

if _, exists := i.podToLRU[pod.ServerID]; !exists {
for hash, pods := range i.hashToPods {
assert.NotContains(t, pods, pod.ServerID, "iter %d: hashToPods[%v] references removed pod", iter, hash)
}
}
}
}
Loading