Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2025 The Kubernetes Authors.
Copyright 2026 The Kubernetes Authors.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the copyright is realy 2025-... I don't think we need to change copyright statements in files modified in a specific year,

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reverted to 2025


Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -14,24 +14,22 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package metrics
package httpds

import (
"context"
"fmt"
"io"
"net/http"
"net/url"
"time"

"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
)

// Client is an interface for retrieving the metrics from an endpoint URL.
// Client is an interface for retrieving the data from an endpoint URL.
type Client interface {
Get(ctx context.Context, target *url.URL, ep datalayer.Addressable) (PrometheusMetricMap, error)
Get(ctx context.Context, target *url.URL, ep datalayer.Addressable, parser func(io.Reader) (any, error)) (any, error)
}

const (
Expand Down Expand Up @@ -67,14 +65,15 @@ type client struct {
http.Client
}

func (cl *client) Get(ctx context.Context, target *url.URL, ep datalayer.Addressable) (PrometheusMetricMap, error) {
func (cl *client) Get(ctx context.Context, target *url.URL, ep datalayer.Addressable,
parser func(io.Reader) (any, error)) (any, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, target.String(), nil)
if err != nil {
return nil, fmt.Errorf("failed to create request: %v", err)
}
resp, err := defaultClient.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to fetch metrics from %s: %w", ep.GetNamespacedName(), err)
return nil, fmt.Errorf("failed to fetch data from %s: %w", ep.GetNamespacedName(), err)
}
defer func() {
_ = resp.Body.Close()
Expand All @@ -84,10 +83,5 @@ func (cl *client) Get(ctx context.Context, target *url.URL, ep datalayer.Address
return nil, fmt.Errorf("unexpected status code from %s: %v", ep.GetNamespacedName(), resp.StatusCode)
}

parser := expfmt.NewTextParser(model.LegacyValidation)
metricFamilies, err := parser.TextToMetricFamilies(resp.Body)
if err != nil {
return nil, err
}
return metricFamilies, err
return parser(resp.Body)
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2025 The Kubernetes Authors.
Copyright 2026 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -14,61 +14,67 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package metrics
package httpds

import (
"context"
"crypto/tls"
"errors"
"fmt"
"io"
"net/url"
"reflect"
"sync"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
)

// DataSource is a Model Server Protocol (MSP) compliant metrics data source,
// returning Prometheus formatted metrics for an endpoint.
type DataSource struct {
typedName plugins.TypedName
metricsScheme string // scheme to use in metrics URL
metricsPath string // path to use in metrics URL
// HTTPDataSource is a data source that recieves its data using HTTP client.
type HTTPDataSource struct {
typedName plugins.TypedName
scheme string // scheme to use
path string // path to use

client Client // client (e.g. a wrapped http.Client) used to get metrics
client Client // client (e.g. a wrapped http.Client) used to get data
parser func(io.Reader) (any, error)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q: I think there are two high level ways to refactor this:

  • keep a shared implementation and configure it from the outside with implementation specific details (the typed-name, parse callback, output type, etc); or
  • create specific instances (e.g., metrics, v1/models) that build a data source from lower level building blocks (e.g., they would have their own TypedName, call client.Get() and then pass the result to an internal parse method, instead of adding parse to the Get parameters).

To clarify, you could still have an HTTPDataSource under pkg/epp/datalayer/http, but add MetricsSource and ModelsSource Go files next to it or in their own directories. By embedding an HTTPDataSource the specific sources could reuse the shared implementation.

Out of curiosuity, any thoughts on preferring one over the other? Either approach works, they can be thought of as composition vs inheritance.
I would lean towards the second approach (e.g., there is no HTTPDataSource registered factory in the system, only ones for metrics and v1/models so the HTTP bits seem like an implemntation detail to me and can be modeled as function "building blocks" and not reuse the full struct, esp. since v1/models would be in a different repo altogether).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that the second approach allows less code reuse comparing to the first one

outputType reflect.Type
extractors sync.Map // key: name, value: extractor
}

// NewMetricsDataSource returns a new MSP compliant metrics data source, configured with
// NewHTTPDataSource returns a new data source, configured with
// the provided scheme, path and certificate verification parameters.
func NewMetricsDataSource(metricsScheme string, metricsPath string, skipCertVerification bool) *DataSource {
if metricsScheme == "https" {
func NewHTTPDataSource(scheme string, path string, skipCertVerification bool, pluginType string,
pluginName string, parser func(io.Reader) (any, error), outputType reflect.Type) *HTTPDataSource {
if scheme == "https" {
httpsTransport := baseTransport.Clone()
httpsTransport.TLSClientConfig = &tls.Config{
InsecureSkipVerify: skipCertVerification,
}
defaultClient.Transport = httpsTransport
}

dataSrc := &DataSource{
dataSrc := &HTTPDataSource{
typedName: plugins.TypedName{
Type: MetricsDataSourceType,
Name: MetricsDataSourceType,
Type: pluginType,
Name: pluginName,
},
metricsScheme: metricsScheme,
metricsPath: metricsPath,
client: defaultClient,
scheme: scheme,
path: path,
client: defaultClient,
parser: parser,
outputType: outputType,
}
return dataSrc
}

// TypedName returns the metrics data source type and name.
func (dataSrc *DataSource) TypedName() plugins.TypedName {
// TypedName returns the data source type and name.
func (dataSrc *HTTPDataSource) TypedName() plugins.TypedName {
return dataSrc.typedName
}

// Extractors returns a list of registered Extractor names.
func (dataSrc *DataSource) Extractors() []string {
func (dataSrc *HTTPDataSource) Extractors() []string {
extractors := []string{}
dataSrc.extractors.Range(func(_, val any) bool {
if ex, ok := val.(datalayer.Extractor); ok {
Expand All @@ -80,9 +86,9 @@ func (dataSrc *DataSource) Extractors() []string {
}

// AddExtractor adds an extractor to the data source, validating it can process
// the metrics' data source output type.
func (dataSrc *DataSource) AddExtractor(extractor datalayer.Extractor) error {
if err := datalayer.ValidateExtractorType(PrometheusMetricType, extractor.ExpectedInputType()); err != nil {
// the data source output type.
func (dataSrc *HTTPDataSource) AddExtractor(extractor datalayer.Extractor) error {
if err := datalayer.ValidateExtractorType(dataSrc.outputType, extractor.ExpectedInputType()); err != nil {
return err
}
if _, loaded := dataSrc.extractors.LoadOrStore(extractor.TypedName().Name, extractor); loaded {
Expand All @@ -92,10 +98,10 @@ func (dataSrc *DataSource) AddExtractor(extractor datalayer.Extractor) error {
}

// Collect is triggered by the data layer framework to fetch potentially new
// MSP metrics data for an endpoint.
func (dataSrc *DataSource) Collect(ctx context.Context, ep datalayer.Endpoint) error {
target := dataSrc.getMetricsEndpoint(ep.GetMetadata())
families, err := dataSrc.client.Get(ctx, target, ep.GetMetadata())
// data for an endpoint.
func (dataSrc *HTTPDataSource) Collect(ctx context.Context, ep datalayer.Endpoint) error {
target := dataSrc.getEndpoint(ep.GetMetadata())
data, err := dataSrc.client.Get(ctx, target, ep.GetMetadata(), dataSrc.parser)

if err != nil {
return err
Expand All @@ -104,7 +110,7 @@ func (dataSrc *DataSource) Collect(ctx context.Context, ep datalayer.Endpoint) e
var errs []error
dataSrc.extractors.Range(func(_, val any) bool {
if ex, ok := val.(datalayer.Extractor); ok {
if err = ex.Extract(ctx, families, ep); err != nil {
if err = ex.Extract(ctx, data, ep); err != nil {
errs = append(errs, err)
}
}
Expand All @@ -117,10 +123,12 @@ func (dataSrc *DataSource) Collect(ctx context.Context, ep datalayer.Endpoint) e
return nil
}

func (dataSrc *DataSource) getMetricsEndpoint(ep datalayer.Addressable) *url.URL {
func (dataSrc *HTTPDataSource) getEndpoint(ep datalayer.Addressable) *url.URL {
return &url.URL{
Scheme: dataSrc.metricsScheme,
Scheme: dataSrc.scheme,
Host: ep.GetMetricsHost(),
Path: dataSrc.metricsPath,
Path: dataSrc.path,
}
}

var _ datalayer.DataSource = (*HTTPDataSource)(nil)
6 changes: 4 additions & 2 deletions pkg/epp/datalayer/metrics/datasource_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2025 The Kubernetes Authors.
Copyright 2025, 2026 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -25,10 +25,12 @@ import (
"k8s.io/apimachinery/pkg/types"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
httpds "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer/http"
)

func TestDatasource(t *testing.T) {
source := NewMetricsDataSource("https", "/metrics", true)
source := httpds.NewHTTPDataSource("https", "/metrics", true, MetricsDataSourceType,
"metrics-data-source", parseMetrics, PrometheusMetricType)
extractor, err := NewModelServerExtractor(defaultTotalQueuedRequestsMetric, "", "", "", "")
assert.Nil(t, err, "failed to create extractor")

Expand Down
15 changes: 12 additions & 3 deletions pkg/epp/datalayer/metrics/factories.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2025 The Kubernetes Authors.
Copyright 2025, 2026 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -19,10 +19,14 @@ package metrics
import (
"encoding/json"
"fmt"
"io"
"strconv"

"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"
flag "github.com/spf13/pflag"

httpds "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer/http"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
)

Expand Down Expand Up @@ -72,8 +76,8 @@ func MetricsDataSourceFactory(name string, parameters json.RawMessage, handle pl
}
}

ds := NewMetricsDataSource(cfg.Scheme, cfg.Path, cfg.InsecureSkipVerify)
ds.typedName.Name = name
ds := httpds.NewHTTPDataSource(cfg.Scheme, cfg.Path, cfg.InsecureSkipVerify, MetricsDataSourceType,
name, parseMetrics, PrometheusMetricType)
return ds, nil
}

Expand Down Expand Up @@ -183,3 +187,8 @@ func fromBoolFlag(name string) (bool, error) {
}
return b, nil
}

func parseMetrics(data io.Reader) (any, error) {
parser := expfmt.NewTextParser(model.LegacyValidation)
return parser.TextToMetricFamilies(data)
}