diff --git a/cmd/vela-worker/exec.go b/cmd/vela-worker/exec.go index c6033c8d..62aa1198 100644 --- a/cmd/vela-worker/exec.go +++ b/cmd/vela-worker/exec.go @@ -132,7 +132,7 @@ func (w *Worker) exec(index int) error { return nil } - // log streaming uses buildCtx so that it is not subject to the timeout. + // log/event streaming uses buildCtx so that it is not subject to the timeout. go func() { logger.Info("streaming build logs") // execute the build with the executor diff --git a/executor/linux/build.go b/executor/linux/build.go index b488aae0..c31e2938 100644 --- a/executor/linux/build.go +++ b/executor/linux/build.go @@ -510,6 +510,13 @@ func (c *client) StreamBuild(ctx context.Context) error { c.Logger.Info("all stream functions have returned") }() + // allow the runtime to do log/event streaming setup at build-level + streams.Go(func() error { + // If needed, the runtime should handle synchronizing with + // AssembleBuild which runs concurrently with StreamBuild. + return c.Runtime.StreamBuild(streamCtx, c.pipeline) + }) + for { select { case req := <-c.streamRequests: diff --git a/executor/local/build.go b/executor/local/build.go index c079787b..70556d9e 100644 --- a/executor/local/build.go +++ b/executor/local/build.go @@ -363,6 +363,13 @@ func (c *client) StreamBuild(ctx context.Context) error { fmt.Fprintln(os.Stdout, "all stream functions have returned") }() + // allow the runtime to do log/event streaming setup at build-level + streams.Go(func() error { + // If needed, the runtime should handle synchronizing with + // AssembleBuild which runs concurrently with StreamBuild. + return c.Runtime.StreamBuild(streamCtx, c.pipeline) + }) + for { select { case req := <-c.streamRequests: diff --git a/runtime/docker/build.go b/runtime/docker/build.go index 305422ba..e47145b2 100644 --- a/runtime/docker/build.go +++ b/runtime/docker/build.go @@ -26,6 +26,14 @@ func (c *client) SetupBuild(ctx context.Context, b *pipeline.Build) error { return nil } +// StreamBuild initializes log/event streaming for build. +// This is a no-op for docker. +func (c *client) StreamBuild(ctx context.Context, b *pipeline.Build) error { + c.Logger.Tracef("no-op: streaming build %s", b.ID) + + return nil +} + // AssembleBuild finalizes pipeline build setup. // This is a no-op for docker. 
func (c *client) AssembleBuild(ctx context.Context, b *pipeline.Build) error { diff --git a/runtime/docker/build_test.go b/runtime/docker/build_test.go index 35b61dae..1b678f8e 100644 --- a/runtime/docker/build_test.go +++ b/runtime/docker/build_test.go @@ -91,6 +91,41 @@ func TestDocker_SetupBuild(t *testing.T) { } } +func TestDocker_StreamBuild(t *testing.T) { + tests := []struct { + name string + failure bool + pipeline *pipeline.Build + }{ + { + name: "steps", + failure: false, + pipeline: _pipeline, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + _engine, err := NewMock() + if err != nil { + t.Errorf("unable to create runtime engine: %v", err) + } + + err = _engine.StreamBuild(context.Background(), test.pipeline) + + if test.failure { + if err == nil { + t.Errorf("StreamBuild should have returned err") + } + + return // continue to next test + } + + if err != nil { + t.Errorf("StreamBuild returned err: %v", err) + } + }) + } +} func TestDocker_AssembleBuild(t *testing.T) { // setup tests tests := []struct { diff --git a/runtime/engine.go b/runtime/engine.go index 918f55c1..6b538a4c 100644 --- a/runtime/engine.go +++ b/runtime/engine.go @@ -29,6 +29,10 @@ type Engine interface { // SetupBuild defines a function that // prepares the pipeline build. SetupBuild(context.Context, *pipeline.Build) error + // StreamBuild defines a function that initializes + // log/event streaming if the runtime needs it. + // StreamBuild and AssembleBuild run concurrently. + StreamBuild(context.Context, *pipeline.Build) error // AssembleBuild defines a function that // finalizes pipeline build setup. AssembleBuild(context.Context, *pipeline.Build) error diff --git a/runtime/kubernetes/build.go b/runtime/kubernetes/build.go index 3938cfef..ad94681f 100644 --- a/runtime/kubernetes/build.go +++ b/runtime/kubernetes/build.go @@ -7,12 +7,14 @@ package kubernetes import ( "context" "fmt" + "time" "github.com/go-vela/types/pipeline" "github.com/go-vela/worker/runtime/kubernetes/apis/vela/v1alpha1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/cache" // The k8s libraries have some quirks around yaml marshaling (see opts.go). // So, just use the same library for all kubernetes-related YAML. @@ -79,6 +81,7 @@ func (c *client) SetupBuild(ctx context.Context, b *pipeline.Build) error { // https://pkg.go.dev/k8s.io/apimachinery/pkg/apis/meta/v1?tab=doc#ObjectMeta c.Pod.ObjectMeta = metav1.ObjectMeta{ Name: b.ID, + Namespace: c.config.Namespace, // this is used by the podTracker Labels: labels, Annotations: c.PipelinePodTemplate.Metadata.Annotations, } @@ -124,6 +127,33 @@ func (c *client) SetupBuild(ctx context.Context, b *pipeline.Build) error { } } + // initialize the PodTracker now that we have a Pod for it to track + tracker, err := newPodTracker(c.Logger, c.Kubernetes, c.Pod, time.Second*30) + if err != nil { + return err + } + + c.PodTracker = tracker + + return nil +} + +// StreamBuild initializes log/event streaming for build. +func (c *client) StreamBuild(ctx context.Context, b *pipeline.Build) error { + c.Logger.Tracef("streaming build %s", b.ID) + + select { + case <-ctx.Done(): + // bail out, as build timed out or was canceled. + return nil + case <-c.PodTracker.Ready: + // AssembleBuild signaled that the PodTracker is ready.
+ break + } + + // Populate the PodTracker caches before creating the pipeline pod + c.PodTracker.Start(ctx) + return nil } @@ -182,6 +212,17 @@ func (c *client) AssembleBuild(ctx context.Context, b *pipeline.Build) error { } } + // setup containerTrackers now that all containers are defined. + c.PodTracker.TrackContainers(c.Pod.Spec.Containers) + + // send signal to StreamBuild now that PodTracker is ready to be started. + close(c.PodTracker.Ready) + + // wait for the PodTracker caches to populate before creating the pipeline pod. + if ok := cache.WaitForCacheSync(ctx.Done(), c.PodTracker.PodSynced); !ok { + return fmt.Errorf("failed to wait for caches to sync") + } + // If the api call to create the pod fails, the pod might // partially exist. So, set this first to make sure all // remnants get deleted. @@ -206,6 +247,15 @@ func (c *client) AssembleBuild(ctx context.Context, b *pipeline.Build) error { func (c *client) RemoveBuild(ctx context.Context, b *pipeline.Build) error { c.Logger.Tracef("removing build %s", b.ID) + // PodTracker gets created in SetupBuild before pod is created + defer func() { + // check for nil as RemoveBuild may get called multiple times + if c.PodTracker != nil { + c.PodTracker.Stop() + c.PodTracker = nil + } + }() + if !c.createdPod { // nothing to do return nil diff --git a/runtime/kubernetes/build_test.go b/runtime/kubernetes/build_test.go index d1d06215..50f304ea 100644 --- a/runtime/kubernetes/build_test.go +++ b/runtime/kubernetes/build_test.go @@ -376,6 +376,82 @@ func TestKubernetes_SetupBuild(t *testing.T) { } } +func TestKubernetes_StreamBuild(t *testing.T) { + tests := []struct { + name string + failure bool + doCancel bool + doReady bool + pipeline *pipeline.Build + pod *v1.Pod + }{ + { + name: "stages canceled", + failure: false, + doCancel: true, + pipeline: _stages, + pod: _stagesPod, + }, + { + name: "steps canceled", + failure: false, + doCancel: true, + pipeline: _steps, + pod: _pod, + }, + { + name: "stages ready", + failure: false, + doReady: true, + pipeline: _stages, + pod: _stagesPod, + }, + { + name: "steps ready", + failure: false, + doReady: true, + pipeline: _steps, + pod: _pod, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + _engine, err := NewMock(test.pod) + if err != nil { + t.Errorf("unable to create runtime engine: %v", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // StreamBuild and AssembleBuild coordinate their work.
+ go func() { + if test.doCancel { + // simulate canceled build + cancel() + } else if test.doReady { + // simulate AssembleBuild + close(_engine.PodTracker.Ready) + } + }() + + err = _engine.StreamBuild(ctx, test.pipeline) + + if test.failure { + if err == nil { + t.Errorf("StreamBuild should have returned err") + } + + return // continue to next test + } + + if err != nil { + t.Errorf("StreamBuild returned err: %v", err) + } + }) + } +} + func TestKubernetes_AssembleBuild(t *testing.T) { // setup tests tests := []struct { @@ -421,6 +497,10 @@ func TestKubernetes_AssembleBuild(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { _engine, err := NewMock(test.k8sPod) + if err != nil { + t.Errorf("unable to create runtime engine: %v", err) + } + _engine.Pod = test.enginePod _engine.containersLookup = map[string]int{} @@ -428,9 +508,14 @@ func TestKubernetes_AssembleBuild(t *testing.T) { _engine.containersLookup[ctn.Name] = i } - if err != nil { - t.Errorf("unable to create runtime engine: %v", err) - } + // StreamBuild and AssembleBuild coordinate their work, so, emulate + // executor.StreamBuild which calls runtime.StreamBuild concurrently. + go func() { + err := _engine.StreamBuild(context.Background(), test.pipeline) + if err != nil { + t.Errorf("unable to start PodTracker via StreamBuild") + } + }() err = _engine.AssembleBuild(context.Background(), test.pipeline) diff --git a/runtime/kubernetes/container.go b/runtime/kubernetes/container.go index 430a9e8a..28e0932d 100644 --- a/runtime/kubernetes/container.go +++ b/runtime/kubernetes/container.go @@ -19,7 +19,6 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" ) @@ -28,15 +27,10 @@ import ( func (c *client) InspectContainer(ctx context.Context, ctn *pipeline.Container) error { c.Logger.Tracef("inspecting container %s", ctn.ID) - // create options for getting the container - opts := metav1.GetOptions{} - - // send API call to capture the container - // - // https://pkg.go.dev/k8s.io/client-go/kubernetes/typed/core/v1?tab=doc#PodInterface - pod, err := c.Kubernetes.CoreV1(). + // get the pod from the local cache, which the Informer keeps up-to-date + pod, err := c.PodTracker.PodLister. Pods(c.config.Namespace). - Get(ctx, c.Pod.ObjectMeta.Name, opts) + Get(c.Pod.ObjectMeta.Name) if err != nil { return err } @@ -321,74 +315,55 @@ func (c *client) TailContainer(ctx context.Context, ctn *pipeline.Container) (io func (c *client) WaitContainer(ctx context.Context, ctn *pipeline.Container) error { c.Logger.Tracef("waiting for container %s", ctn.ID) - // create label selector for watching the pod - selector := fmt.Sprintf("pipeline=%s", fields.EscapeValue(c.Pod.ObjectMeta.Name)) - - // create options for watching the container - opts := metav1.ListOptions{ - LabelSelector: selector, - Watch: true, + // get the containerTracker for this container + tracker, ok := c.PodTracker.Containers[ctn.ID] + if !ok { + return fmt.Errorf("containerTracker is missing for %s", ctn.ID) } - // send API call to capture channel for watching the container - // - // https://pkg.go.dev/k8s.io/client-go/kubernetes/typed/core/v1?tab=doc#PodInterface - // -> - // https://pkg.go.dev/k8s.io/apimachinery/pkg/watch?tab=doc#Interface - podWatch, err := c.Kubernetes.CoreV1(). - Pods(c.config.Namespace). 
- Watch(ctx, opts) - if err != nil { - return err - } + // wait for the container terminated signal + <-tracker.Terminated - defer podWatch.Stop() + return nil +} - for { - // capture new result from the channel - // - // https://pkg.go.dev/k8s.io/apimachinery/pkg/watch?tab=doc#Interface - result := <-podWatch.ResultChan() +// inspectContainerStatuses signals when a container reaches a terminal state. +func (p *podTracker) inspectContainerStatuses(pod *v1.Pod) { + // check if the pod is in a pending state + // + // https://pkg.go.dev/k8s.io/api/core/v1?tab=doc#PodStatus + if pod.Status.Phase == v1.PodPending { + p.Logger.Debugf("skipping container status inspection as pod %s is pending", p.TrackedPod) + + // nothing to inspect if pod is in a pending state + return + } - // convert the object from the result to a pod - pod, ok := result.Object.(*v1.Pod) + // iterate through each container in the pod + for _, cst := range pod.Status.ContainerStatuses { + // get the containerTracker for this container + tracker, ok := p.Containers[cst.Name] if !ok { - return fmt.Errorf("unable to watch pod %s", c.Pod.ObjectMeta.Name) - } + // unknown container (probably a sidecar injected by an admissions controller) + p.Logger.Debugf("ignoring untracked container %s from pod %s", cst.Name, p.TrackedPod) - // check if the pod is in a pending state - // - // https://pkg.go.dev/k8s.io/api/core/v1?tab=doc#PodStatus - if pod.Status.Phase == v1.PodPending { - // skip pod if it's in a pending state continue } - // iterate through each container in the pod - for _, cst := range pod.Status.ContainerStatuses { - // check if the container has a matching ID - // - // https://pkg.go.dev/k8s.io/api/core/v1?tab=doc#ContainerStatus - if !strings.EqualFold(cst.Name, ctn.ID) { - // skip container if it's not a matching ID - continue - } - - // check if the container is in a terminated state - // - // https://pkg.go.dev/k8s.io/api/core/v1?tab=doc#ContainerState - if cst.State.Terminated == nil { - // skip container if it's not in a terminated state - break - } + // cst.State has details about the cst.Image's exit. + // cst.LastTerminationState has details about the kubernetes/pause image's exit. + // cst.RestartCount is 1 at exit due to switch from kubernetes/pause to final image. 
- // check if the container has a terminated state reason - // - // https://pkg.go.dev/k8s.io/api/core/v1?tab=doc#ContainerStateTerminated - if len(cst.State.Terminated.Reason) > 0 { - // break watching the container as it's complete - return nil - } + // check if the container is in a terminated state + // + // https://pkg.go.dev/k8s.io/api/core/v1?tab=doc#ContainerState + if cst.State.Terminated != nil { + tracker.terminatedOnce.Do(func() { + p.Logger.Debugf("container completed: %s in pod %s, %v", cst.Name, p.TrackedPod, cst) + + // let WaitContainer know the container is terminated + close(tracker.Terminated) + }) } } } diff --git a/runtime/kubernetes/container_test.go b/runtime/kubernetes/container_test.go index d71d3753..74369658 100644 --- a/runtime/kubernetes/container_test.go +++ b/runtime/kubernetes/container_test.go @@ -13,9 +13,8 @@ import ( "github.com/go-vela/worker/internal/image" velav1alpha1 "github.com/go-vela/worker/runtime/kubernetes/apis/vela/v1alpha1" + "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" ) func TestKubernetes_InspectContainer(t *testing.T) { @@ -424,33 +423,29 @@ func TestKubernetes_WaitContainer(t *testing.T) { name string failure bool container *pipeline.Container - object runtime.Object + oldPod *v1.Pod + newPod *v1.Pod }{ { name: "default order in ContainerStatuses", failure: false, container: _container, - object: _pod, + oldPod: _pod, + newPod: _pod, }, { name: "inverted order in ContainerStatuses", failure: false, container: _container, - object: &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "github-octocat-1", - Namespace: "test", - Labels: map[string]string{ - "pipeline": "github-octocat-1", - }, - }, - TypeMeta: metav1.TypeMeta{ - APIVersion: "v1", - Kind: "Pod", - }, + oldPod: _pod, + newPod: &v1.Pod{ + ObjectMeta: _pod.ObjectMeta, + TypeMeta: _pod.TypeMeta, + Spec: _pod.Spec, Status: v1.PodStatus{ Phase: v1.PodRunning, ContainerStatuses: []v1.ContainerStatus{ + // alternate order { Name: "step-github-octocat-1-echo", State: v1.ContainerState{ @@ -476,25 +471,57 @@ func TestKubernetes_WaitContainer(t *testing.T) { }, }, { - name: "watch returns invalid type", + name: "container goes from running to terminated", + failure: false, + container: _container, + oldPod: &v1.Pod{ + ObjectMeta: _pod.ObjectMeta, + TypeMeta: _pod.TypeMeta, + Spec: _pod.Spec, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + ContainerStatuses: []v1.ContainerStatus{ + { + Name: "step-github-octocat-1-clone", + State: v1.ContainerState{ + Running: &v1.ContainerStateRunning{}, + }, + }, + }, + }, + }, + newPod: _pod, + }, + { + name: "if client.Pod.Spec is empty podTracker fails", failure: true, container: _container, - object: new(v1.PodTemplate), + oldPod: _pod, + newPod: &v1.Pod{ + ObjectMeta: _pod.ObjectMeta, + TypeMeta: _pod.TypeMeta, + Status: _pod.Status, + // if client.Pod.Spec is empty, podTracker will fail + //Spec: _pod.Spec, + }, }, } // run tests for _, test := range tests { t.Run(test.name, func(t *testing.T) { - // setup types - _engine, _watch, err := newMockWithWatch(_pod, "pods") + // set up the fake k8s clientset so that it returns the final/updated state + _engine, err := NewMock(test.newPod) if err != nil { t.Errorf("unable to create runtime engine: %v", err) } go func() { - // simulate adding a pod to the watcher - _watch.Add(test.object) + oldPod := test.oldPod.DeepCopy() + oldPod.SetResourceVersion("older") + + 
// simulate a re-sync/PodUpdate event + _engine.PodTracker.HandlePodUpdate(oldPod, _engine.Pod) }() err = _engine.WaitContainer(context.Background(), test.container) @@ -513,3 +540,123 @@ func TestKubernetes_WaitContainer(t *testing.T) { }) } } + +func Test_podTracker_inspectContainerStatuses(t *testing.T) { + // setup types + logger := logrus.NewEntry(logrus.StandardLogger()) + + tests := []struct { + name string + trackedPod string + ctnName string + terminated bool + pod *v1.Pod + }{ + { + name: "container is terminated", + trackedPod: "test/github-octocat-1", + ctnName: "step-github-octocat-1-clone", + terminated: true, + pod: _pod, + }, + { + name: "pod is pending", + trackedPod: "test/github-octocat-1", + ctnName: "step-github-octocat-1-clone", + terminated: false, + pod: &v1.Pod{ + ObjectMeta: _pod.ObjectMeta, + TypeMeta: _pod.TypeMeta, + Spec: _pod.Spec, + Status: v1.PodStatus{ + Phase: v1.PodPending, + }, + }, + }, + { + name: "container is running", + trackedPod: "test/github-octocat-1", + ctnName: "step-github-octocat-1-clone", + terminated: false, + pod: &v1.Pod{ + ObjectMeta: _pod.ObjectMeta, + TypeMeta: _pod.TypeMeta, + Spec: _pod.Spec, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + ContainerStatuses: []v1.ContainerStatus{ + { + Name: "step-github-octocat-1-clone", + State: v1.ContainerState{ + Running: &v1.ContainerStateRunning{}, + }, + }, + }, + }, + }, + }, + { + name: "pod has an untracked container", + trackedPod: "test/github-octocat-1", + ctnName: "step-github-octocat-1-clone", + terminated: true, + pod: &v1.Pod{ + ObjectMeta: _pod.ObjectMeta, + TypeMeta: _pod.TypeMeta, + Spec: _pod.Spec, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + ContainerStatuses: []v1.ContainerStatus{ + { + Name: "step-github-octocat-1-clone", + State: v1.ContainerState{ + Terminated: &v1.ContainerStateTerminated{ + Reason: "Completed", + ExitCode: 0, + }, + }, + }, + { + Name: "injected-by-admissions-controller", + State: v1.ContainerState{ + Running: &v1.ContainerStateRunning{}, + }, + }, + }, + }, + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ctnTracker := containerTracker{ + Name: test.ctnName, + Terminated: make(chan struct{}), + } + podTracker := podTracker{ + Logger: logger, + TrackedPod: test.trackedPod, + Containers: map[string]*containerTracker{}, + // other fields not used by inspectContainerStatuses + // if they're needed, use newPodTracker + } + podTracker.Containers[test.ctnName] = &ctnTracker + + podTracker.inspectContainerStatuses(test.pod) + + func() { + defer func() { + // nolint: errcheck // repeat close() panics (otherwise it won't) + recover() + }() + + close(ctnTracker.Terminated) + + // this will only run if close() did not panic + if test.terminated { + t.Error("inspectContainerStatuses should have signaled termination") + } + }() + }) + } +} diff --git a/runtime/kubernetes/kubernetes.go b/runtime/kubernetes/kubernetes.go index f35c4188..5532bafb 100644 --- a/runtime/kubernetes/kubernetes.go +++ b/runtime/kubernetes/kubernetes.go @@ -44,6 +44,8 @@ type client struct { Pod *v1.Pod // containersLookup maps the container name to its index in Containers containersLookup map[string]int + // PodTracker wraps the Kubernetes client to simplify watching the pod for changes + PodTracker *podTracker // PipelinePodTemplate has default values to be used in Setup* methods PipelinePodTemplate *velav1alpha1.PipelinePodTemplate // commonVolumeMounts includes workspace mount and any global host mounts (VELA_RUNTIME_VOLUMES) @@ -163,7 +165,8 
@@ func NewMock(_pod *v1.Pod, opts ...ClientOpt) (*client, error) { c.config.Namespace = "test" // set the Kubernetes pod in the runtime client - c.Pod = _pod + c.Pod = _pod.DeepCopy() + c.Pod.SetResourceVersion("0") // apply all provided configuration options for _, opt := range opts { @@ -188,5 +191,15 @@ func NewMock(_pod *v1.Pod, opts ...ClientOpt) (*client, error) { }, ) + // set the PodTracker (normally populated in SetupBuild) + tracker, err := mockPodTracker(c.Logger, c.Kubernetes, c.Pod) + if err != nil { + return c, err + } + + c.PodTracker = tracker + + // The test is responsible for calling c.PodTracker.Start() if needed + return c, nil } diff --git a/runtime/kubernetes/kubernetes_test.go b/runtime/kubernetes/kubernetes_test.go index 364a90b0..60e2a5ed 100644 --- a/runtime/kubernetes/kubernetes_test.go +++ b/runtime/kubernetes/kubernetes_test.go @@ -11,9 +11,6 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/kubernetes/fake" - testcore "k8s.io/client-go/testing" ) func TestKubernetes_New(t *testing.T) { @@ -325,39 +322,3 @@ var ( }, } ) - -// newMockWithWatch returns an Engine implementation that -// integrates with a Kubernetes runtime and a FakeWatcher -// that can be used to inject resource events into it. -func newMockWithWatch(pod *v1.Pod, watchResource string, opts ...ClientOpt) (*client, *watch.RaceFreeFakeWatcher, error) { - // setup types - _engine, err := NewMock(pod, opts...) - if err != nil { - return nil, nil, err - } - - // create a new fake kubernetes client - // - // https://pkg.go.dev/k8s.io/client-go/kubernetes/fake?tab=doc#NewSimpleClientset - _kubernetes := fake.NewSimpleClientset(pod) - - // create a new fake watcher - // - // https://pkg.go.dev/k8s.io/apimachinery/pkg/watch?tab=doc#NewRaceFreeFake - _watch := watch.NewRaceFreeFake() - - // create a new watch reactor with the fake watcher - // - // https://pkg.go.dev/k8s.io/client-go/testing?tab=doc#DefaultWatchReactor - reactor := testcore.DefaultWatchReactor(_watch, nil) - - // add watch reactor to beginning of the client chain - // - // https://pkg.go.dev/k8s.io/client-go/testing?tab=doc#Fake.PrependWatchReactor - _kubernetes.PrependWatchReactor(watchResource, reactor) - - // overwrite the mock kubernetes client - _engine.Kubernetes = _kubernetes - - return _engine, _watch, nil -} diff --git a/runtime/kubernetes/pod_tracker.go b/runtime/kubernetes/pod_tracker.go new file mode 100644 index 00000000..221e881f --- /dev/null +++ b/runtime/kubernetes/pod_tracker.go @@ -0,0 +1,264 @@ +// Copyright (c) 2022 Target Brands, Inc. All rights reserved. +// +// Use of this source code is governed by the LICENSE file in this repository. + +package kubernetes + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/sirupsen/logrus" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/selection" + kubeinformers "k8s.io/client-go/informers" + informers "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/kubernetes" + listers "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" +) + +// containerTracker contains useful signals that are managed by the podTracker. +type containerTracker struct { + // Name is the name of the container + Name string + // terminatedOnce ensures that the Terminated channel only gets closed once. 
+ terminatedOnce sync.Once + // Terminated will be closed once the container reaches a terminal state. + Terminated chan struct{} + // TODO: collect streaming logs here before TailContainer is called +} + +// podTracker contains Informers used to watch and synchronize local k8s caches. +// This is similar to a typical Kubernetes controller (eg like k8s.io/sample-controller.Controller). +type podTracker struct { + // https://pkg.go.dev/github.com/sirupsen/logrus#Entry + Logger *logrus.Entry + // TrackedPod is the Namespace/Name of the tracked pod + TrackedPod string + + // informerFactory is used to create Informers and Listers + informerFactory kubeinformers.SharedInformerFactory + // informerDone is a function used to stop the informerFactory + informerDone context.CancelFunc + // podInformer watches the given pod, caches the results, and makes them available in podLister + podInformer informers.PodInformer + + // PodLister helps list Pods. All objects returned here must be treated as read-only. + PodLister listers.PodLister + // PodSynced is a function that can be used to determine if an informer has synced. + // This is useful for determining if caches have synced. + PodSynced cache.InformerSynced + + // Containers maps the container name to a containerTracker + Containers map[string]*containerTracker + + // Ready signals when the PodTracker is done with setup and ready to Start. + Ready chan struct{} +} + +// HandlePodAdd is an AddFunc for cache.ResourceEventHandlerFuncs for Pods. +func (p *podTracker) HandlePodAdd(newObj interface{}) { + newPod := p.getTrackedPod(newObj) + if newPod == nil { + // not valid or not our tracked pod + return + } + + p.Logger.Tracef("handling pod add event for %s", p.TrackedPod) + + p.inspectContainerStatuses(newPod) +} + +// HandlePodUpdate is an UpdateFunc for cache.ResourceEventHandlerFuncs for Pods. +func (p *podTracker) HandlePodUpdate(oldObj, newObj interface{}) { + oldPod := p.getTrackedPod(oldObj) + newPod := p.getTrackedPod(newObj) + + if oldPod == nil || newPod == nil { + // not valid or not our tracked pod + return + } + // if we need to optimize and avoid the resync update events, we can do this: + //if newPod.ResourceVersion == oldPod.ResourceVersion { + // // Periodic resync will send update events for all known Pods + // // If ResourceVersion is the same we have to look harder for Status changes + // if newPod.Status.Phase == oldPod.Status.Phase && newPod.Status.Size() == oldPod.Status.Size() { + // return + // } + //} + + p.Logger.Tracef("handling pod update event for %s", p.TrackedPod) + + p.inspectContainerStatuses(newPod) +} + +// HandlePodDelete is an DeleteFunc for cache.ResourceEventHandlerFuncs for Pods. +func (p *podTracker) HandlePodDelete(oldObj interface{}) { + oldPod := p.getTrackedPod(oldObj) + if oldPod == nil { + // not valid or not our tracked pod + return + } + + p.Logger.Tracef("handling pod delete event for %s", p.TrackedPod) + + p.inspectContainerStatuses(oldPod) +} + +// getTrackedPod tries to convert the obj into a Pod and makes sure it is the tracked Pod. +// This should only be used by the funcs of cache.ResourceEventHandlerFuncs. 
+func (p *podTracker) getTrackedPod(obj interface{}) *v1.Pod { + var ( + pod *v1.Pod + ok bool + ) + + if pod, ok = obj.(*v1.Pod); !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + p.Logger.Errorf("error decoding pod, invalid type") + return nil + } + + pod, ok = tombstone.Obj.(*v1.Pod) + if !ok { + p.Logger.Errorf("error decoding pod tombstone, invalid type") + return nil + } + } + + trackedPod := pod.GetNamespace() + "/" + pod.GetName() + if trackedPod != p.TrackedPod { + p.Logger.Errorf("error got unexpected pod: %s", trackedPod) + return nil + } + + return pod +} + +// Start kicks off the API calls to start populating the cache. +// There is no need to run this in a separate goroutine (ie go podTracker.Start(ctx)). +func (p *podTracker) Start(ctx context.Context) { + p.Logger.Tracef("starting PodTracker for pod %s", p.TrackedPod) + + informerCtx, done := context.WithCancel(ctx) + p.informerDone = done + + // Start method is non-blocking and runs all registered informers in a dedicated goroutine. + p.informerFactory.Start(informerCtx.Done()) +} + +// Stop shuts down any informers (e.g. stop watching APIs). +func (p *podTracker) Stop() { + p.Logger.Tracef("stopping PodTracker for pod %s", p.TrackedPod) + + if p.informerDone != nil { + p.informerDone() + } +} + +// TrackContainers creates a containerTracker for each container. +func (p *podTracker) TrackContainers(containers []v1.Container) { + p.Logger.Tracef("tracking %d more containers for pod %s", len(containers), p.TrackedPod) + + if p.Containers == nil { + p.Containers = map[string]*containerTracker{} + } + + for _, ctn := range containers { + p.Containers[ctn.Name] = &containerTracker{ + Name: ctn.Name, + Terminated: make(chan struct{}), + } + } +} + +// newPodTracker initializes a podTracker with a given clientset for a given pod. 
+func newPodTracker(log *logrus.Entry, clientset kubernetes.Interface, pod *v1.Pod, defaultResync time.Duration) (*podTracker, error) { + if pod == nil { + return nil, fmt.Errorf("newPodTracker expected a pod, got nil") + } + + trackedPod := pod.ObjectMeta.Namespace + "/" + pod.ObjectMeta.Name + if pod.ObjectMeta.Name == "" || pod.ObjectMeta.Namespace == "" { + return nil, fmt.Errorf("newPodTracker expects pod to have Name and Namespace, got %s", trackedPod) + } + + log.Tracef("creating PodTracker for pod %s", trackedPod) + + // create label selector for watching the pod + selector, err := labels.NewRequirement( + "pipeline", + selection.Equals, + []string{fields.EscapeValue(pod.ObjectMeta.Name)}, + ) + if err != nil { + return nil, err + } + + // create filtered Informer factory which is commonly used for k8s controllers + informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions( + clientset, + defaultResync, + kubeinformers.WithNamespace(pod.ObjectMeta.Namespace), + kubeinformers.WithTweakListOptions(func(listOptions *metav1.ListOptions) { + listOptions.LabelSelector = selector.String() + }), + ) + podInformer := informerFactory.Core().V1().Pods() + + // initialize podTracker + tracker := podTracker{ + Logger: log, + TrackedPod: trackedPod, + informerFactory: informerFactory, + podInformer: podInformer, + PodLister: podInformer.Lister(), + PodSynced: podInformer.Informer().HasSynced, + Ready: make(chan struct{}), + } + + // register event handler funcs in podInformer + podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: tracker.HandlePodAdd, + UpdateFunc: tracker.HandlePodUpdate, + DeleteFunc: tracker.HandlePodDelete, + }) + + return &tracker, nil +} + +// mockPodTracker returns a new podTracker with the given pod pre-loaded in the cache. +func mockPodTracker(log *logrus.Entry, clientset kubernetes.Interface, pod *v1.Pod) (*podTracker, error) { + // Make sure test pods are valid before passing to PodTracker (ie support &v1.Pod{}). + if pod.ObjectMeta.Name == "" { + pod.ObjectMeta.Name = "test-pod" + } + + if pod.ObjectMeta.Namespace == "" { + pod.ObjectMeta.Namespace = "test" + } + + tracker, err := newPodTracker(log, clientset, pod, 0*time.Second) + if err != nil { + return nil, err + } + + // init containerTrackers as well + tracker.TrackContainers(pod.Spec.Containers) + + // pre-populate the podInformer cache + err = tracker.podInformer.Informer().GetIndexer().Add(pod) + if err != nil { + return nil, err + } + + return tracker, err +} diff --git a/runtime/kubernetes/pod_tracker_test.go b/runtime/kubernetes/pod_tracker_test.go new file mode 100644 index 00000000..79b84621 --- /dev/null +++ b/runtime/kubernetes/pod_tracker_test.go @@ -0,0 +1,379 @@ +// Copyright (c) 2022 Target Brands, Inc. All rights reserved. +// +// Use of this source code is governed by the LICENSE file in this repository. 
+ +package kubernetes + +import ( + "context" + "reflect" + "testing" + "time" + + "github.com/sirupsen/logrus" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" +) + +func TestNewPodTracker(t *testing.T) { + // setup types + logger := logrus.NewEntry(logrus.StandardLogger()) + clientset := fake.NewSimpleClientset() + + tests := []struct { + name string + pod *v1.Pod + wantErr bool + }{ + { + name: "pass-with-pod", + pod: _pod, + wantErr: false, + }, + { + name: "error-with-nil-pod", + pod: nil, + wantErr: true, + }, + { + name: "error-with-empty-pod", + pod: &v1.Pod{}, + wantErr: true, + }, + { + name: "error-with-pod-without-namespace", + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pod"}, + }, + wantErr: true, + }, + { + name: "fail-with-pod", + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "github-octocat-1-for-some-odd-reason-this-name-is-way-too-long-and-will-cause-an-error", + Namespace: _pod.ObjectMeta.Namespace, + Labels: _pod.ObjectMeta.Labels, + }, + TypeMeta: _pod.TypeMeta, + Spec: _pod.Spec, + Status: _pod.Status, + }, + wantErr: true, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + _, err := newPodTracker(logger, clientset, test.pod, 0*time.Second) + if (err != nil) != test.wantErr { + t.Errorf("newPodTracker() error = %v, wantErr %v", err, test.wantErr) + return + } + }) + } +} + +func Test_podTracker_getTrackedPod(t *testing.T) { + // setup types + logger := logrus.NewEntry(logrus.StandardLogger()) + + tests := []struct { + name string + trackedPod string // namespace/podName + obj interface{} + want *v1.Pod + }{ + { + name: "got-tracked-pod", + trackedPod: "test/github-octocat-1", + obj: _pod, + want: _pod, + }, + { + name: "wrong-pod", + trackedPod: "test/github-octocat-2", + obj: _pod, + want: nil, + }, + { + name: "invalid-type", + trackedPod: "test/github-octocat-1", + obj: new(v1.PodTemplate), + want: nil, + }, + { + name: "nil", + trackedPod: "test/nil", + obj: nil, + want: nil, + }, + { + name: "tombstone-pod", + trackedPod: "test/github-octocat-1", + obj: cache.DeletedFinalStateUnknown{ + Key: "test/github-octocat-1", + Obj: _pod, + }, + want: _pod, + }, + { + name: "tombstone-nil", + trackedPod: "test/github-octocat-1", + obj: cache.DeletedFinalStateUnknown{ + Key: "test/github-octocat-1", + Obj: nil, + }, + want: nil, + }, + { + name: "tombstone-invalid-type", + trackedPod: "test/github-octocat-1", + obj: cache.DeletedFinalStateUnknown{ + Key: "test/github-octocat-1", + Obj: new(v1.PodTemplate), + }, + want: nil, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + p := podTracker{ + Logger: logger, + TrackedPod: test.trackedPod, + // other fields not used by getTrackedPod + // if they're needed, use newPodTracker + } + if got := p.getTrackedPod(test.obj); !reflect.DeepEqual(got, test.want) { + t.Errorf("getTrackedPod() = %v, want %v", got, test.want) + } + }) + } +} + +func Test_podTracker_HandlePodAdd(t *testing.T) { + // setup types + logger := logrus.NewEntry(logrus.StandardLogger()) + + tests := []struct { + name string + trackedPod string // namespace/podName + obj interface{} + }{ + { + name: "got-tracked-pod", + trackedPod: "test/github-octocat-1", + obj: _pod, + }, + { + name: "wrong-pod", + trackedPod: "test/github-octocat-2", + obj: _pod, + }, + { + name: "invalid-type", + trackedPod: "test/github-octocat-1", + obj: new(v1.PodTemplate), + }, + { + name: 
"nil", + trackedPod: "test/nil", + obj: nil, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + p := &podTracker{ + Logger: logger, + TrackedPod: test.trackedPod, + // other fields not used by getTrackedPod + // if they're needed, use newPodTracker + } + + // just make sure this doesn't panic + p.HandlePodAdd(test.obj) + }) + } +} + +func Test_podTracker_HandlePodUpdate(t *testing.T) { + // setup types + logger := logrus.NewEntry(logrus.StandardLogger()) + + tests := []struct { + name string + trackedPod string // namespace/podName + oldObj interface{} + newObj interface{} + }{ + { + name: "re-sync event without change", + trackedPod: "test/github-octocat-1", + oldObj: _pod, + newObj: _pod, + }, + { + name: "wrong-pod", + trackedPod: "test/github-octocat-2", + oldObj: _pod, + newObj: _pod, + }, + { + name: "invalid-type-old", + trackedPod: "test/github-octocat-1", + oldObj: new(v1.PodTemplate), + newObj: _pod, + }, + { + name: "nil-old", + trackedPod: "test/github-octocat-1", + oldObj: nil, + newObj: _pod, + }, + { + name: "invalid-type-new", + trackedPod: "test/github-octocat-1", + oldObj: _pod, + newObj: new(v1.PodTemplate), + }, + { + name: "nil-new", + trackedPod: "test/github-octocat-1", + oldObj: _pod, + newObj: nil, + }, + { + name: "invalid-type-both", + trackedPod: "test/github-octocat-1", + oldObj: new(v1.PodTemplate), + newObj: new(v1.PodTemplate), + }, + { + name: "nil-both", + trackedPod: "test/nil", + oldObj: nil, + newObj: nil, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + p := &podTracker{ + Logger: logger, + TrackedPod: test.trackedPod, + // other fields not used by getTrackedPod + // if they're needed, use newPodTracker + } + + // just make sure this doesn't panic + p.HandlePodUpdate(test.oldObj, test.newObj) + }) + } +} + +func Test_podTracker_HandlePodDelete(t *testing.T) { + // setup types + logger := logrus.NewEntry(logrus.StandardLogger()) + + tests := []struct { + name string + trackedPod string // namespace/podName + obj interface{} + }{ + { + name: "got-tracked-pod", + trackedPod: "test/github-octocat-1", + obj: _pod, + }, + { + name: "wrong-pod", + trackedPod: "test/github-octocat-2", + obj: _pod, + }, + { + name: "invalid-type", + trackedPod: "test/github-octocat-1", + obj: new(v1.PodTemplate), + }, + { + name: "nil", + trackedPod: "test/nil", + obj: nil, + }, + { + name: "tombstone-pod", + trackedPod: "test/github-octocat-1", + obj: cache.DeletedFinalStateUnknown{ + Key: "test/github-octocat-1", + Obj: _pod, + }, + }, + { + name: "tombstone-nil", + trackedPod: "test/github-octocat-1", + obj: cache.DeletedFinalStateUnknown{ + Key: "test/github-octocat-1", + Obj: nil, + }, + }, + { + name: "tombstone-invalid-type", + trackedPod: "test/github-octocat-1", + obj: cache.DeletedFinalStateUnknown{ + Key: "test/github-octocat-1", + Obj: new(v1.PodTemplate), + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + p := &podTracker{ + Logger: logger, + TrackedPod: test.trackedPod, + // other fields not used by getTrackedPod + // if they're needed, use newPodTracker + } + + // just make sure this doesn't panic + p.HandlePodDelete(test.obj) + }) + } +} + +func Test_podTracker_Stop(t *testing.T) { + // setup types + logger := logrus.NewEntry(logrus.StandardLogger()) + clientset := fake.NewSimpleClientset() + + tests := []struct { + name string + pod *v1.Pod + started bool + }{ + { + name: "started", + pod: _pod, + started: true, + }, + { + name: "not started", + pod: _pod, + 
started: false, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + tracker, err := newPodTracker(logger, clientset, test.pod, 0*time.Second) + if err != nil { + t.Errorf("newPodTracker() error = %v", err) + return + } + + if test.started { + tracker.Start(context.Background()) + } + tracker.Stop() + }) + } +} diff --git a/runtime/kubernetes/volume.go b/runtime/kubernetes/volume.go index 8cf7cd2d..a786d193 100644 --- a/runtime/kubernetes/volume.go +++ b/runtime/kubernetes/volume.go @@ -122,6 +122,7 @@ func (c *client) RemoveVolume(ctx context.Context, b *pipeline.Build) error { // // https://pkg.go.dev/k8s.io/api/core/v1?tab=doc#PodSpec c.Pod.Spec.Volumes = []v1.Volume{} + c.commonVolumeMounts = []v1.VolumeMount{} return nil }
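
Note (illustrative, not part of the diff): the Go sketch below shows how a caller in the style of executor/linux.StreamBuild is expected to drive the new Runtime.StreamBuild alongside AssembleBuild. The Runtime interface and Build type here are simplified stand-ins for runtime.Engine and pipeline.Build, and the errgroup wiring is an assumption modeled on the executor changes above rather than the exact worker implementation.

package example

import (
	"context"

	"golang.org/x/sync/errgroup"
)

// Build is a simplified stand-in for pipeline.Build.
type Build struct{ ID string }

// Runtime is a trimmed-down stand-in for runtime.Engine with only the
// methods involved in this change.
type Runtime interface {
	SetupBuild(context.Context, *Build) error
	StreamBuild(context.Context, *Build) error
	AssembleBuild(context.Context, *Build) error
}

// run sketches the coordination this diff introduces: StreamBuild blocks until
// AssembleBuild signals readiness (or the context is canceled) and then starts
// the informers, while AssembleBuild waits for the informer caches to sync
// before creating the pipeline pod.
func run(ctx context.Context, rt Runtime, b *Build) error {
	// SetupBuild creates the PodTracker (kubernetes runtime) or is a no-op (docker).
	if err := rt.SetupBuild(ctx, b); err != nil {
		return err
	}

	streams, streamCtx := errgroup.WithContext(ctx)

	// run the runtime's StreamBuild concurrently, as the executors now do.
	streams.Go(func() error {
		return rt.StreamBuild(streamCtx, b)
	})

	// AssembleBuild runs alongside StreamBuild; in the kubernetes runtime it
	// closes PodTracker.Ready to unblock StreamBuild.
	if err := rt.AssembleBuild(ctx, b); err != nil {
		return err
	}

	// wait for the streaming goroutine to finish.
	return streams.Wait()
}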