Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions registry-scanner/pkg/registry/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ func clearRegistries() {
registryLock.Lock()
registries = make(map[string]*RegistryEndpoint)
registryLock.Unlock()

// Also clear transport cache when registries are cleared
// This ensures that when registry configuration changes, we use fresh transports
ClearTransportCache()
}

// LoadRegistryConfiguration loads a YAML-formatted registry configuration from
Expand Down
71 changes: 68 additions & 3 deletions registry-scanner/pkg/registry/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/argoproj-labs/argocd-image-updater/registry-scanner/pkg/image"
"github.com/argoproj-labs/argocd-image-updater/registry-scanner/pkg/log"

memcache "github.com/patrickmn/go-cache"
"go.uber.org/ratelimit"
"golang.org/x/sync/singleflight"
)
Expand Down Expand Up @@ -122,6 +123,10 @@ var registryLock sync.RWMutex
// credentialGroup ensures only one credential refresh happens per registry
var credentialGroup singleflight.Group

// Transport cache to avoid creating new transports for each request
// Using go-cache with 30 minute expiration and 10 minute cleanup interval
var transportCache = memcache.New(30*time.Minute, 10*time.Minute)

func AddRegistryEndpointFromConfig(epc RegistryConfiguration) error {
ep := NewRegistryEndpoint(epc.Prefix, epc.Name, epc.ApiURL, epc.Credentials, epc.DefaultNS, epc.Insecure, TagListSortFromString(epc.TagSortMode), epc.Limit, epc.CredsExpire)
return AddRegistryEndpoint(ep)
Expand Down Expand Up @@ -308,16 +313,76 @@ func (ep *RegistryEndpoint) DeepCopy() *RegistryEndpoint {
return newEp
}

// ClearTransportCache clears the transport cache
// This is useful when registry configuration changes
func ClearTransportCache() {
transportCache.Flush()
}

// GetTransport returns a transport object for this endpoint
// Implements connection pooling and reuse to avoid creating new transports for each request
func (ep *RegistryEndpoint) GetTransport() *http.Transport {
// Check if we have a cached transport for this registry
if cachedTransport, found := transportCache.Get(ep.RegistryAPI); found {
transport := cachedTransport.(*http.Transport)
log.Debugf("Transport cache HIT for %s: %p", ep.RegistryAPI, transport)

// Validate that the transport is still usable
if isTransportValid(transport) {
return transport
}

// Transport is stale, remove it from cache
log.Debugf("Transport for %s is stale, removing from cache", ep.RegistryAPI)
transportCache.Delete(ep.RegistryAPI)
}

log.Debugf("Transport cache MISS for %s", ep.RegistryAPI)

// Create a new transport with optimized connection pool settings
tlsC := &tls.Config{}
if ep.Insecure {
tlsC.InsecureSkipVerify = true
}
return &http.Transport{
Proxy: http.ProxyFromEnvironment,
TLSClientConfig: tlsC,

// Create transport with aggressive timeout and connection management
transport := &http.Transport{
Proxy: http.ProxyFromEnvironment,
TLSClientConfig: tlsC,
MaxIdleConns: 20, // Reduced global max idle connections
MaxIdleConnsPerHost: 5, // Reduced per-host connections
IdleConnTimeout: 90 * time.Second, // Reduced idle timeout
TLSHandshakeTimeout: 10 * time.Second, // Reduced TLS timeout
ExpectContinueTimeout: 1 * time.Second, // Expect-Continue timeout
DisableKeepAlives: false, // Enable HTTP Keep-Alive
ForceAttemptHTTP2: true, // Enable HTTP/2 if available
// Critical timeout settings to prevent hanging connections
ResponseHeaderTimeout: 10 * time.Second, // Response header timeout
MaxConnsPerHost: 10, // Limit total connections per host
}

// Cache the transport for reuse with default expiration (30 minutes)
transportCache.Set(ep.RegistryAPI, transport, memcache.DefaultExpiration)
log.Debugf("Cached NEW transport for %s: %p", ep.RegistryAPI, transport)

return transport
}

// isTransportValid checks if a cached transport is still valid and usable
func isTransportValid(transport *http.Transport) bool {
// Basic validation - check if transport is not nil and has valid configuration
if transport == nil {
return false
}

// Check if the transport's connection settings are reasonable
// This is a simple validation, more sophisticated checks could be added
if transport.MaxIdleConns < 0 || transport.MaxIdleConnsPerHost < 0 {
return false
}

// Transport appears to be valid
return true
}

// init initializes the registry configuration
Expand Down
58 changes: 56 additions & 2 deletions registry-scanner/pkg/registry/endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package registry

import (
"fmt"
"net/http"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -308,9 +309,12 @@ func Test_GetTagListSortFromString(t *testing.T) {
}

func TestGetTransport(t *testing.T) {
ClearTransportCache()
defer ClearTransportCache()
t.Run("returns transport with default TLS config when Insecure is false", func(t *testing.T) {
endpoint := &RegistryEndpoint{
Insecure: false,
RegistryAPI: "secure-registry",
Insecure: false,
}
transport := endpoint.GetTransport()

Expand All @@ -321,7 +325,8 @@ func TestGetTransport(t *testing.T) {

t.Run("returns transport with insecure TLS config when Insecure is true", func(t *testing.T) {
endpoint := &RegistryEndpoint{
Insecure: true,
RegistryAPI: "insecure-registry",
Insecure: true,
}
transport := endpoint.GetTransport()

Expand Down Expand Up @@ -373,3 +378,52 @@ func TestAddRegistryEndpointFromConfig(t *testing.T) {
require.NoError(t, err)
})
}

// Test for transport caching and retrieval
func TestTransportCache(t *testing.T) {
// Clean up cache before and after test
ClearTransportCache()
defer ClearTransportCache()

endpoint := &RegistryEndpoint{
RegistryAPI: "https://example.com",
Insecure: false,
}

// 1. Test cache MISS and creation of a new transport
transport1 := endpoint.GetTransport()
assert.NotNil(t, transport1, "Transport should not be nil on cache miss")

// 2. Test cache HIT
transport2 := endpoint.GetTransport()
assert.NotNil(t, transport2, "Transport should not be nil on cache hit")
assert.Same(t, transport1, transport2, "Should retrieve the same transport instance from cache")

// 3. Test cache clearing
ClearTransportCache()
transport3 := endpoint.GetTransport()
assert.NotSame(t, transport1, transport3, "Should create a new transport after cache is cleared")
}

// Test for transport validation logic
func TestIsTransportValid(t *testing.T) {
t.Run("valid transport", func(t *testing.T) {
transport := &http.Transport{
MaxIdleConns: 10,
MaxIdleConnsPerHost: 5,
}
assert.True(t, isTransportValid(transport), "Should be a valid transport")
})

t.Run("nil transport", func(t *testing.T) {
assert.False(t, isTransportValid(nil), "Nil transport should be invalid")
})

t.Run("invalid connection settings", func(t *testing.T) {
transport := &http.Transport{
MaxIdleConns: -1,
}
assert.False(t, isTransportValid(transport), "Transport with invalid settings should be invalid")
})
}