Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions cmd/vela-worker/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,12 @@ func (w *Worker) exec(index int) error {
}

// create a background context
ctx := context.Background()
buildCtx, done := context.WithCancel(context.Background())
defer done()

// add to the background context with a timeout
// built in for ensuring a build doesn't run forever
ctx, timeout := context.WithTimeout(ctx, t)
ctx, timeout := context.WithTimeout(buildCtx, t)
defer timeout()

defer func() {
Expand Down Expand Up @@ -128,6 +129,16 @@ func (w *Worker) exec(index int) error {
return nil
}

// log streaming uses buildCtx so that it is not subject to the timeout.
go func() {
logger.Info("streaming build logs")
// execute the build with the executor
err = _executor.StreamBuild(buildCtx)
if err != nil {
logger.Errorf("unable to stream build logs: %v", err)
}
}()

logger.Info("assembling build")
// assemble the build with the executor
err = _executor.AssembleBuild(ctx)
Expand Down
3 changes: 3 additions & 0 deletions executor/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ type Engine interface {
// ExecBuild defines a function that
// runs a pipeline for a build.
ExecBuild(context.Context) error
// StreamBuild defines a function that receives a StreamRequest
// and then runs StreamService or StreamStep in a goroutine.
StreamBuild(context.Context) error
// DestroyBuild defines a function that
// cleans up the build after execution.
DestroyBuild(context.Context) error
Expand Down
26 changes: 18 additions & 8 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"testing"

"github.com/gin-gonic/gin"
"github.com/google/go-cmp/cmp"

"github.com/go-vela/server/mock/server"

Expand Down Expand Up @@ -77,6 +78,7 @@ func TestExecutor_New(t *testing.T) {
failure bool
setup *Setup
want Engine
equal interface{}
}{
{
name: "driver-darwin",
Expand All @@ -91,7 +93,8 @@ func TestExecutor_New(t *testing.T) {
User: _user,
Version: "v1.0.0",
},
want: nil,
want: nil,
equal: reflect.DeepEqual,
},
{
name: "driver-linux",
Expand All @@ -108,7 +111,8 @@ func TestExecutor_New(t *testing.T) {
User: _user,
Version: "v1.0.0",
},
want: _linux,
want: _linux,
equal: linux.Equal,
},
{
name: "driver-local",
Expand All @@ -123,7 +127,8 @@ func TestExecutor_New(t *testing.T) {
User: _user,
Version: "v1.0.0",
},
want: _local,
want: _local,
equal: local.Equal,
},
{
name: "driver-windows",
Expand All @@ -138,7 +143,8 @@ func TestExecutor_New(t *testing.T) {
User: _user,
Version: "v1.0.0",
},
want: nil,
want: nil,
equal: reflect.DeepEqual,
},
{
name: "driver-invalid",
Expand All @@ -153,7 +159,8 @@ func TestExecutor_New(t *testing.T) {
User: _user,
Version: "v1.0.0",
},
want: nil,
want: nil,
equal: reflect.DeepEqual,
},
{
name: "driver-empty",
Expand All @@ -168,7 +175,8 @@ func TestExecutor_New(t *testing.T) {
User: _user,
Version: "v1.0.0",
},
want: nil,
want: nil,
equal: reflect.DeepEqual,
},
}

Expand All @@ -193,8 +201,10 @@ func TestExecutor_New(t *testing.T) {
t.Errorf("New returned err: %v", err)
}

if !reflect.DeepEqual(got, test.want) {
t.Errorf("New is %v, want %v", got, test.want)
// Comparing with reflect.DeepEqual(x, y interface) panics due to the
// unexported streamRequests channel.
if diff := cmp.Diff(test.want, got, cmp.Comparer(test.equal)); diff != "" {
t.Errorf("engine mismatch (-want +got):\n%v", diff)
}
})
}
Expand Down
44 changes: 44 additions & 0 deletions executor/linux/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,50 @@ func (c *client) ExecBuild(ctx context.Context) error {
return c.err
}

// StreamBuild receives a StreamRequest and then
// runs StreamService or StreamStep in a goroutine.
func (c *client) StreamBuild(ctx context.Context) error {
// create an error group with the parent context
//
// https://pkg.go.dev/golang.org/x/sync/errgroup?tab=doc#WithContext
streams, streamCtx := errgroup.WithContext(ctx)

defer func() {
c.Logger.Trace("waiting for stream functions to return")

err := streams.Wait()
if err != nil {
c.Logger.Errorf("error in a stream request, %v", err)
}

c.Logger.Trace("all stream functions have returned")
}()

for {
select {
case req := <-c.streamRequests:
streams.Go(func() error {
// update engine logger with step metadata
//
// https://pkg.go.dev/github.com/sirupsen/logrus?tab=doc#Entry.WithField
logger := c.Logger.WithField(req.Key, req.Container.Name)

logger.Debugf("streaming %s container %s", req.Key, req.Container.ID)

err := req.Stream(streamCtx, req.Container)
if err != nil {
logger.Error(err)
}

return nil
})
case <-ctx.Done():
// build done or canceled
return nil
}
}
}

// DestroyBuild cleans up the build after execution.
func (c *client) DestroyBuild(ctx context.Context) error {
var err error
Expand Down
Loading