Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 98 additions & 37 deletions internal/runtime/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
runtimemetrics "sigs.k8s.io/cluster-api/internal/runtime/metrics"
runtimeregistry "sigs.k8s.io/cluster-api/internal/runtime/registry"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/cache"
)

type errCallingExtensionHandler error
Expand All @@ -70,22 +71,52 @@ type Options struct {
// New returns a new Client.
func New(options Options) runtimeclient.Client {
return &client{
certFile: options.CertFile,
keyFile: options.KeyFile,
catalog: options.Catalog,
registry: options.Registry,
client: options.Client,
certFile: options.CertFile,
keyFile: options.KeyFile,
catalog: options.Catalog,
registry: options.Registry,
client: options.Client,
httpClientsCache: cache.New[httpClientEntry](24 * time.Hour),
}
}

var _ runtimeclient.Client = &client{}

type client struct {
certFile string
keyFile string
catalog *runtimecatalog.Catalog
registry runtimeregistry.ExtensionRegistry
client ctrlclient.Client
certFile string
keyFile string
catalog *runtimecatalog.Catalog
registry runtimeregistry.ExtensionRegistry
client ctrlclient.Client
httpClientsCache cache.Cache[httpClientEntry]
}

type httpClientEntry struct {
// Note: caData and hostName are the variable parts in the TLSConfig
// for an http.Client that is used to call runtime extensions.
caData []byte
hostName string

client *http.Client
}

func newHTTPClientEntry(hostName string, caData []byte, client *http.Client) httpClientEntry {
return httpClientEntry{
hostName: hostName,
caData: caData,
client: client,
}
}

func newHTTPClientEntryKey(hostName string, caData []byte) string {
return httpClientEntry{
hostName: hostName,
caData: caData,
}.Key()
}

func (r httpClientEntry) Key() string {
return fmt.Sprintf("%s/%s", r.hostName, string(r.caData))
}

func (c *client) WarmUp(extensionConfigList *runtimev1.ExtensionConfigList) error {
Expand All @@ -105,16 +136,20 @@ func (c *client) Discover(ctx context.Context, extensionConfig *runtimev1.Extens
return nil, errors.Wrapf(err, "failed to discover extension %q: failed to compute GVH of hook", extensionConfig.Name)
}

httpClient, err := c.getHTTPClient(extensionConfig.Spec.ClientConfig)
if err != nil {
return nil, errors.Wrapf(err, "failed to discover extension %q: failed to get http client", extensionConfig.Name)
}

request := &runtimehooksv1.DiscoveryRequest{}
response := &runtimehooksv1.DiscoveryResponse{}
opts := &httpCallOptions{
certFile: c.certFile,
keyFile: c.keyFile,
catalog: c.catalog,
config: extensionConfig.Spec.ClientConfig,
registrationGVH: hookGVH,
hookGVH: hookGVH,
timeout: defaultDiscoveryTimeout,
httpClient: httpClient,
}
if err := httpCall(ctx, request, response, opts); err != nil {
return nil, errors.Wrapf(err, "failed to discover extension %q", extensionConfig.Name)
Expand Down Expand Up @@ -336,15 +371,19 @@ func (c *client) CallExtension(ctx context.Context, hook runtimecatalog.Hook, fo
}
}

httpClient, err := c.getHTTPClient(registration.ClientConfig)
if err != nil {
return errors.Wrapf(err, "failed to call extension handler %q: failed to get http client", name)
}

httpOpts := &httpCallOptions{
certFile: c.certFile,
keyFile: c.keyFile,
catalog: c.catalog,
config: registration.ClientConfig,
registrationGVH: registration.GroupVersionHook,
hookGVH: hookGVH,
name: strings.TrimSuffix(registration.Name, "."+registration.ExtensionConfigName),
timeout: timeoutDuration,
httpClient: httpClient,
}
err = httpCall(ctx, request, response, httpOpts)
if err != nil {
Expand Down Expand Up @@ -388,6 +427,48 @@ func (c *client) CallExtension(ctx context.Context, hook runtimecatalog.Hook, fo
return nil
}

func (c *client) getHTTPClient(config runtimev1.ClientConfig) (*http.Client, error) {
// Note: we are passing an empty gvh and "" as name because the only relevant part of the url
// for this function is the Hostname, which derives from config (ghv and name are appended to the path).
extensionURL, err := urlForExtension(config, runtimecatalog.GroupVersionHook{}, "")
if err != nil {
return nil, err
}

if cacheEntry, ok := c.httpClientsCache.Has(newHTTPClientEntryKey(extensionURL.Hostname(), config.CABundle)); ok {
return cacheEntry.client, nil
}

httpClient, err := createHTTPClient(c.certFile, c.keyFile, config.CABundle, extensionURL.Hostname())
if err != nil {
return nil, err
}

c.httpClientsCache.Add(newHTTPClientEntry(extensionURL.Hostname(), config.CABundle, httpClient))
return httpClient, nil
}

func createHTTPClient(certFile, keyFile string, caData []byte, hostName string) (*http.Client, error) {
httpClient := &http.Client{}
tlsConfig, err := transport.TLSConfigFor(&transport.Config{
TLS: transport.TLSConfig{
CertFile: certFile,
KeyFile: keyFile,
CAData: caData,
ServerName: hostName,
},
})
if err != nil {
return nil, errors.Wrap(err, "failed to create tls config")
}

// This also adds http2
httpClient.Transport = utilnet.SetTransportDefaults(&http.Transport{
TLSClientConfig: tlsConfig,
})
return httpClient, nil
}

// cloneAndAddSettings creates a new request object and adds settings to it.
func cloneAndAddSettings(request runtimehooksv1.RequestObject, registrationSettings map[string]string) runtimehooksv1.RequestObject {
// Merge the settings from registration with the settings in the request.
Expand All @@ -406,14 +487,13 @@ func cloneAndAddSettings(request runtimehooksv1.RequestObject, registrationSetti
}

type httpCallOptions struct {
certFile string
keyFile string
catalog *runtimecatalog.Catalog
config runtimev1.ClientConfig
registrationGVH runtimecatalog.GroupVersionHook
hookGVH runtimecatalog.GroupVersionHook
name string
timeout time.Duration
httpClient *http.Client
}

func httpCall(ctx context.Context, request, response runtime.Object, opts *httpCallOptions) error {
Expand Down Expand Up @@ -492,27 +572,8 @@ func httpCall(ctx context.Context, request, response runtime.Object, opts *httpC
return errors.Wrap(err, "http call failed: failed to create http request")
}

// Use client-go's transport.TLSConfigureFor to ensure good defaults for tls
client := &http.Client{}
defer client.CloseIdleConnections()

tlsConfig, err := transport.TLSConfigFor(&transport.Config{
TLS: transport.TLSConfig{
CertFile: opts.certFile,
KeyFile: opts.keyFile,
CAData: opts.config.CABundle,
ServerName: extensionURL.Hostname(),
},
})
if err != nil {
return errors.Wrap(err, "http call failed: failed to create tls config")
}
// This also adds http2
client.Transport = utilnet.SetTransportDefaults(&http.Transport{
TLSClientConfig: tlsConfig,
})

resp, err := client.Do(httpRequest)
// Call the extension.
resp, err := opts.httpClient.Do(httpRequest)

// Create http request metric.
defer func() {
Expand Down
96 changes: 96 additions & 0 deletions internal/runtime/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"os"
"path/filepath"
"regexp"
Expand Down Expand Up @@ -201,6 +202,15 @@
// set url to srv for in tt.opts
tt.opts.config.URL = ptr.To(srv.URL)
tt.opts.config.CABundle = testcerts.CACert

// set httpClient in tt.opts
// Note: cert and key file are not necessary, because in this test the server do not requires client authentication with certificates signed by a given CA.
u, err := url.Parse(srv.URL)
g.Expect(err).ToNot(HaveOccurred())

httpClient, err := createHTTPClient("", "", testcerts.CACert, u.Hostname())
g.Expect(err).ToNot(HaveOccurred())
tt.opts.httpClient = httpClient
}

err := httpCall(context.TODO(), tt.request, tt.response, tt.opts)
Expand Down Expand Up @@ -941,6 +951,92 @@
g.Expect(serverCallCount).To(Equal(1))
}

func TestClient_GetHttpClient(t *testing.T) {
g := NewWithT(t)

extension1 := runtimev1.ExtensionConfig{
ObjectMeta: metav1.ObjectMeta{
Name: "extension1",
ResourceVersion: "15",
},
Spec: runtimev1.ExtensionConfigSpec{
ClientConfig: runtimev1.ClientConfig{
URL: "https://serverA.example.com/",

Check failure on line 964 in internal/runtime/client/client_test.go

View workflow job for this annotation

GitHub Actions / lint

cannot use "https://serverA.example.com/" (untyped string constant) as *string value in struct literal

Check failure on line 964 in internal/runtime/client/client_test.go

View workflow job for this annotation

GitHub Actions / lint (test)

cannot use "https://serverA.example.com/" (untyped string constant) as *string value in struct literal

Check failure on line 964 in internal/runtime/client/client_test.go

View workflow job for this annotation

GitHub Actions / lint (hack/tools)

cannot use "https://serverA.example.com/" (untyped string constant) as *string value in struct literal
CABundle: testcerts.CACert,
},
},
}

extension2 := runtimev1.ExtensionConfig{
ObjectMeta: metav1.ObjectMeta{
Name: "extension2",
ResourceVersion: "36",
},
Spec: runtimev1.ExtensionConfigSpec{
ClientConfig: runtimev1.ClientConfig{
URL: "https://serverA.example.com/",

Check failure on line 977 in internal/runtime/client/client_test.go

View workflow job for this annotation

GitHub Actions / lint

cannot use "https://serverA.example.com/" (untyped string constant) as *string value in struct literal

Check failure on line 977 in internal/runtime/client/client_test.go

View workflow job for this annotation

GitHub Actions / lint (test)

cannot use "https://serverA.example.com/" (untyped string constant) as *string value in struct literal

Check failure on line 977 in internal/runtime/client/client_test.go

View workflow job for this annotation

GitHub Actions / lint (hack/tools)

cannot use "https://serverA.example.com/" (untyped string constant) as *string value in struct literal
CABundle: testcerts.CACert,
},
},
}

extension3 := runtimev1.ExtensionConfig{
ObjectMeta: metav1.ObjectMeta{
Name: "extension3",
ResourceVersion: "54",
},
Spec: runtimev1.ExtensionConfigSpec{
ClientConfig: runtimev1.ClientConfig{
URL: "https://serverB.example.com/",

Check failure on line 990 in internal/runtime/client/client_test.go

View workflow job for this annotation

GitHub Actions / lint

cannot use "https://serverB.example.com/" (untyped string constant) as *string value in struct literal

Check failure on line 990 in internal/runtime/client/client_test.go

View workflow job for this annotation

GitHub Actions / lint (test)

cannot use "https://serverB.example.com/" (untyped string constant) as *string value in struct literal

Check failure on line 990 in internal/runtime/client/client_test.go

View workflow job for this annotation

GitHub Actions / lint (hack/tools)

cannot use "https://serverB.example.com/" (untyped string constant) as *string value in struct literal
CABundle: testcerts.CACert, // in a real example also CA should be different, but the host name is already enough to require a different client.
},
},
}

c := New(Options{})

internalClient := c.(*client)
g.Expect(internalClient.httpClientsCache.Len()).To(Equal(0))

Check failure on line 999 in internal/runtime/client/client_test.go

View workflow job for this annotation

GitHub Actions / lint

internalClient.httpClientsCache.Len undefined (type "sigs.k8s.io/cluster-api/util/cache".Cache[httpClientEntry] has no field or method Len)

Check failure on line 999 in internal/runtime/client/client_test.go

View workflow job for this annotation

GitHub Actions / lint (test)

internalClient.httpClientsCache.Len undefined (type "sigs.k8s.io/cluster-api/util/cache".Cache[httpClientEntry] has no field or method Len)

Check failure on line 999 in internal/runtime/client/client_test.go

View workflow job for this annotation

GitHub Actions / lint (hack/tools)

internalClient.httpClientsCache.Len undefined (type "sigs.k8s.io/cluster-api/util/cache".Cache[httpClientEntry] has no field or method Len)

// Get http client for extension 1
gotClientExtension1, err := internalClient.getHTTPClient(extension1.Spec.ClientConfig)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(gotClientExtension1).ToNot(BeNil())

// Check http client cache have only one item
g.Expect(internalClient.httpClientsCache.Len()).To(Equal(1))

Check failure on line 1007 in internal/runtime/client/client_test.go

View workflow job for this annotation

GitHub Actions / lint

internalClient.httpClientsCache.Len undefined (type "sigs.k8s.io/cluster-api/util/cache".Cache[httpClientEntry] has no field or method Len)

Check failure on line 1007 in internal/runtime/client/client_test.go

View workflow job for this annotation

GitHub Actions / lint (test)

internalClient.httpClientsCache.Len undefined (type "sigs.k8s.io/cluster-api/util/cache".Cache[httpClientEntry] has no field or method Len)

Check failure on line 1007 in internal/runtime/client/client_test.go

View workflow job for this annotation

GitHub Actions / lint (hack/tools)

internalClient.httpClientsCache.Len undefined (type "sigs.k8s.io/cluster-api/util/cache".Cache[httpClientEntry] has no field or method Len)
_, ok := internalClient.httpClientsCache.Has(newHTTPClientEntryKey("serverA.example.com", extension1.Spec.ClientConfig.CABundle))
g.Expect(ok).To(BeTrue())

// Check http client cache is used for the same extension
gotClientExtension1Again, err := internalClient.getHTTPClient(extension1.Spec.ClientConfig)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(gotClientExtension1Again).To(Equal(gotClientExtension1))

// Get http client for extension 2, same server
gotClientExtension2, err := internalClient.getHTTPClient(extension2.Spec.ClientConfig)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(gotClientExtension2).ToNot(BeNil())
g.Expect(gotClientExtension2).To(Equal(gotClientExtension1))

// Check http client cache have two items
g.Expect(internalClient.httpClientsCache.Len()).To(Equal(1))

Check failure on line 1023 in internal/runtime/client/client_test.go

View workflow job for this annotation

GitHub Actions / lint

internalClient.httpClientsCache.Len undefined (type "sigs.k8s.io/cluster-api/util/cache".Cache[httpClientEntry] has no field or method Len)

Check failure on line 1023 in internal/runtime/client/client_test.go

View workflow job for this annotation

GitHub Actions / lint (test)

internalClient.httpClientsCache.Len undefined (type "sigs.k8s.io/cluster-api/util/cache".Cache[httpClientEntry] has no field or method Len)

Check failure on line 1023 in internal/runtime/client/client_test.go

View workflow job for this annotation

GitHub Actions / lint (hack/tools)

internalClient.httpClientsCache.Len undefined (type "sigs.k8s.io/cluster-api/util/cache".Cache[httpClientEntry] has no field or method Len)
_, ok = internalClient.httpClientsCache.Has(newHTTPClientEntryKey("serverA.example.com", extension2.Spec.ClientConfig.CABundle))
g.Expect(ok).To(BeTrue())

// Get http client for extension 3, another server
gotClientExtension3, err := internalClient.getHTTPClient(extension3.Spec.ClientConfig)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(gotClientExtension3).ToNot(BeNil())

// Check http client cache have two items
g.Expect(internalClient.httpClientsCache.Len()).To(Equal(2))

Check failure on line 1033 in internal/runtime/client/client_test.go

View workflow job for this annotation

GitHub Actions / lint

internalClient.httpClientsCache.Len undefined (type "sigs.k8s.io/cluster-api/util/cache".Cache[httpClientEntry] has no field or method Len) (typecheck)

Check failure on line 1033 in internal/runtime/client/client_test.go

View workflow job for this annotation

GitHub Actions / lint (test)

internalClient.httpClientsCache.Len undefined (type "sigs.k8s.io/cluster-api/util/cache".Cache[httpClientEntry] has no field or method Len) (typecheck)

Check failure on line 1033 in internal/runtime/client/client_test.go

View workflow job for this annotation

GitHub Actions / lint (hack/tools)

internalClient.httpClientsCache.Len undefined (type "sigs.k8s.io/cluster-api/util/cache".Cache[httpClientEntry] has no field or method Len) (typecheck)
_, ok = internalClient.httpClientsCache.Has(newHTTPClientEntryKey("serverA.example.com", extension1.Spec.ClientConfig.CABundle))
g.Expect(ok).To(BeTrue())
_, ok = internalClient.httpClientsCache.Has(newHTTPClientEntryKey("serverB.example.com", extension2.Spec.ClientConfig.CABundle))
g.Expect(ok).To(BeTrue())
}

func cacheKeyFunc(extensionName, extensionConfigResourceVersion string, request runtimehooksv1.RequestObject) string {
// Note: extensionName is identical to the value of the name parameter passed into CallExtension.
s := fmt.Sprintf("%s-%s", extensionName, extensionConfigResourceVersion)
Expand Down
Loading