Skip to content

Commit c55798b

Browse files
authored
Merge branch 'master' into k8s-pause-running
2 parents 17b5ad0 + c0823e8 commit c55798b

31 files changed

+1068
-89
lines changed

cmd/vela-worker/exec.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,12 @@ func (w *Worker) exec(index int) error {
9494
}
9595

9696
// create a background context
97-
ctx := context.Background()
97+
buildCtx, done := context.WithCancel(context.Background())
98+
defer done()
9899

99100
// add to the background context with a timeout
100101
// built in for ensuring a build doesn't run forever
101-
ctx, timeout := context.WithTimeout(ctx, t)
102+
ctx, timeout := context.WithTimeout(buildCtx, t)
102103
defer timeout()
103104

104105
defer func() {
@@ -128,6 +129,16 @@ func (w *Worker) exec(index int) error {
128129
return nil
129130
}
130131

132+
// log streaming uses buildCtx so that it is not subject to the timeout.
133+
go func() {
134+
logger.Info("streaming build logs")
135+
// execute the build with the executor
136+
err = _executor.StreamBuild(buildCtx)
137+
if err != nil {
138+
logger.Errorf("unable to stream build logs: %v", err)
139+
}
140+
}()
141+
131142
logger.Info("assembling build")
132143
// assemble the build with the executor
133144
err = _executor.AssembleBuild(ctx)

executor/engine.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ type Engine interface {
5353
// ExecBuild defines a function that
5454
// runs a pipeline for a build.
5555
ExecBuild(context.Context) error
56+
// StreamBuild defines a function that receives a StreamRequest
57+
// and then runs StreamService or StreamStep in a goroutine.
58+
StreamBuild(context.Context) error
5659
// DestroyBuild defines a function that
5760
// cleans up the build after execution.
5861
DestroyBuild(context.Context) error

executor/executor_test.go

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"testing"
1111

1212
"github.com/gin-gonic/gin"
13+
"github.com/google/go-cmp/cmp"
1314

1415
"github.com/go-vela/server/mock/server"
1516

@@ -77,6 +78,7 @@ func TestExecutor_New(t *testing.T) {
7778
failure bool
7879
setup *Setup
7980
want Engine
81+
equal interface{}
8082
}{
8183
{
8284
name: "driver-darwin",
@@ -91,7 +93,8 @@ func TestExecutor_New(t *testing.T) {
9193
User: _user,
9294
Version: "v1.0.0",
9395
},
94-
want: nil,
96+
want: nil,
97+
equal: reflect.DeepEqual,
9598
},
9699
{
97100
name: "driver-linux",
@@ -108,7 +111,8 @@ func TestExecutor_New(t *testing.T) {
108111
User: _user,
109112
Version: "v1.0.0",
110113
},
111-
want: _linux,
114+
want: _linux,
115+
equal: linux.Equal,
112116
},
113117
{
114118
name: "driver-local",
@@ -123,7 +127,8 @@ func TestExecutor_New(t *testing.T) {
123127
User: _user,
124128
Version: "v1.0.0",
125129
},
126-
want: _local,
130+
want: _local,
131+
equal: local.Equal,
127132
},
128133
{
129134
name: "driver-windows",
@@ -138,7 +143,8 @@ func TestExecutor_New(t *testing.T) {
138143
User: _user,
139144
Version: "v1.0.0",
140145
},
141-
want: nil,
146+
want: nil,
147+
equal: reflect.DeepEqual,
142148
},
143149
{
144150
name: "driver-invalid",
@@ -153,7 +159,8 @@ func TestExecutor_New(t *testing.T) {
153159
User: _user,
154160
Version: "v1.0.0",
155161
},
156-
want: nil,
162+
want: nil,
163+
equal: reflect.DeepEqual,
157164
},
158165
{
159166
name: "driver-empty",
@@ -168,7 +175,8 @@ func TestExecutor_New(t *testing.T) {
168175
User: _user,
169176
Version: "v1.0.0",
170177
},
171-
want: nil,
178+
want: nil,
179+
equal: reflect.DeepEqual,
172180
},
173181
}
174182

@@ -193,8 +201,10 @@ func TestExecutor_New(t *testing.T) {
193201
t.Errorf("New returned err: %v", err)
194202
}
195203

196-
if !reflect.DeepEqual(got, test.want) {
197-
t.Errorf("New is %v, want %v", got, test.want)
204+
// Comparing with reflect.DeepEqual(x, y interface) panics due to the
205+
// unexported streamRequests channel.
206+
if diff := cmp.Diff(test.want, got, cmp.Comparer(test.equal)); diff != "" {
207+
t.Errorf("engine mismatch (-want +got):\n%v", diff)
198208
}
199209
})
200210
}

executor/linux/build.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -491,6 +491,50 @@ func (c *client) ExecBuild(ctx context.Context) error {
491491
return c.err
492492
}
493493

494+
// StreamBuild receives a StreamRequest and then
495+
// runs StreamService or StreamStep in a goroutine.
496+
func (c *client) StreamBuild(ctx context.Context) error {
497+
// create an error group with the parent context
498+
//
499+
// https://pkg.go.dev/golang.org/x/sync/errgroup?tab=doc#WithContext
500+
streams, streamCtx := errgroup.WithContext(ctx)
501+
502+
defer func() {
503+
c.Logger.Trace("waiting for stream functions to return")
504+
505+
err := streams.Wait()
506+
if err != nil {
507+
c.Logger.Errorf("error in a stream request, %v", err)
508+
}
509+
510+
c.Logger.Info("all stream functions have returned")
511+
}()
512+
513+
for {
514+
select {
515+
case req := <-c.streamRequests:
516+
streams.Go(func() error {
517+
// update engine logger with step metadata
518+
//
519+
// https://pkg.go.dev/github.com/sirupsen/logrus?tab=doc#Entry.WithField
520+
logger := c.Logger.WithField(req.Key, req.Container.Name)
521+
522+
logger.Debugf("streaming %s container %s", req.Key, req.Container.ID)
523+
524+
err := req.Stream(streamCtx, req.Container)
525+
if err != nil {
526+
logger.Error(err)
527+
}
528+
529+
return nil
530+
})
531+
case <-ctx.Done():
532+
// build done or canceled
533+
return nil
534+
}
535+
}
536+
}
537+
494538
// DestroyBuild cleans up the build after execution.
495539
func (c *client) DestroyBuild(ctx context.Context) error {
496540
var err error

0 commit comments

Comments
 (0)