@@ -481,24 +481,39 @@ func TestCacheConcurrentWrites(t *testing.T) {
481481
482482 // Launch 500 concurrent writers
483483 numWriters := 500
484+ entriesPerWriter := 10
484485 done := make (chan bool , numWriters )
485486
487+ // Track all entries written for later verification
488+ type entryInfo struct {
489+ params []pipelinev1.Param
490+ expectedData string
491+ }
492+ allEntries := make ([]entryInfo , numWriters * entriesPerWriter )
493+
486494 for i := range numWriters {
487495 go func (writerID int ) {
488496 defer func () { done <- true }()
489497
490498 // Each writer adds 10 unique entries
491- for j := range 10 {
499+ for j := range entriesPerWriter {
492500 params := []pipelinev1.Param {
493501 {Name : "bundle" , Value : pipelinev1.ParamValue {
494502 Type : pipelinev1 .ParamTypeString ,
495503 StringVal : fmt .Sprintf ("registry.io/writer%d-entry%d@sha256:%064d" , writerID , j , writerID * 100 + j ),
496504 }},
497505 }
506+ expectedData := fmt .Sprintf ("writer-%d-data-%d" , writerID , j )
498507 mockResource := & mockResolvedResource {
499- data : []byte (fmt . Sprintf ( "writer-%d-data-%d" , writerID , j ) ),
508+ data : []byte (expectedData ),
500509 }
501510 cache .Add (resolverType , params , mockResource )
511+
512+ // Record this entry for verification
513+ allEntries [writerID * entriesPerWriter + j ] = entryInfo {
514+ params : params ,
515+ expectedData : expectedData ,
516+ }
502517 }
503518 }(i )
504519 }
@@ -508,45 +523,39 @@ func TestCacheConcurrentWrites(t *testing.T) {
508523 <- done
509524 }
510525
511- // After all concurrent writes, add some entries synchronously
512- // These are guaranteed to be the most recent entries in the cache
513- numSyncEntries := 20
514- syncEntries := make ([][]pipelinev1.Param , numSyncEntries )
515- for i := range numSyncEntries {
516- params := []pipelinev1.Param {
517- {Name : "bundle" , Value : pipelinev1.ParamValue {
518- Type : pipelinev1 .ParamTypeString ,
519- StringVal : fmt .Sprintf ("registry.io/sync-entry%d@sha256:%064d" , i , 999000 + i ),
520- }},
521- }
522- syncEntries [i ] = params
523- mockResource := & mockResolvedResource {
524- data : []byte (fmt .Sprintf ("sync-data-%d" , i )),
525- }
526- cache .Add (resolverType , params , mockResource )
527- }
528-
529- // Verify all synchronous entries are retrievable
530- // Since they were written most recently, they should all be in cache
526+ // Verify that concurrent writes are retrievable and have correct data
527+ // With 5000 entries (500 writers * 10 entries each) and cache size of 1000,
528+ // we expect many entries to be evicted due to LRU. We verify that:
529+ // 1. Entries that ARE in cache have the correct data
530+ // 2. We get a reasonable hit rate
531531 cachedCount := 0
532- for i , params := range syncEntries {
533- cached , ok := cache . Get ( resolverType , params )
534- if ! ok || cached == nil {
535- t . Errorf ( "Expected cache hit for sync entry %d, but got miss" , i )
536- } else {
532+ wrongDataCount := 0
533+
534+ for _ , entry := range allEntries {
535+ cached , ok := cache . Get ( resolverType , entry . params )
536+ if ok {
537537 cachedCount ++
538538 // Verify the data is correct
539- expectedData := fmt . Sprintf ( "sync-data-%d" , i )
540- if string ( cached . Data ()) != expectedData {
541- t .Errorf ("Expected data '%s' for sync entry %d , got '%s'" , expectedData , i , string (cached .Data ()))
539+ if string ( cached . Data ()) != entry . expectedData {
540+ wrongDataCount ++
541+ t .Errorf ("Expected data '%s', got '%s'" , entry . expectedData , string (cached .Data ()))
542542 }
543543 }
544544 }
545545
546- // All synchronous entries should be in cache since they were written most recently
547- if cachedCount != numSyncEntries {
548- t .Errorf ("Expected all %d synchronous entries to be cached, but only found %d" , numSyncEntries , cachedCount )
546+ // We should have no entries with wrong data
547+ if wrongDataCount > 0 {
548+ t .Errorf ("Found %d entries with wrong data" , wrongDataCount )
549+ }
550+
551+ // We should have some reasonable number of entries cached
552+ // With 5000 entries and cache size 1000, we expect close to 1000 entries to be cached
553+ // Using a lower bound of 500 to account for concurrent evictions and timing
554+ if cachedCount < 500 {
555+ t .Errorf ("Expected at least 500 entries to be cached, but only found %d out of %d total" , cachedCount , len (allEntries ))
549556 }
557+
558+ t .Logf ("Concurrent write test passed: %d entries cached out of %d written" , cachedCount , len (allEntries ))
550559}
551560
552561func TestCacheConcurrentReadWrite (t * testing.T ) {
@@ -574,9 +583,17 @@ func TestCacheConcurrentReadWrite(t *testing.T) {
574583 // Launch 300 readers and 300 writers concurrently
575584 numReaders := 300
576585 numWriters := 300
586+ entriesPerWriter := 5
577587 totalGoroutines := numReaders + numWriters
578588 done := make (chan bool , totalGoroutines )
579589
590+ // Track entries written by concurrent writers for later verification
591+ type entryInfo struct {
592+ params []pipelinev1.Param
593+ expectedData string
594+ }
595+ writerEntries := make ([]entryInfo , numWriters * entriesPerWriter )
596+
580597 // Start readers
581598 for i := range numReaders {
582599 go func (readerID int ) {
@@ -594,17 +611,24 @@ func TestCacheConcurrentReadWrite(t *testing.T) {
594611 go func (writerID int ) {
595612 defer func () { done <- true }()
596613
597- for j := range 5 {
614+ for j := range entriesPerWriter {
598615 params := []pipelinev1.Param {
599616 {Name : "bundle" , Value : pipelinev1.ParamValue {
600617 Type : pipelinev1 .ParamTypeString ,
601618 StringVal : fmt .Sprintf ("registry.io/concurrent-writer%d-entry%d@sha256:%064d" , writerID , j , writerID * 10 + j ),
602619 }},
603620 }
621+ expectedData := fmt .Sprintf ("concurrent-writer-%d-data-%d" , writerID , j )
604622 mockResource := & mockResolvedResource {
605- data : []byte (fmt . Sprintf ( "concurrent-writer-%d-data-%d" , writerID , j ) ),
623+ data : []byte (expectedData ),
606624 }
607625 cache .Add (resolverType , params , mockResource )
626+
627+ // Record this entry for verification
628+ writerEntries [writerID * entriesPerWriter + j ] = entryInfo {
629+ params : params ,
630+ expectedData : expectedData ,
631+ }
608632 }
609633 }(i )
610634 }
@@ -613,6 +637,40 @@ func TestCacheConcurrentReadWrite(t *testing.T) {
613637 for range totalGoroutines {
614638 <- done
615639 }
640+
641+ // Verify that concurrent writes are retrievable and have correct data
642+ // With 1500 new entries (300 writers * 5 entries each) plus 100 initial entries,
643+ // and cache size of 500, we expect some entries to be evicted. We verify that:
644+ // 1. Entries that ARE in cache have the correct data
645+ // 2. We get a reasonable hit rate for the concurrent writes
646+ cachedCount := 0
647+ wrongDataCount := 0
648+
649+ for _ , entry := range writerEntries {
650+ cached , ok := cache .Get (resolverType , entry .params )
651+ if ok {
652+ cachedCount ++
653+ // Verify the data is correct
654+ if string (cached .Data ()) != entry .expectedData {
655+ wrongDataCount ++
656+ t .Errorf ("Expected data '%s', got '%s'" , entry .expectedData , string (cached .Data ()))
657+ }
658+ }
659+ }
660+
661+ // We should have no entries with wrong data
662+ if wrongDataCount > 0 {
663+ t .Errorf ("Found %d entries with wrong data" , wrongDataCount )
664+ }
665+
666+ // We should have some reasonable number of entries cached
667+ // With 1500 concurrent write entries and cache size 500, accounting for initial entries
668+ // and concurrent evictions, we expect at least 200 of the writer entries to be cached
669+ if cachedCount < 200 {
670+ t .Errorf ("Expected at least 200 writer entries to be cached, but only found %d out of %d total" , cachedCount , len (writerEntries ))
671+ }
672+
673+ t .Logf ("Concurrent read/write test passed: %d writer entries cached out of %d written" , cachedCount , len (writerEntries ))
616674}
617675
618676func TestCacheConcurrentEviction (t * testing.T ) {
0 commit comments