Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 98 additions & 37 deletions internal/runtime/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
runtimemetrics "sigs.k8s.io/cluster-api/internal/runtime/metrics"
runtimeregistry "sigs.k8s.io/cluster-api/internal/runtime/registry"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/cache"
)

type errCallingExtensionHandler error
Expand All @@ -70,22 +71,52 @@ type Options struct {
// New returns a new Client.
func New(options Options) runtimeclient.Client {
return &client{
certFile: options.CertFile,
keyFile: options.KeyFile,
catalog: options.Catalog,
registry: options.Registry,
client: options.Client,
certFile: options.CertFile,
keyFile: options.KeyFile,
catalog: options.Catalog,
registry: options.Registry,
client: options.Client,
httpClientsCache: cache.New[httpClientEntry](24 * time.Hour),
}
}

var _ runtimeclient.Client = &client{}

type client struct {
certFile string
keyFile string
catalog *runtimecatalog.Catalog
registry runtimeregistry.ExtensionRegistry
client ctrlclient.Client
certFile string
keyFile string
catalog *runtimecatalog.Catalog
registry runtimeregistry.ExtensionRegistry
client ctrlclient.Client
httpClientsCache cache.Cache[httpClientEntry]
}

type httpClientEntry struct {
// Note: caData and hostName are the variable parts in the TLSConfig
// for an http.Client that is used to call runtime extensions.
caData []byte
hostName string

client *http.Client
}

func newHTTPClientEntry(hostName string, caData []byte, client *http.Client) httpClientEntry {
return httpClientEntry{
hostName: hostName,
caData: caData,
client: client,
}
}

func newHTTPClientEntryKey(hostName string, caData []byte) string {
return httpClientEntry{
hostName: hostName,
caData: caData,
}.Key()
}

func (r httpClientEntry) Key() string {
return fmt.Sprintf("%s/%s", r.hostName, string(r.caData))
}

func (c *client) WarmUp(extensionConfigList *runtimev1.ExtensionConfigList) error {
Expand All @@ -105,16 +136,20 @@ func (c *client) Discover(ctx context.Context, extensionConfig *runtimev1.Extens
return nil, errors.Wrapf(err, "failed to discover extension %q: failed to compute GVH of hook", extensionConfig.Name)
}

httpClient, err := c.getHTTPClient(extensionConfig.Spec.ClientConfig)
if err != nil {
return nil, errors.Wrapf(err, "failed to discover extension %q: failed to get http client", extensionConfig.Name)
}

request := &runtimehooksv1.DiscoveryRequest{}
response := &runtimehooksv1.DiscoveryResponse{}
opts := &httpCallOptions{
certFile: c.certFile,
keyFile: c.keyFile,
catalog: c.catalog,
config: extensionConfig.Spec.ClientConfig,
registrationGVH: hookGVH,
hookGVH: hookGVH,
timeout: defaultDiscoveryTimeout,
httpClient: httpClient,
}
if err := httpCall(ctx, request, response, opts); err != nil {
return nil, errors.Wrapf(err, "failed to discover extension %q", extensionConfig.Name)
Expand Down Expand Up @@ -336,15 +371,19 @@ func (c *client) CallExtension(ctx context.Context, hook runtimecatalog.Hook, fo
}
}

httpClient, err := c.getHTTPClient(registration.ClientConfig)
if err != nil {
return errors.Wrapf(err, "failed to call extension handler %q: failed to get http client", name)
}

httpOpts := &httpCallOptions{
certFile: c.certFile,
keyFile: c.keyFile,
catalog: c.catalog,
config: registration.ClientConfig,
registrationGVH: registration.GroupVersionHook,
hookGVH: hookGVH,
name: strings.TrimSuffix(registration.Name, "."+registration.ExtensionConfigName),
timeout: timeoutDuration,
httpClient: httpClient,
}
err = httpCall(ctx, request, response, httpOpts)
if err != nil {
Expand Down Expand Up @@ -388,6 +427,48 @@ func (c *client) CallExtension(ctx context.Context, hook runtimecatalog.Hook, fo
return nil
}

func (c *client) getHTTPClient(config runtimev1.ClientConfig) (*http.Client, error) {
// Note: we are passing an empty gvh and "" as name because the only relevant part of the url
// for this function is the Hostname, which derives from config (ghv and name are appended to the path).
extensionURL, err := urlForExtension(config, runtimecatalog.GroupVersionHook{}, "")
if err != nil {
return nil, err
}

if cacheEntry, ok := c.httpClientsCache.Has(newHTTPClientEntryKey(extensionURL.Hostname(), config.CABundle)); ok {
return cacheEntry.client, nil
}

httpClient, err := createHTTPClient(c.certFile, c.keyFile, config.CABundle, extensionURL.Hostname())
if err != nil {
return nil, err
}

c.httpClientsCache.Add(newHTTPClientEntry(extensionURL.Hostname(), config.CABundle, httpClient))
return httpClient, nil
}

func createHTTPClient(certFile, keyFile string, caData []byte, hostName string) (*http.Client, error) {
httpClient := &http.Client{}
tlsConfig, err := transport.TLSConfigFor(&transport.Config{
TLS: transport.TLSConfig{
CertFile: certFile,
KeyFile: keyFile,
CAData: caData,
ServerName: hostName,
},
})
if err != nil {
return nil, errors.Wrap(err, "failed to create tls config")
}

// This also adds http2
httpClient.Transport = utilnet.SetTransportDefaults(&http.Transport{
TLSClientConfig: tlsConfig,
})
return httpClient, nil
}

// cloneAndAddSettings creates a new request object and adds settings to it.
func cloneAndAddSettings(request runtimehooksv1.RequestObject, registrationSettings map[string]string) runtimehooksv1.RequestObject {
// Merge the settings from registration with the settings in the request.
Expand All @@ -406,14 +487,13 @@ func cloneAndAddSettings(request runtimehooksv1.RequestObject, registrationSetti
}

type httpCallOptions struct {
certFile string
keyFile string
catalog *runtimecatalog.Catalog
config runtimev1.ClientConfig
registrationGVH runtimecatalog.GroupVersionHook
hookGVH runtimecatalog.GroupVersionHook
name string
timeout time.Duration
httpClient *http.Client
}

func httpCall(ctx context.Context, request, response runtime.Object, opts *httpCallOptions) error {
Expand Down Expand Up @@ -492,27 +572,8 @@ func httpCall(ctx context.Context, request, response runtime.Object, opts *httpC
return errors.Wrap(err, "http call failed: failed to create http request")
}

// Use client-go's transport.TLSConfigureFor to ensure good defaults for tls
client := &http.Client{}
defer client.CloseIdleConnections()

tlsConfig, err := transport.TLSConfigFor(&transport.Config{
TLS: transport.TLSConfig{
CertFile: opts.certFile,
KeyFile: opts.keyFile,
CAData: opts.config.CABundle,
ServerName: extensionURL.Hostname(),
},
})
if err != nil {
return errors.Wrap(err, "http call failed: failed to create tls config")
}
// This also adds http2
client.Transport = utilnet.SetTransportDefaults(&http.Transport{
TLSClientConfig: tlsConfig,
})

resp, err := client.Do(httpRequest)
// Call the extension.
resp, err := opts.httpClient.Do(httpRequest)

// Create http request metric.
defer func() {
Expand Down
96 changes: 96 additions & 0 deletions internal/runtime/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"os"
"path/filepath"
"regexp"
Expand Down Expand Up @@ -201,6 +202,15 @@ func TestClient_httpCall(t *testing.T) {
// set url to srv for in tt.opts
tt.opts.config.URL = srv.URL
tt.opts.config.CABundle = testcerts.CACert

// set httpClient in tt.opts
// Note: cert and key file are not necessary, because in this test the server do not requires client authentication with certificates signed by a given CA.
u, err := url.Parse(srv.URL)
g.Expect(err).ToNot(HaveOccurred())

httpClient, err := createHTTPClient("", "", testcerts.CACert, u.Hostname())
g.Expect(err).ToNot(HaveOccurred())
tt.opts.httpClient = httpClient
}

err := httpCall(context.TODO(), tt.request, tt.response, tt.opts)
Expand Down Expand Up @@ -939,6 +949,92 @@ func TestClient_CallExtensionWithClientAuthentication(t *testing.T) {
g.Expect(serverCallCount).To(Equal(1))
}

func TestClient_GetHttpClient(t *testing.T) {
g := NewWithT(t)

extension1 := runtimev1.ExtensionConfig{
ObjectMeta: metav1.ObjectMeta{
Name: "extension1",
ResourceVersion: "15",
},
Spec: runtimev1.ExtensionConfigSpec{
ClientConfig: runtimev1.ClientConfig{
URL: "https://serverA.example.com/",
CABundle: testcerts.CACert,
},
},
}

extension2 := runtimev1.ExtensionConfig{
ObjectMeta: metav1.ObjectMeta{
Name: "extension2",
ResourceVersion: "36",
},
Spec: runtimev1.ExtensionConfigSpec{
ClientConfig: runtimev1.ClientConfig{
URL: "https://serverA.example.com/",
CABundle: testcerts.CACert,
},
},
}

extension3 := runtimev1.ExtensionConfig{
ObjectMeta: metav1.ObjectMeta{
Name: "extension3",
ResourceVersion: "54",
},
Spec: runtimev1.ExtensionConfigSpec{
ClientConfig: runtimev1.ClientConfig{
URL: "https://serverB.example.com/",
CABundle: testcerts.CACert, // in a real example also CA should be different, but the host name is already enough to require a different client.
},
},
}

c := New(Options{})

internalClient := c.(*client)
g.Expect(internalClient.httpClientsCache.Len()).To(Equal(0))

// Get http client for extension 1
gotClientExtension1, err := internalClient.getHTTPClient(extension1.Spec.ClientConfig)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(gotClientExtension1).ToNot(BeNil())

// Check http client cache have only one item
g.Expect(internalClient.httpClientsCache.Len()).To(Equal(1))
_, ok := internalClient.httpClientsCache.Has(newHTTPClientEntryKey("serverA.example.com", extension1.Spec.ClientConfig.CABundle))
g.Expect(ok).To(BeTrue())

// Check http client cache is used for the same extension
gotClientExtension1Again, err := internalClient.getHTTPClient(extension1.Spec.ClientConfig)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(gotClientExtension1Again).To(Equal(gotClientExtension1))

// Get http client for extension 2, same server
gotClientExtension2, err := internalClient.getHTTPClient(extension2.Spec.ClientConfig)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(gotClientExtension2).ToNot(BeNil())
g.Expect(gotClientExtension2).To(Equal(gotClientExtension1))

// Check http client cache have two items
g.Expect(internalClient.httpClientsCache.Len()).To(Equal(1))
_, ok = internalClient.httpClientsCache.Has(newHTTPClientEntryKey("serverA.example.com", extension2.Spec.ClientConfig.CABundle))
g.Expect(ok).To(BeTrue())

// Get http client for extension 3, another server
gotClientExtension3, err := internalClient.getHTTPClient(extension3.Spec.ClientConfig)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(gotClientExtension3).ToNot(BeNil())

// Check http client cache have two items
g.Expect(internalClient.httpClientsCache.Len()).To(Equal(2))
_, ok = internalClient.httpClientsCache.Has(newHTTPClientEntryKey("serverA.example.com", extension1.Spec.ClientConfig.CABundle))
g.Expect(ok).To(BeTrue())
_, ok = internalClient.httpClientsCache.Has(newHTTPClientEntryKey("serverB.example.com", extension2.Spec.ClientConfig.CABundle))
g.Expect(ok).To(BeTrue())
}

func cacheKeyFunc(extensionName, extensionConfigResourceVersion string, request runtimehooksv1.RequestObject) string {
// Note: extensionName is identical to the value of the name parameter passed into CallExtension.
s := fmt.Sprintf("%s-%s", extensionName, extensionConfigResourceVersion)
Expand Down
7 changes: 7 additions & 0 deletions util/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ type Cache[E Entry] interface {
// Has checks if the given key (still) exists in the Cache.
// Note: entries expire after the ttl.
Has(key string) (E, bool)

// Len returns the number of entries in the cache.
Len() int
}

// New creates a new cache.
Expand Down Expand Up @@ -96,6 +99,10 @@ func (r *cache[E]) Has(key string) (E, bool) {
return *new(E), false
}

func (r *cache[E]) Len() int {
return len(r.ListKeys())
}

// ReconcileEntry is an Entry for the Cache that stores the
// earliest time after which the next Reconcile should be executed.
type ReconcileEntry struct {
Expand Down
Loading