Skip to content
27 changes: 14 additions & 13 deletions cmd/vela-worker/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,20 @@ func (w *Worker) exec(index int) error {
//
// https://godoc.org/github.com/go-vela/worker/executor#New
_executor, err := executor.New(&executor.Setup{
Logger: logger,
Mock: w.Config.Mock,
Driver: w.Config.Executor.Driver,
LogMethod: w.Config.Executor.LogMethod,
MaxLogSize: w.Config.Executor.MaxLogSize,
Client: w.VelaClient,
Hostname: w.Config.API.Address.Hostname(),
Runtime: w.Runtime,
Build: item.Build,
Pipeline: item.Pipeline.Sanitize(w.Config.Runtime.Driver),
Repo: item.Repo,
User: item.User,
Version: v.Semantic(),
Logger: logger,
Mock: w.Config.Mock,
Driver: w.Config.Executor.Driver,
LogMethod: w.Config.Executor.LogMethod,
MaxLogSize: w.Config.Executor.MaxLogSize,
LogStreamingTimeout: w.Config.Executor.LogStreamingTimeout,
Client: w.VelaClient,
Hostname: w.Config.API.Address.Hostname(),
Runtime: w.Runtime,
Build: item.Build,
Pipeline: item.Pipeline.Sanitize(w.Config.Runtime.Driver),
Repo: item.Repo,
User: item.User,
Version: v.Semantic(),
})

// add the executor to the worker
Expand Down
7 changes: 4 additions & 3 deletions cmd/vela-worker/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,10 @@ func run(c *cli.Context) error {
CheckIn: c.Duration("checkIn"),
// executor configuration
Executor: &executor.Setup{
Driver: c.String("executor.driver"),
LogMethod: c.String("executor.log_method"),
MaxLogSize: c.Uint("executor.max_log_size"),
Driver: c.String("executor.driver"),
LogMethod: c.String("executor.log_method"),
MaxLogSize: c.Uint("executor.max_log_size"),
LogStreamingTimeout: c.Duration("executor.log_streaming_timeout"),
},
// logger configuration
Logger: &Logger{
Expand Down
8 changes: 8 additions & 0 deletions executor/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package executor

import (
"time"

"github.com/go-vela/types/constants"

"github.com/urfave/cli/v2"
Expand Down Expand Up @@ -37,4 +39,10 @@ var Flags = []cli.Flag{
Name: "executor.max_log_size",
Usage: "maximum log size (in bytes)",
},
&cli.DurationFlag{
EnvVars: []string{"WORKER_LOG_STREAMING_TIMEOUT", "VELA_LOG_STREAMING_TIMEOUT", "LOG_STREAMING_TIMEOUT"},
Name: "executor.log_streaming_timeout",
Usage: "maximum amount of time to wait for log streaming after build completes",
Value: 5 * time.Minute,
},
}
12 changes: 10 additions & 2 deletions executor/linux/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/go-vela/types/constants"
"github.com/go-vela/worker/internal/build"
context2 "github.com/go-vela/worker/internal/context"
"github.com/go-vela/worker/internal/step"
)

Expand Down Expand Up @@ -496,10 +497,15 @@ func (c *client) ExecBuild(ctx context.Context) error {
// StreamBuild receives a StreamRequest and then
// runs StreamService or StreamStep in a goroutine.
func (c *client) StreamBuild(ctx context.Context) error {
// cancel streaming after a timeout once the build has finished
delayedCtx, cancelStreaming := context2.
WithDelayedCancelPropagation(ctx, c.logStreamingTimeout, "streaming", c.Logger)
defer cancelStreaming()

// create an error group with the parent context
//
// https://pkg.go.dev/golang.org/x/sync/errgroup?tab=doc#WithContext
streams, streamCtx := errgroup.WithContext(ctx)
streams, streamCtx := errgroup.WithContext(delayedCtx)

defer func() {
c.Logger.Trace("waiting for stream functions to return")
Expand All @@ -509,6 +515,8 @@ func (c *client) StreamBuild(ctx context.Context) error {
c.Logger.Errorf("error in a stream request, %v", err)
}

cancelStreaming()

c.Logger.Info("all stream functions have returned")
}()

Expand Down Expand Up @@ -537,7 +545,7 @@ func (c *client) StreamBuild(ctx context.Context) error {

return nil
})
case <-ctx.Done():
case <-delayedCtx.Done():
c.Logger.Debug("streaming context canceled")
// build done or canceled
return nil
Expand Down
1 change: 1 addition & 0 deletions executor/linux/build_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,7 @@ func TestLinux_StreamBuild(t *testing.T) {
WithPipeline(_pipeline),
WithRepo(_repo),
WithRuntime(_runtime),
WithLogStreamingTimeout(1*time.Second),
WithUser(_user),
WithVelaClient(_client),
withStreamRequests(streamRequests),
Expand Down
24 changes: 13 additions & 11 deletions executor/linux/linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package linux
import (
"reflect"
"sync"
"time"

"github.com/go-vela/sdk-go/vela"
"github.com/go-vela/types/library"
Expand All @@ -31,17 +32,18 @@ type (
secret *secretSvc

// private fields
init *pipeline.Container
logMethod string
maxLogSize uint
build *library.Build
pipeline *pipeline.Build
repo *library.Repo
secrets sync.Map
services sync.Map
serviceLogs sync.Map
steps sync.Map
stepLogs sync.Map
init *pipeline.Container
logMethod string
maxLogSize uint
logStreamingTimeout time.Duration
build *library.Build
pipeline *pipeline.Build
repo *library.Repo
secrets sync.Map
services sync.Map
serviceLogs sync.Map
steps sync.Map
stepLogs sync.Map

streamRequests chan message.StreamRequest

Expand Down
13 changes: 13 additions & 0 deletions executor/linux/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package linux

import (
"fmt"
"time"

"github.com/go-vela/sdk-go/vela"
"github.com/go-vela/types/library"
Expand Down Expand Up @@ -64,6 +65,18 @@ func WithMaxLogSize(size uint) Opt {
}
}

// WithLogStreamingTimeout sets the log streaming timeout in the executor client for Linux.
func WithLogStreamingTimeout(timeout time.Duration) Opt {
return func(c *client) error {
c.Logger.Trace("configuring log streaming timeout in linux executor client")

// set the maximum log size in the client
c.logStreamingTimeout = timeout

return nil
}
}

// WithHostname sets the hostname in the executor client for Linux.
func WithHostname(hostname string) Opt {
return func(c *client) error {
Expand Down
41 changes: 41 additions & 0 deletions executor/linux/opts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/http/httptest"
"reflect"
"testing"
"time"

"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -161,6 +162,46 @@ func TestLinux_Opt_WithMaxLogSize(t *testing.T) {
}
}

func TestLinux_Opt_WithLogStreamingTimeout(t *testing.T) {
// setup tests
tests := []struct {
name string
failure bool
logStreamingTimeout time.Duration
}{
{
name: "defined",
failure: false,
logStreamingTimeout: 1 * time.Second,
},
}

// run tests
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
_engine, err := New(
WithLogStreamingTimeout(test.logStreamingTimeout),
)

if test.failure {
if err == nil {
t.Errorf("WithLogStreamingTimeout should have returned err")
}

return // continue to next test
}

if err != nil {
t.Errorf("WithLogStreamingTimeout returned err: %v", err)
}

if !reflect.DeepEqual(_engine.logStreamingTimeout, test.logStreamingTimeout) {
t.Errorf("WithLogStreamingTimeout is %v, want %v", _engine.logStreamingTimeout, test.logStreamingTimeout)
}
})
}
}

func TestLinux_Opt_WithHostname(t *testing.T) {
// setup tests
tests := []struct {
Expand Down
5 changes: 5 additions & 0 deletions executor/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package executor
import (
"fmt"
"strings"
"time"

"github.com/go-vela/sdk-go/vela"

Expand Down Expand Up @@ -40,6 +41,9 @@ type Setup struct {
LogMethod string
// specifies the maximum log size
MaxLogSize uint
// specifies how long to wait after the build finishes
// for log streaming to complete
LogStreamingTimeout time.Duration
// specifies the executor hostname
Hostname string
// specifies the executor version
Expand Down Expand Up @@ -81,6 +85,7 @@ func (s *Setup) Linux() (Engine, error) {
linux.WithBuild(s.Build),
linux.WithLogMethod(s.LogMethod),
linux.WithMaxLogSize(s.MaxLogSize),
linux.WithLogStreamingTimeout(s.LogStreamingTimeout),
linux.WithHostname(s.Hostname),
linux.WithPipeline(s.Pipeline),
linux.WithRepo(s.Repo),
Expand Down
2 changes: 2 additions & 0 deletions executor/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package executor
import (
"net/http/httptest"
"testing"
"time"

"github.com/gin-gonic/gin"
"github.com/google/go-cmp/cmp"
Expand Down Expand Up @@ -79,6 +80,7 @@ func TestExecutor_Setup_Linux(t *testing.T) {
linux.WithBuild(_build),
linux.WithLogMethod("byte-chunks"),
linux.WithMaxLogSize(2097152),
linux.WithLogStreamingTimeout(1*time.Second),
linux.WithHostname("localhost"),
linux.WithPipeline(_pipeline),
linux.WithRepo(_repo),
Expand Down
49 changes: 49 additions & 0 deletions internal/context/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright (c) 2022 Target Brands, Inc. All rights reserved.
//
// Use of this source code is governed by the LICENSE file in this repository.

package context

import (
"context"
"time"

"github.com/sirupsen/logrus"
)

func WithDelayedCancelPropagation(parent context.Context, timeout time.Duration, name string, logger *logrus.Entry) (context.Context, context.CancelFunc) {
ctx, cancel := context.WithCancel(context.Background())

go func() {
var timer *time.Timer

// start the timer once the parent context is canceled
select {
case <-parent.Done():
logger.Tracef("parent context is done, starting %s timer for %s", name, timeout)
timer = time.NewTimer(timeout)

break
case <-ctx.Done():
logger.Tracef("%s finished before the parent context", name)

return
}

// wait for the timer to elapse or the context to naturally finish.
select {
case <-timer.C:
logger.Tracef("%s timed out, propagating cancel to %s context", name, name)
cancel()

return
case <-ctx.Done():
logger.Tracef("%s finished, stopping timeout timer", name)
timer.Stop()

return
}
}()

return ctx, cancel
}
Loading