Skip to content

Commit 65ffe04

Browse files
Merge pull request #135580 from serathius/client-go-transformer
Embed proper interface in TransformingStore to ensure DeltaFIFO and RealFIFO are implementing it Kubernetes-commit: 04e8064bccebd04981ee0094457550c9de4f92e3
2 parents 97256a6 + 171ef8c commit 65ffe04

13 files changed

+321
-167
lines changed

go.mod

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ require (
2424
golang.org/x/time v0.9.0
2525
google.golang.org/protobuf v1.36.8
2626
gopkg.in/evanphx/json-patch.v4 v4.13.0
27-
k8s.io/api v0.0.0
28-
k8s.io/apimachinery v0.0.0
27+
k8s.io/api v0.0.0-20251204222945-bbcbaa8f8665
28+
k8s.io/apimachinery v0.0.0-20251204222403-72d71eac265e
2929
k8s.io/klog/v2 v2.130.1
3030
k8s.io/kube-openapi v0.0.0-20250910181357-589584f1c912
3131
k8s.io/utils v0.0.0-20251002143259-bc988d571ff4
@@ -62,8 +62,3 @@ require (
6262
gopkg.in/inf.v0 v0.9.1 // indirect
6363
gopkg.in/yaml.v3 v3.0.1 // indirect
6464
)
65-
66-
replace (
67-
k8s.io/api => ../api
68-
k8s.io/apimachinery => ../apimachinery
69-
)

go.sum

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
1-
cloud.google.com/go/compute/metadata v0.3.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k=
21
github.com/Masterminds/semver/v3 v3.4.0 h1:Zog+i5UMtVoCU8oKka5P7i9q9HgrJeGzI9SA1Xbatp0=
32
github.com/Masterminds/semver/v3 v3.4.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM=
4-
github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
53
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
64
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
7-
github.com/chzyer/readline v1.5.1/go.mod h1:Eh+b79XXUwfKfcPLepksvw2tcLE/Ct21YObkaSkeBlk=
85
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
96
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
107
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
@@ -25,7 +22,6 @@ github.com/go-openapi/swag v0.23.0 h1:vsEVJDUo2hPJ2tu0/Xc+4noaxyEffXNIs3cOULZ+Gr
2522
github.com/go-openapi/swag v0.23.0/go.mod h1:esZ8ITTYEsH1V2trKHjAN8Ai7xHb8RV+YSZ577vPjgQ=
2623
github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
2724
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
28-
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
2925
github.com/google/btree v1.1.3 h1:CVpQJjYgC4VbzxeGVHfvZrv1ctoYCAI8vbl07Fcxlyg=
3026
github.com/google/btree v1.1.3/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4=
3127
github.com/google/gnostic-models v0.7.0 h1:qwTtogB15McXDaNqTZdzPJRHvaVJlAl+HVQnLmJEJxo=
@@ -41,7 +37,6 @@ github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 h1:JeSE6pjso5T
4137
github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674/go.mod h1:r4w70xmWCQKmi1ONH4KIaBptdivuRPyosB9RmPlGEwA=
4238
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 h1:+ngKgrYPPJrOjhax5N+uePQ0Fh1Z7PheYoUI/0nzkPA=
4339
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
44-
github.com/ianlancetaylor/demangle v0.0.0-20240312041847-bd984b5ce465/go.mod h1:gx7rwoVhcfuVKG5uya9Hs3Sxj7EIvldVofAWIUtGouw=
4540
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
4641
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
4742
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
@@ -98,7 +93,6 @@ go.yaml.in/yaml/v2 v2.4.3 h1:6gvOSjQoTB3vt1l+CU+tSyi/HOjfOjRLJ4YwYZGwRO0=
9893
go.yaml.in/yaml/v2 v2.4.3/go.mod h1:zSxWcmIDjOzPXpjlTTbAsKokqkDNAVtZO0WOMiT90s8=
9994
go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc=
10095
go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg=
101-
golang.org/x/crypto v0.44.0/go.mod h1:013i+Nw79BMiQiMsOPcVCB5ZIJbYkerPrGnOa00tvmc=
10296
golang.org/x/mod v0.29.0 h1:HV8lRxZC4l2cr3Zq1LvtOsi/ThTgWnUk/y64QSs8GwA=
10397
golang.org/x/mod v0.29.0/go.mod h1:NyhrlYXJ2H4eJiRy/WDBO6HMqZQ6q9nk4JzS3NuCK+w=
10498
golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY=
@@ -117,9 +111,6 @@ golang.org/x/time v0.9.0 h1:EsRrnYcQiGH+5FfbgvV4AP7qEZstoyrHB0DzarOQ4ZY=
117111
golang.org/x/time v0.9.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
118112
golang.org/x/tools v0.38.0 h1:Hx2Xv8hISq8Lm16jvBZ2VQf+RLmbd7wVUsALibYI/IQ=
119113
golang.org/x/tools v0.38.0/go.mod h1:yEsQ/d/YK8cjh0L6rZlY8tgtlKiBNTL14pGDJPJpYQs=
120-
golang.org/x/tools/go/expect v0.1.0-deprecated/go.mod h1:eihoPOH+FgIqa3FpoTwguz/bVUSGBlGQU67vpBeOrBY=
121-
golang.org/x/tools/go/packages/packagestest v0.1.1-deprecated/go.mod h1:RVAQXBGNv1ib0J382/DPCRS/BPnsGebyM1Gj5VSDpG8=
122-
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
123114
google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc=
124115
google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU=
125116
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
@@ -132,7 +123,10 @@ gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
132123
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
133124
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
134125
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
135-
k8s.io/gengo/v2 v2.0.0-20250604051438-85fd79dbfd9f/go.mod h1:EJykeLsmFC60UQbYJezXkEsG2FLrt0GPNkU5iK5GWxU=
126+
k8s.io/api v0.0.0-20251204222945-bbcbaa8f8665 h1:yCdvBpHA4R+NYVHh6B+ZWOmN0FhnqP1uGX9oNLhDWLw=
127+
k8s.io/api v0.0.0-20251204222945-bbcbaa8f8665/go.mod h1:etlr1bA5uFLXrSHE4hq3fjX6JCVC4aD10YGu8kktjJM=
128+
k8s.io/apimachinery v0.0.0-20251204222403-72d71eac265e h1:j68TlPomsB5ecepACqUXTMgiob1Hmx4N+VTd+jzORI8=
129+
k8s.io/apimachinery v0.0.0-20251204222403-72d71eac265e/go.mod h1:jQCgFZFR1F4Ik7hvr2g84RTJSZegBc8yHgFWKn//hns=
136130
k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk=
137131
k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE=
138132
k8s.io/kube-openapi v0.0.0-20250910181357-589584f1c912 h1:Y3gxNAuB0OBLImH611+UDZcmKS3g6CthxToOb37KgwE=

tools/cache/controller.go

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -708,20 +708,7 @@ func newInformer(clientState Store, options InformerOptions) Controller {
708708
// KeyLister, that way resync operations will result in the correct set
709709
// of update/delete deltas.
710710

711-
var fifo Queue
712-
if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) {
713-
fifo = NewRealFIFOWithOptions(RealFIFOOptions{
714-
KeyFunction: MetaNamespaceKeyFunc,
715-
KnownObjects: clientState,
716-
Transformer: options.Transform,
717-
})
718-
} else {
719-
fifo = NewDeltaFIFOWithOptions(DeltaFIFOOptions{
720-
KnownObjects: clientState,
721-
EmitDeltaTypeReplaced: true,
722-
Transformer: options.Transform,
723-
})
724-
}
711+
fifo := newQueueFIFO(clientState, options.Transform)
725712

726713
cfg := &Config{
727714
Queue: fifo,
@@ -742,3 +729,19 @@ func newInformer(clientState Store, options InformerOptions) Controller {
742729
}
743730
return New(cfg)
744731
}
732+
733+
func newQueueFIFO(clientState Store, transform TransformFunc) Queue {
734+
if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) {
735+
return NewRealFIFOWithOptions(RealFIFOOptions{
736+
KeyFunction: MetaNamespaceKeyFunc,
737+
KnownObjects: clientState,
738+
Transformer: transform,
739+
})
740+
} else {
741+
return NewDeltaFIFOWithOptions(DeltaFIFOOptions{
742+
KnownObjects: clientState,
743+
EmitDeltaTypeReplaced: true,
744+
Transformer: transform,
745+
})
746+
}
747+
}

tools/cache/delta_fifo.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,8 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
270270
}
271271

272272
var (
273-
_ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
273+
_ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
274+
_ = TransformingStore(&DeltaFIFO{}) // DeltaFIFO implements TransformingStore to allow memory optimizations
274275
)
275276

276277
var (

tools/cache/delta_fifo_test.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import (
2828
// from the most recent Delta.
2929
// You should treat the items returned inside the deltas as immutable.
3030
// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing.
31-
func (f *DeltaFIFO) List() []interface{} {
31+
func (f *DeltaFIFO) list() []interface{} {
3232
f.lock.RLock()
3333
defer f.lock.RUnlock()
3434
return f.listLocked()
@@ -46,7 +46,7 @@ func (f *DeltaFIFO) listLocked() []interface{} {
4646
// ListKeys returns a list of all the keys of the objects currently
4747
// in the FIFO.
4848
// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing.
49-
func (f *DeltaFIFO) ListKeys() []string {
49+
func (f *DeltaFIFO) listKeys() []string {
5050
f.lock.RLock()
5151
defer f.lock.RUnlock()
5252
list := make([]string, 0, len(f.queue))
@@ -60,19 +60,19 @@ func (f *DeltaFIFO) ListKeys() []string {
6060
// or sets exists=false.
6161
// You should treat the items returned inside the deltas as immutable.
6262
// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing.
63-
func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
63+
func (f *DeltaFIFO) get(obj interface{}) (item interface{}, exists bool, err error) {
6464
key, err := f.KeyOf(obj)
6565
if err != nil {
6666
return nil, false, KeyError{obj, err}
6767
}
68-
return f.GetByKey(key)
68+
return f.getByKey(key)
6969
}
7070

7171
// GetByKey returns the complete list of deltas for the requested item,
7272
// setting exists=false if that list is empty.
7373
// You should treat the items returned inside the deltas as immutable.
7474
// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing.
75-
func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
75+
func (f *DeltaFIFO) getByKey(key string) (item interface{}, exists bool, err error) {
7676
f.lock.RLock()
7777
defer f.lock.RUnlock()
7878
d, exists := f.items[key]
@@ -320,10 +320,10 @@ func TestDeltaFIFO_addUpdate(t *testing.T) {
320320
f.Update(mkFifoObj("foo", 12))
321321
f.Delete(mkFifoObj("foo", 15))
322322

323-
if e, a := []interface{}{mkFifoObj("foo", 15)}, f.List(); !reflect.DeepEqual(e, a) {
323+
if e, a := []interface{}{mkFifoObj("foo", 15)}, f.list(); !reflect.DeepEqual(e, a) {
324324
t.Errorf("Expected %+v, got %+v", e, a)
325325
}
326-
if e, a := []string{"foo"}, f.ListKeys(); !reflect.DeepEqual(e, a) {
326+
if e, a := []string{"foo"}, f.listKeys(); !reflect.DeepEqual(e, a) {
327327
t.Errorf("Expected %+v, got %+v", e, a)
328328
}
329329

@@ -349,7 +349,7 @@ func TestDeltaFIFO_addUpdate(t *testing.T) {
349349
t.Errorf("Got second value %v", unexpected.val)
350350
case <-time.After(50 * time.Millisecond):
351351
}
352-
_, exists, _ := f.Get(mkFifoObj("foo", ""))
352+
_, exists, _ := f.get(mkFifoObj("foo", ""))
353353
if exists {
354354
t.Errorf("item did not get removed")
355355
}
@@ -397,7 +397,7 @@ func TestDeltaFIFO_transformer(t *testing.T) {
397397
must(f.Replace([]interface{}{}, ""))
398398

399399
// Should be empty
400-
if e, a := []string{"foo", "bar"}, f.ListKeys(); !reflect.DeepEqual(e, a) {
400+
if e, a := []string{"foo", "bar"}, f.listKeys(); !reflect.DeepEqual(e, a) {
401401
t.Errorf("Expected %+v, got %+v", e, a)
402402
}
403403

@@ -507,7 +507,7 @@ func TestDeltaFIFO_addReplace(t *testing.T) {
507507
t.Errorf("Got second value %v", unexpected.val)
508508
case <-time.After(50 * time.Millisecond):
509509
}
510-
_, exists, _ := f.Get(mkFifoObj("foo", ""))
510+
_, exists, _ := f.get(mkFifoObj("foo", ""))
511511
if exists {
512512
t.Errorf("item did not get removed")
513513
}
@@ -991,7 +991,7 @@ func BenchmarkDeltaFIFOListKeys(b *testing.B) {
991991
b.ResetTimer()
992992
b.RunParallel(func(pb *testing.PB) {
993993
for pb.Next() {
994-
_ = f.ListKeys()
994+
_ = f.listKeys()
995995
}
996996
})
997997
b.StopTimer()

tools/cache/reflector.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ type ReflectorStore interface {
7979
// TransformingStore is an optional interface that can be implemented by the provided store.
8080
// If implemented on the provided store reflector will use the same transformer in its internal stores.
8181
type TransformingStore interface {
82-
Store
82+
ReflectorStore
8383
Transformer() TransformFunc
8484
}
8585

@@ -733,9 +733,11 @@ func (r *Reflector) watchList(ctx context.Context) (watch.Interface, error) {
733733
return false
734734
}
735735

736+
var transformer TransformFunc
736737
storeOpts := []StoreOption{}
737738
if tr, ok := r.store.(TransformingStore); ok && tr.Transformer() != nil {
738-
storeOpts = append(storeOpts, WithTransformer(tr.Transformer()))
739+
transformer = tr.Transformer()
740+
storeOpts = append(storeOpts, WithTransformer(transformer))
739741
}
740742

741743
initTrace := trace.New("Reflector WatchList", trace.Field{Key: "name", Value: r.name})
@@ -795,7 +797,7 @@ func (r *Reflector) watchList(ctx context.Context) (watch.Interface, error) {
795797
// we utilize the temporaryStore to ensure independence from the current store implementation.
796798
// as of today, the store is implemented as a queue and will be drained by the higher-level
797799
// component as soon as it finishes replacing the content.
798-
checkWatchListDataConsistencyIfRequested(ctx, r.name, resourceVersion, r.listerWatcher.ListWithContext, temporaryStore.List)
800+
checkWatchListDataConsistencyIfRequested(ctx, r.name, resourceVersion, r.listerWatcher.ListWithContext, transformer, temporaryStore.List)
799801

800802
if err := r.store.Replace(temporaryStore.List(), resourceVersion); err != nil {
801803
return nil, fmt.Errorf("unable to sync watch-list result: %w", err)

tools/cache/reflector_data_consistency_detector.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,11 @@ import (
3333
//
3434
// Note that this function will panic when data inconsistency is detected.
3535
// This is intentional because we want to catch it in the CI.
36-
func checkWatchListDataConsistencyIfRequested[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn consistencydetector.ListFunc[T], retrieveItemsFn consistencydetector.RetrieveItemsFunc[U]) {
36+
func checkWatchListDataConsistencyIfRequested[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn consistencydetector.ListFunc[T], listItemTransformFunc func(interface{}) (interface{}, error), retrieveItemsFn consistencydetector.RetrieveItemsFunc[U]) {
3737
if !consistencydetector.IsDataConsistencyDetectionForWatchListEnabled() {
3838
return
3939
}
4040
// for informers we pass an empty ListOptions because
4141
// listFn might be wrapped for filtering during informer construction.
42-
consistencydetector.CheckDataConsistency(ctx, identity, lastSyncedResourceVersion, listFn, metav1.ListOptions{}, retrieveItemsFn)
42+
consistencydetector.CheckDataConsistency(ctx, identity, lastSyncedResourceVersion, listFn, listItemTransformFunc, metav1.ListOptions{}, retrieveItemsFn)
4343
}
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package cache
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"testing"
23+
"time"
24+
25+
v1 "k8s.io/api/core/v1"
26+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27+
"k8s.io/apimachinery/pkg/runtime"
28+
"k8s.io/apimachinery/pkg/util/wait"
29+
"k8s.io/apimachinery/pkg/watch"
30+
clientfeatures "k8s.io/client-go/features"
31+
clientfeaturestesting "k8s.io/client-go/features/testing"
32+
"k8s.io/client-go/util/consistencydetector"
33+
"k8s.io/klog/v2/ktesting"
34+
)
35+
36+
func TestReflectorDataConsistencyDetector(t *testing.T) {
37+
clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, true)
38+
restore := consistencydetector.SetDataConsistencyDetectionForWatchListEnabledForTest(true)
39+
defer restore()
40+
41+
markTransformed := func(obj interface{}) (interface{}, error) {
42+
pod, ok := obj.(*v1.Pod)
43+
if !ok {
44+
return obj, nil
45+
}
46+
newPod := pod.DeepCopy()
47+
if newPod.Labels == nil {
48+
newPod.Labels = make(map[string]string)
49+
}
50+
newPod.Labels["transformed"] = "true"
51+
return newPod, nil
52+
}
53+
54+
for _, inOrder := range []bool{false, true} {
55+
t.Run(fmt.Sprintf("InOrder=%v", inOrder), func(t *testing.T) {
56+
clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.InOrderInformers, inOrder)
57+
for _, transformerEnabled := range []bool{false, true} {
58+
var transformer TransformFunc
59+
if transformerEnabled {
60+
transformer = markTransformed
61+
}
62+
t.Run(fmt.Sprintf("Transformer=%v", transformerEnabled), func(t *testing.T) {
63+
runTestReflectorDataConsistencyDetector(t, transformer)
64+
})
65+
}
66+
})
67+
}
68+
}
69+
70+
func runTestReflectorDataConsistencyDetector(t *testing.T, transformer TransformFunc) {
71+
_, ctx := ktesting.NewTestContext(t)
72+
ctx, cancel := context.WithCancel(ctx)
73+
defer cancel()
74+
75+
store := NewStore(MetaNamespaceKeyFunc)
76+
fifo := newQueueFIFO(store, transformer)
77+
78+
lw := &ListWatch{
79+
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
80+
return &v1.PodList{
81+
ListMeta: metav1.ListMeta{ResourceVersion: "1"},
82+
Items: []v1.Pod{
83+
{ObjectMeta: metav1.ObjectMeta{Name: "pod-1", ResourceVersion: "1"}},
84+
},
85+
}, nil
86+
},
87+
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
88+
w := watch.NewFake()
89+
go func() {
90+
w.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod-1", ResourceVersion: "1"}})
91+
w.Action(watch.Bookmark, &v1.Pod{ObjectMeta: metav1.ObjectMeta{
92+
Name: "pod-1",
93+
ResourceVersion: "1",
94+
Annotations: map[string]string{metav1.InitialEventsAnnotationKey: "true"},
95+
}})
96+
}()
97+
return w, nil
98+
},
99+
}
100+
101+
r := NewReflector(lw, &v1.Pod{}, fifo, 0)
102+
103+
go func() {
104+
_ = wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
105+
return r.LastSyncResourceVersion() != "", nil
106+
})
107+
cancel()
108+
}()
109+
110+
err := r.ListAndWatchWithContext(ctx)
111+
if err != nil {
112+
t.Errorf("ListAndWatchWithContext returned error: %v", err)
113+
}
114+
}

0 commit comments

Comments
 (0)