Merged
2 changes: 1 addition & 1 deletion go.mod
@@ -43,6 +43,7 @@ require (
require (
github.com/go-logr/zapr v1.3.0
github.com/spf13/pflag v1.0.10
go.opentelemetry.io/otel/trace v1.39.0
Contributor

what triggered adding this?

Contributor Author

I'm not sure, actually. I ran go mod tidy and this change keeps getting applied, but I don't think it should be triggered by my delta.
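
(A plausible explanation, not confirmed in this thread: go mod tidy promotes a module from the // indirect block to the direct require block whenever a package in the main module imports it directly, so a new direct import of go.opentelemetry.io/otel/trace anywhere in the tree would keep re-applying this move. Running go mod why go.opentelemetry.io/otel/trace would print the import chain responsible.)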

sigs.k8s.io/kustomize/api v0.21.0
sigs.k8s.io/kustomize/kyaml v0.21.0
)
@@ -116,7 +117,6 @@ require (
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.64.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.39.0 // indirect
go.opentelemetry.io/otel/metric v1.39.0 // indirect
go.opentelemetry.io/otel/trace v1.39.0 // indirect
go.opentelemetry.io/proto/otlp v1.9.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.yaml.in/yaml/v2 v2.4.3 // indirect
13 changes: 12 additions & 1 deletion pkg/epp/util/testing/wrappers.go
@@ -27,6 +27,9 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/common"
)

// DefaultTestPort is the standard port used for mock model servers in tests.
const DefaultTestPort = 8000

// PodWrapper wraps a Pod.
type PodWrapper struct {
corev1.Pod
@@ -51,7 +54,8 @@ func MakePod(podName string) *PodWrapper {
}
}

// Complete sets necessary fields for a Pod to make it not denied by the apiserver
// Complete sets necessary fields for a Pod to make it not denied by the apiserver.
// It applies a default container image and ensures the model server port is exposed.
func (p *PodWrapper) Complete() *PodWrapper {
if p.Pod.Namespace == "" {
p.Namespace("default")
@@ -60,6 +64,13 @@ func (p *PodWrapper) Complete() *PodWrapper {
{
Name: "mock-vllm",
Image: "mock-vllm:latest",
Ports: []corev1.ContainerPort{
{
Name: "http",
ContainerPort: DefaultTestPort,
Protocol: corev1.ProtocolTCP,
},
},
},
}
return p
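For context, a minimal sketch of how the updated wrapper is exercised (a hypothetical test, not part of this PR; it only uses MakePod, Complete, and DefaultTestPort from the diff above):

func TestCompleteExposesModelServerPort(t *testing.T) {
	// Complete() fills in defaults, including the mock-vllm container
	// and the "http" port added in this PR.
	pod := MakePod("pod-1").Complete().Pod
	port := pod.Spec.Containers[0].Ports[0]
	if port.ContainerPort != DefaultTestPort {
		t.Fatalf("expected container port %d, got %d", DefaultTestPort, port.ContainerPort)
	}
}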
81 changes: 64 additions & 17 deletions test/integration/epp/harness.go
@@ -76,13 +76,33 @@ var (

const testPoolName = "vllm-llama3-8b-instruct-pool"

// HarnessConfig holds configuration options for the TestHarness.
type HarnessConfig struct {
// StandaloneMode indicates if the EPP should run without watching Gateway API CRDs.
StandaloneMode bool
}

// HarnessOption is a functional option for configuring the TestHarness.
type HarnessOption func(*HarnessConfig)

// WithStandaloneMode configures the harness to run in Standalone mode.
// In this mode, CRD watchers are disabled and a static EndpointPool is injected.
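// Example (illustrative; mirrors the call in hermetic_test.go below):
//
//	h := NewTestHarness(t, ctx, WithStandaloneMode())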
func WithStandaloneMode() HarnessOption {
return func(c *HarnessConfig) {
c.StandaloneMode = true
}
}

// TestHarness encapsulates the environment for a single isolated EPP test run.
// It manages the lifecycle of the controller manager, the EPP server, and the K8s namespace.
type TestHarness struct {
t *testing.T
ctx context.Context
Namespace string

// --- Config State ---
StandaloneMode bool

Mgr ctrl.Manager
ServerRunner *server.ExtProcServerRunner
Client extProcPb.ExternalProcessor_ProcessClient
@@ -95,9 +115,14 @@ type TestHarness struct {
// NewTestHarness boots up a fully isolated test environment.
// It creates a unique Namespace, scopes the Manager to that Namespace, and starts the components.
// Note: EPP tests must run serially because they rely on the global Prometheus registry.
func NewTestHarness(t *testing.T, ctx context.Context) *TestHarness {
func NewTestHarness(t *testing.T, ctx context.Context, opts ...HarnessOption) *TestHarness {
t.Helper()

config := &HarnessConfig{}
for _, opt := range opts {
opt(config)
}

// 1. Identity & Namespace Isolation
// We use a unique UUID to ensure that resources from this test do not collide with others.
uid := uuid.New().String()[:8]
@@ -148,8 +173,24 @@ func NewTestHarness(t *testing.T, ctx context.Context) *TestHarness {

// 5. Dependency Injection (Scheduler, Scorers, Datastore)
pmf := backendmetrics.NewPodMetricsFactory(runner.TestPodMetricsClient, 10*time.Millisecond)

// Configure Datastore based on mode.
// We disable periodic resync (0) to ensure deterministic test behavior.
runner.Datastore = datastore.NewDatastore(ctx, pmf, 0)
if config.StandaloneMode {
// Disable CRD watching for Standalone mode.
runner.ControllerCfg = server.NewControllerConfig(false)

// Inject static Endpoint Pool.
// This replicates the manual pool construction that happens in runner.go CLI parsing.
// TODO(#2174): Refactor this to share logic with runner.go.
endpointPool := datalayer.NewEndpointPool(nsName, testPoolName)
endpointPool.Selector = map[string]string{"app": testPoolName}
endpointPool.TargetPorts = []int{epptestutil.DefaultTestPort}

runner.Datastore = datastore.NewDatastore(ctx, pmf, 0, datastore.WithEndpointPool(endpointPool))
} else {
runner.Datastore = datastore.NewDatastore(ctx, pmf, 0)
}

defaultProfile := framework.NewSchedulerProfile().
WithScorers(
@@ -206,14 +247,15 @@ func NewTestHarness(t *testing.T, ctx context.Context) *TestHarness {
)

h := &TestHarness{
t: t,
ctx: serverCtx,
Namespace: nsName,
Mgr: mgr,
ServerRunner: runner,
Client: client,
Datastore: runner.Datastore,
grpcConn: conn,
t: t,
ctx: serverCtx,
Namespace: nsName,
StandaloneMode: config.StandaloneMode,
Mgr: mgr,
ServerRunner: runner,
Client: client,
Datastore: runner.Datastore,
grpcConn: conn,
}

// 7. Register Cleanup
@@ -308,28 +350,33 @@ func (h *TestHarness) WaitForReadyPodsMetric(expectedCount int) {
}, 10*time.Second, 50*time.Millisecond, "Timed out waiting for inference_pool_ready_pods metric to settle")
}

// WaitForSync blocks until the EPP Datastore has synced the expected number of pods and, optionally, a specific model
// objective.
// WaitForSync blocks until the EPP Datastore has synced the expected number of pods.
// In Standard mode, it also waits for the InferencePool CRD to sync.
func (h *TestHarness) WaitForSync(expectedPods int, checkModelObjective string) *TestHarness {
h.t.Helper()
require.Eventually(h.t, func() bool {
if !h.Datastore.PoolHasSynced() {
// If we are NOT in standalone mode, we must wait for the Pool CRD to sync.
// In Standalone mode, there is no CRD controller, so this check is skipped.
if !h.StandaloneMode && !h.Datastore.PoolHasSynced() {
return false
}

if len(h.Datastore.PodList(datastore.AllPodsPredicate)) != expectedPods {
return false
}
if checkModelObjective != "" && h.Datastore.ObjectiveGet(checkModelObjective) == nil {
// In Standalone mode, Objectives are not CRDs, so we skip checking the Objective store unless we add logic to mock
// that too.
// For now, we skip objective verification in Standalone.
if !h.StandaloneMode && checkModelObjective != "" && h.Datastore.ObjectiveGet(checkModelObjective) == nil {
return false
}
return true
}, 10*time.Second, 50*time.Millisecond,
"Datastore sync timed out.\n- PoolSynced: %v\n- Pods Found: %d (Expected: %d)\n- Objective '%s' Found: %v",
"Datastore sync timed out.\n- Mode: Standalone=%v\n- PoolSynced: %v\n- Pods Found: %d (Expected: %d)",
h.StandaloneMode,
h.Datastore.PoolHasSynced(),
len(h.Datastore.PodList(datastore.AllPodsPredicate)),
expectedPods,
checkModelObjective,
h.Datastore.ObjectiveGet(checkModelObjective) != nil,
)
return h
}
78 changes: 50 additions & 28 deletions test/integration/epp/hermetic_test.go
@@ -96,13 +96,25 @@ func TestMain(m *testing.M) {
}

func TestFullDuplexStreamed_KubeInferenceObjectiveRequest(t *testing.T) {
// executionModes defines the permutations of EPP deployment modes to test.
executionModes := []struct {
name string
standalone bool
}{
{name: "Standard", standalone: false},
{name: "Standalone", standalone: true},
}

tests := []struct {
name string
requests []*extProcPb.ProcessingRequest
pods []podState
wantResponses []*extProcPb.ProcessingResponse
wantMetrics map[string]string
waitForModel string
// requiresCRDs indicates that this test case relies on specific Gateway API CRD features (like
// InferenceModelRewrite) which are not available in Standalone mode.
requiresCRDs bool
}{
// --- Standard Routing Logic ---
{
@@ -277,6 +289,7 @@ func TestFullDuplexStreamed_KubeInferenceObjectiveRequest(t *testing.T) {
wantMetrics: map[string]string{
"inference_objective_request_total": cleanMetric(metricReqTotal(modelToBeWritten, modelAfterRewrite)),
},
requiresCRDs: true,
},
{
name: "protocol: simple GET (header only)",
@@ -371,40 +384,49 @@ func TestFullDuplexStreamed_KubeInferenceObjectiveRequest(t *testing.T) {
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
// Resolve Model to Sync.
modelToSync := tc.waitForModel
if modelToSync == "" {
modelToSync = modelMyModel
}
for _, mode := range executionModes {
t.Run(mode.name, func(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
if mode.standalone && tc.requiresCRDs {
t.Skipf("Skipping test %q: requires CRDs, but running in Standalone mode", tc.name)
Contributor

@capri-xiyue I expect to support standalone with inference CRDs. Standalone can have two modes of operation: one with the Inference* CRDs (a richer mode of operation that allows using InfObj, for example) and one without (using a pod selector, completely decoupled from all CRDs). The stable thing in standalone is running without the Gateway API dependency.

Contributor Author

mode.standalone may have been a misunderstanding on my part. You can consider this PR as covering:

"one without (using pod selector and completely decoupled from all CRDs)"

We can add a third harness for standalone w/ CRDs.

Contributor

Got it. I agree that we can add a third harness for standalone w/ CRDs.

Contributor

OK, @capri-xiyue can you please add that in a follow-up PR?

Contributor

Sure.

}

h := NewTestHarness(t, context.Background()).
WithBaseResources().
WithPods(tc.pods).
WaitForSync(len(tc.pods), modelToSync)
var h *TestHarness
if mode.standalone {
h = NewTestHarness(t, context.Background(), WithStandaloneMode())
} else {
h = NewTestHarness(t, context.Background()).WithBaseResources()
}

// Wait for metrics to settle to avoid race conditions where Datastore has pods but Scheduler/Metrics collector
// hasn't processed them yet (causing random scheduling or missing metrics).
if len(tc.pods) > 0 {
h.WaitForReadyPodsMetric(len(tc.pods))
}
// In Standalone mode, we cannot wait for an Objective CRD to sync as it doesn't exist.
// We only wait for Pod discovery.
modelToSync := tc.waitForModel
if modelToSync == "" {
modelToSync = modelMyModel
}

responses, err := integration.StreamedRequest(t, h.Client, tc.requests, len(tc.wantResponses))
h.WithPods(tc.pods).WaitForSync(len(tc.pods), modelToSync)
if len(tc.pods) > 0 {
h.WaitForReadyPodsMetric(len(tc.pods))
}

require.NoError(t, err)
responses, err := integration.StreamedRequest(t, h.Client, tc.requests, len(tc.wantResponses))
require.NoError(t, err)

if diff := cmp.Diff(tc.wantResponses, responses,
protocmp.Transform(),
protocmp.SortRepeated(func(a, b *configPb.HeaderValueOption) bool {
return a.GetHeader().GetKey() < b.GetHeader().GetKey()
}),
); diff != "" {
t.Errorf("Response mismatch (-want +got): %v", diff)
}
if diff := cmp.Diff(tc.wantResponses, responses,
protocmp.Transform(),
protocmp.SortRepeated(func(a, b *configPb.HeaderValueOption) bool {
return a.GetHeader().GetKey() < b.GetHeader().GetKey()
}),
); diff != "" {
t.Errorf("Response mismatch (-want +got): %v", diff)
}

if len(tc.wantMetrics) > 0 {
h.ExpectMetrics(tc.wantMetrics)
if len(tc.wantMetrics) > 0 {
h.ExpectMetrics(tc.wantMetrics)
}
})
}
})
}