Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/vela-worker/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (w *Worker) exec(index int) error {
return nil
}

// log streaming uses buildCtx so that it is not subject to the timeout.
// log/event streaming uses buildCtx so that it is not subject to the timeout.
go func() {
logger.Info("streaming build logs")
// execute the build with the executor
Expand Down
7 changes: 7 additions & 0 deletions executor/linux/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,13 @@ func (c *client) StreamBuild(ctx context.Context) error {
c.Logger.Info("all stream functions have returned")
}()

// allow the runtime to do log/event streaming setup at build-level
streams.Go(func() error {
// If needed, the runtime should handle synchronizing with
// AssembleBuild which runs concurrently with StreamBuild.
return c.Runtime.StreamBuild(streamCtx, c.pipeline)
})

for {
select {
case req := <-c.streamRequests:
Expand Down
7 changes: 7 additions & 0 deletions executor/local/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,13 @@ func (c *client) StreamBuild(ctx context.Context) error {
fmt.Fprintln(os.Stdout, "all stream functions have returned")
}()

// allow the runtime to do log/event streaming setup at build-level
streams.Go(func() error {
// If needed, the runtime should handle synchronizing with
// AssembleBuild which runs concurrently with StreamBuild.
return c.Runtime.StreamBuild(streamCtx, c.pipeline)
})

for {
select {
case req := <-c.streamRequests:
Expand Down
8 changes: 8 additions & 0 deletions runtime/docker/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ func (c *client) SetupBuild(ctx context.Context, b *pipeline.Build) error {
return nil
}

// StreamBuild initializes log/event streaming for build.
// This is a no-op for docker.
func (c *client) StreamBuild(ctx context.Context, b *pipeline.Build) error {
c.Logger.Tracef("no-op: streaming build %s", b.ID)

return nil
}

// AssembleBuild finalizes pipeline build setup.
// This is a no-op for docker.
func (c *client) AssembleBuild(ctx context.Context, b *pipeline.Build) error {
Expand Down
35 changes: 35 additions & 0 deletions runtime/docker/build_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,41 @@ func TestDocker_SetupBuild(t *testing.T) {
}
}

func TestKubernetes_StreamBuild(t *testing.T) {
tests := []struct {
name string
failure bool
pipeline *pipeline.Build
}{
{
name: "steps",
failure: false,
pipeline: _pipeline,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
_engine, err := NewMock()
if err != nil {
t.Errorf("unable to create runtime engine: %v", err)
}

err = _engine.StreamBuild(context.Background(), test.pipeline)

if test.failure {
if err == nil {
t.Errorf("StreamBuild should have returned err")
}

return // continue to next test
}

if err != nil {
t.Errorf("StreamBuild returned err: %v", err)
}
})
}
}
func TestDocker_AssembleBuild(t *testing.T) {
// setup tests
tests := []struct {
Expand Down
4 changes: 4 additions & 0 deletions runtime/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ type Engine interface {
// SetupBuild defines a function that
// prepares the pipeline build.
SetupBuild(context.Context, *pipeline.Build) error
// StreamBuild defines a function that initializes
// log/event streaming if the runtime needs it.
// StreamBuild and AssembleBuild run concurrently.
StreamBuild(context.Context, *pipeline.Build) error
// AssembleBuild defines a function that
// finalizes pipeline build setup.
AssembleBuild(context.Context, *pipeline.Build) error
Expand Down
50 changes: 50 additions & 0 deletions runtime/kubernetes/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ package kubernetes
import (
"context"
"fmt"
"time"

"github.com/go-vela/types/pipeline"
"github.com/go-vela/worker/runtime/kubernetes/apis/vela/v1alpha1"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"

// The k8s libraries have some quirks around yaml marshaling (see opts.go).
// So, just use the same library for all kubernetes-related YAML.
Expand Down Expand Up @@ -79,6 +81,7 @@ func (c *client) SetupBuild(ctx context.Context, b *pipeline.Build) error {
// https://pkg.go.dev/k8s.io/apimachinery/pkg/apis/meta/v1?tab=doc#ObjectMeta
c.Pod.ObjectMeta = metav1.ObjectMeta{
Name: b.ID,
Namespace: c.config.Namespace, // this is used by the podTracker
Labels: labels,
Annotations: c.PipelinePodTemplate.Metadata.Annotations,
}
Expand Down Expand Up @@ -124,6 +127,33 @@ func (c *client) SetupBuild(ctx context.Context, b *pipeline.Build) error {
}
}

// initialize the PodTracker now that we have a Pod for it to track
tracker, err := newPodTracker(c.Logger, c.Kubernetes, c.Pod, time.Second*30)
if err != nil {
return err
}

c.PodTracker = tracker

return nil
}

// StreamBuild initializes log/event streaming for build.
func (c *client) StreamBuild(ctx context.Context, b *pipeline.Build) error {
c.Logger.Tracef("streaming build %s", b.ID)

select {
case <-ctx.Done():
// bail out, as build timed out or was canceled.
return nil
case <-c.PodTracker.Ready:
// AssembleBuild signaled that the PodTracker is ready.
break
}

// Populate the PodTracker caches before creating the pipeline pod
c.PodTracker.Start(ctx)

return nil
}

Expand Down Expand Up @@ -182,6 +212,17 @@ func (c *client) AssembleBuild(ctx context.Context, b *pipeline.Build) error {
}
}

// setup containerTeachers now that all containers are defined.
c.PodTracker.TrackContainers(c.Pod.Spec.Containers)

// send signal to StreamBuild now that PodTracker is ready to be started.
close(c.PodTracker.Ready)

// wait for the PodTracker caches to populate before creating the pipeline pod.
if ok := cache.WaitForCacheSync(ctx.Done(), c.PodTracker.PodSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}

// If the api call to create the pod fails, the pod might
// partially exist. So, set this first to make sure all
// remnants get deleted.
Expand All @@ -206,6 +247,15 @@ func (c *client) AssembleBuild(ctx context.Context, b *pipeline.Build) error {
func (c *client) RemoveBuild(ctx context.Context, b *pipeline.Build) error {
c.Logger.Tracef("removing build %s", b.ID)

// PodTracker gets created in SetupBuild before pod is created
defer func() {
// check for nil as RemoveBuild may get called multiple times
if c.PodTracker != nil {
c.PodTracker.Stop()
c.PodTracker = nil
}
}()

if !c.createdPod {
// nothing to do
return nil
Expand Down
91 changes: 88 additions & 3 deletions runtime/kubernetes/build_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,82 @@ func TestKubernetes_SetupBuild(t *testing.T) {
}
}

func TestKubernetes_StreamBuild(t *testing.T) {
tests := []struct {
name string
failure bool
doCancel bool
doReady bool
pipeline *pipeline.Build
pod *v1.Pod
}{
{
name: "stages canceled",
failure: false,
doCancel: true,
pipeline: _stages,
pod: _stagesPod,
},
{
name: "steps canceled",
failure: false,
doCancel: true,
pipeline: _steps,
pod: _pod,
},
{
name: "stages ready",
failure: false,
doReady: true,
pipeline: _stages,
pod: _stagesPod,
},
{
name: "steps ready",
failure: false,
doReady: true,
pipeline: _steps,
pod: _pod,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
_engine, err := NewMock(test.pod)
if err != nil {
t.Errorf("unable to create runtime engine: %v", err)
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// StreamBuild and AssembleBuild coordinate their work.
go func() {
if test.doCancel {
// simulate canceled build
cancel()
} else if test.doReady {
// simulate AssembleBuild
close(_engine.PodTracker.Ready)
}
}()

err = _engine.StreamBuild(ctx, test.pipeline)

if test.failure {
if err == nil {
t.Errorf("StreamBuild should have returned err")
}

return // continue to next test
}

if err != nil {
t.Errorf("StreamBuild returned err: %v", err)
}
})
}
}

func TestKubernetes_AssembleBuild(t *testing.T) {
// setup tests
tests := []struct {
Expand Down Expand Up @@ -421,16 +497,25 @@ func TestKubernetes_AssembleBuild(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
_engine, err := NewMock(test.k8sPod)
if err != nil {
t.Errorf("unable to create runtime engine: %v", err)
}

_engine.Pod = test.enginePod

_engine.containersLookup = map[string]int{}
for i, ctn := range test.enginePod.Spec.Containers {
_engine.containersLookup[ctn.Name] = i
}

if err != nil {
t.Errorf("unable to create runtime engine: %v", err)
}
// StreamBuild and AssembleBuild coordinate their work, so, emulate
// executor.StreamBuild which calls runtime.StreamBuild concurrently.
go func() {
err := _engine.StreamBuild(context.Background(), test.pipeline)
if err != nil {
t.Errorf("unable to start PodTracker via StreamBuild")
}
}()

err = _engine.AssembleBuild(context.Background(), test.pipeline)

Expand Down
Loading