Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions client/go/internal/cli/cmd/prod.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"path/filepath"
"strconv"
"strings"
"time"

"github.com/fatih/color"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -110,6 +111,7 @@ type prodDeployOptions struct {
description string
authorEmail string
sourceURL string
waitSecs int
}

func newProdDeployCmd(cli *CLI) *cobra.Command {
Expand Down Expand Up @@ -170,10 +172,20 @@ $ vespa prod deploy`,
build, err := vespa.Submit(deployment, submission)
if err != nil {
return fmt.Errorf("could not deploy application: %w", err)
} else {
cli.printSuccess(fmt.Sprintf("Deployed '%s' with build number %s", color.CyanString(pkg.Path), color.CyanString(strconv.FormatInt(build, 10))))
log.Printf("See %s for deployment progress\n", color.CyanString(fmt.Sprintf("%s/tenant/%s/application/%s/prod/deployment",
deployment.Target.Deployment().System.ConsoleURL, deployment.Target.Deployment().Application.Tenant, deployment.Target.Deployment().Application.Application)))
}
cli.printSuccess(fmt.Sprintf("Deployed '%s' with build number %s", color.CyanString(pkg.Path), color.CyanString(strconv.FormatInt(build, 10))))
log.Printf("See %s for deployment progress\n", color.CyanString(fmt.Sprintf("%s/tenant/%s/application/%s/prod/deployment",
deployment.Target.Deployment().System.ConsoleURL, deployment.Target.Deployment().Application.Tenant, deployment.Target.Deployment().Application.Application)))
if options.waitSecs > 0 {
skipped, err := vespa.AwaitBuild(target, build, time.Duration(options.waitSecs)*time.Second, cli.Stderr)
if err != nil {
return err
}
if skipped {
cli.printSuccess(fmt.Sprintf("Build %d completed", build))
} else {
cli.printSuccess(fmt.Sprintf("Build %d deployed to production", build))
}
}
return nil
},
Expand All @@ -184,6 +196,7 @@ $ vespa prod deploy`,
cmd.Flags().StringVarP(&options.description, "description", "", "", "Description of the source code being deployed. For example a git commit message")
cmd.Flags().StringVarP(&options.authorEmail, "author-email", "", "", "Email of the author of the commit being deployed")
cmd.Flags().StringVarP(&options.sourceURL, "source-url", "", "", "URL which points to the source code being deployed. For example the build job running the submission")
cmd.Flags().IntVarP(&options.waitSecs, "wait", "", 0, "Seconds to wait for the build to complete before returning (0 to return immediately)")
return cmd
}

Expand Down
47 changes: 47 additions & 0 deletions client/go/internal/cli/cmd/prod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,53 @@ func TestProdDeployInvalidZip(t *testing.T) {
assert.Equal(t, "Error: found invalid path inside zip: ../../../../../../../tmp/foo\n", stderr.String())
}

func TestProdDeployWithWait(t *testing.T) {
pkgDir := filepath.Join(t.TempDir(), "app")
createApplication(t, pkgDir, false, false)

httpClient := &mock.HTTPClient{}
cli, stdout, stderr := newTestCLI(t, "CI=true")
cli.httpClient = httpClient
assert.Nil(t, cli.Run("config", "set", "application", "t1.a1.i1"))
assert.Nil(t, cli.Run("config", "set", "target", "cloud"))
assert.Nil(t, cli.Run("auth", "api-key"))
assert.Nil(t, cli.Run("auth", "cert", "--no-add"))
cli.Environment["VESPA_CLI_API_KEY_FILE"] = filepath.Join(cli.config.homeDir, "t1.api-key.pem")

if cwd, err := os.Getwd(); err != nil {
t.Fatal(err)
} else {
defer os.Chdir(cwd)
}
if err := os.Chdir(pkgDir); err != nil {
t.Fatal(err)
}

// Deployed successfully
stdout.Reset()
httpClient.NextResponseString(200, `{"build": 42}`)
httpClient.NextResponseString(200, `{"deployed": true, "jobs": []}`)
assert.Nil(t, cli.Run("prod", "deploy", "--wait", "5", "--add-cert"))
assert.Contains(t, stdout.String(), "Success: Deployed '.' with build number 42")
assert.Contains(t, stdout.String(), "Success: Build 42 deployed to production")

// Skipped due to no changes
stdout.Reset()
httpClient.NextResponseString(200, `{"build": 43}`)
httpClient.NextResponseString(200, `{"skipReason": "no changes detected"}`)
assert.Nil(t, cli.Run("prod", "deploy", "--wait", "5", "--add-cert"))
assert.Contains(t, stdout.String(), "Success: Build 43 completed")

// Job failure (runId=0 avoids goroutine log streaming so test is deterministic)
stdout.Reset()
stderr.Reset()
httpClient.NextResponseString(200, `{"build": 44}`)
httpClient.NextResponseString(200, `{"jobs": [{"jobName": "production-aws-us-east-1c", "runStatus": "failure", "runId": 0, "instance": "default"}]}`)
assert.NotNil(t, cli.Run("prod", "deploy", "--wait", "5", "--add-cert"))
assert.Contains(t, stderr.String(), "Deployment failed")
assert.Contains(t, stderr.String(), "production-aws-us-east-1c")
}

func TestProdDeployWarnsOnInstance(t *testing.T) {
pkgDir := filepath.Join(t.TempDir(), "app")
createApplication(t, pkgDir, false, false)
Expand Down
6 changes: 6 additions & 0 deletions client/go/internal/vespa/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ func (s System) SubmitURL(deployment Deployment) string {
return fmt.Sprintf("%s/application/v4/tenant/%s/application/%s/submit", s.URL, deployment.Application.Tenant, deployment.Application.Application)
}

// BuildStatusURL returns the API URL for the build status of a submitted production build.
func (s System) BuildStatusURL(deployment Deployment, build int64) string {
return fmt.Sprintf("%s/application/v4/tenant/%s/application/%s/build-status/%d",
s.URL, deployment.Application.Tenant, deployment.Application.Application, build)
}

// DeploymentURL returns the API URL of given deployment.
func (s System) DeploymentURL(deployment Deployment) string {
return fmt.Sprintf("%s/application/v4/tenant/%s/application/%s/instance/%s/environment/%s/region/%s",
Expand Down
164 changes: 164 additions & 0 deletions client/go/internal/vespa/target_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ package vespa

import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/vespa-engine/vespa/client/go/internal/httputil"
Expand Down Expand Up @@ -90,6 +93,20 @@ type logMessage struct {
Message string `json:"message"`
}

type buildStatusResponse struct {
Deployed bool `json:"deployed"`
Status string `json:"status"`
SkipReason string `json:"skipReason,omitempty"`
Jobs []buildStatusJob `json:"jobs"`
}

type buildStatusJob struct {
JobName string `json:"jobName"`
RunStatus string `json:"runStatus"`
RunID int64 `json:"runId"`
Instance string `json:"instance"`
}

// CloudTarget creates a Target for the Vespa Cloud or hosted Vespa platform.
func CloudTarget(httpClient httputil.Client, apiAuth Authenticator, deploymentAuth Authenticator,
apiOptions APIOptions, deploymentOptions CloudDeploymentOptions,
Expand Down Expand Up @@ -359,6 +376,153 @@ func (t *cloudTarget) printLog(response runResponse, last int64, muteStep string
return response.LastID
}

func printBuildJobLog(response runResponse, last int64, writer io.Writer, prefix string) int64 {
if response.LastID == 0 {
return last
}
var msgs []logMessage
for _, stepMsgs := range response.Log {
for _, msg := range stepMsgs {
if LogLevel(msg.Type) == 3 { // skip debug
continue
}
msgs = append(msgs, msg)
}
}
sort.Slice(msgs, func(i, j int) bool { return msgs[i].At < msgs[j].At })
for _, msg := range msgs {
tm := time.UnixMilli(msg.At)
fmt.Fprintf(writer, "%s[%s] %-7s %s\n", prefix, tm.Format("15:04:05"), msg.Type, msg.Message)
}
return response.LastID
}

type syncWriter struct {
mu sync.Mutex
w io.Writer
}

func (s *syncWriter) Write(p []byte) (n int, err error) {
s.mu.Lock()
defer s.mu.Unlock()
return s.w.Write(p)
}

func streamBuildJobLogs(target Target, job buildStatusJob, timeout time.Duration, writer io.Writer, retryInterval time.Duration) error {
d := target.Deployment()
runURL := fmt.Sprintf("%s/application/v4/tenant/%s/application/%s/instance/%s/job/%s/run/%d",
d.System.URL, d.Application.Tenant, d.Application.Application,
job.Instance, job.JobName, job.RunID)
req, err := http.NewRequest("GET", runURL, nil)
if err != nil {
return err
}
prefix := fmt.Sprintf("[%s] ", job.JobName)
lastID := int64(-1)
logFunc := func(status int, response []byte) (bool, error) {
if ok, err := isOK(status); !ok {
return ok, err
}
var resp runResponse
if err := json.Unmarshal(response, &resp); err != nil {
return false, err
}
lastID = printBuildJobLog(resp, lastID, writer, prefix)
if resp.Active {
return false, nil
}
if resp.Status != "success" {
return false, fmt.Errorf("%w: %s failed with status %s", ErrDeployment, job.JobName, resp.Status)
}
return true, nil
}
requestFunc := func() *http.Request {
q := req.URL.Query()
q.Set("after", strconv.FormatInt(lastID, 10))
req.URL.RawQuery = q.Encode()
return req
}
_, err = deployRequest(target, logFunc, requestFunc, timeout, retryInterval)
return err
}

// AwaitBuild waits for a production build to deploy by polling the build-status endpoint
// and streaming per-job run logs. Returns skipped=true if the build was skipped due to no
// changes. logWriter may be nil to suppress log output.
func AwaitBuild(target Target, buildID int64, timeout time.Duration, logWriter io.Writer) (skipped bool, _ error) {
if !target.IsCloud() {
return false, fmt.Errorf("AwaitBuild is only supported for cloud targets")
}
d := target.Deployment()
buildStatusURL := d.System.BuildStatusURL(d, buildID)
req, err := http.NewRequest("GET", buildStatusURL, nil)
if err != nil {
return false, err
}
var sw io.Writer
{
sw = &syncWriter{w: logWriter}
}
var (
mutex sync.Mutex
waitGroup sync.WaitGroup
trackedJobs = make(map[string]bool)
jobErrors []error
isSkipped bool
)
retryInterval := 2 * time.Second
statusFunc := func(status int, response []byte) (bool, error) {
if ok, err := isOK(status); !ok {
return ok, err
}
var resp buildStatusResponse
if err := json.Unmarshal(response, &resp); err != nil {
return false, err
}
if resp.SkipReason != "" {
isSkipped = true
return true, nil
}
if resp.Status == "cancelled" {
return false, fmt.Errorf("%w: build %d was cancelled", ErrDeployment, buildID)
}
for _, job := range resp.Jobs {
if job.RunStatus == "failure" || job.RunStatus == "error" || job.RunStatus == "aborted" {
return false, fmt.Errorf("%w: %s failed with status %s", ErrDeployment, job.JobName, job.RunStatus)
}
if sw != nil && job.RunID > 0 {
mutex.Lock()
if !trackedJobs[job.JobName] {
trackedJobs[job.JobName] = true
mutex.Unlock()
waitGroup.Go(func() {
if err := streamBuildJobLogs(target, job, timeout, sw, retryInterval); err != nil && !errors.Is(err, ErrWaitTimeout) {
mutex.Lock()
jobErrors = append(jobErrors, err)
mutex.Unlock()
}
})
} else {
mutex.Unlock()
}
}
}
return resp.Deployed, nil
}
_, mainErr := deployRequest(target, statusFunc, func() *http.Request { return req }, timeout, retryInterval)
if mainErr != nil {
return false, mainErr
}
waitGroup.Wait()
mutex.Lock()
errs := jobErrors
mutex.Unlock()
if len(errs) > 0 {
return false, errs[0]
}
return isSkipped, nil
}

func (t *cloudTarget) discoverPrivateServices(timeout time.Duration) (map[string]*PrivateServiceInfo, error) {
deploymentURL := t.apiOptions.System.PrivateServicesURL(t.deploymentOptions.Deployment)
req, err := http.NewRequest("GET", deploymentURL.String(), nil)
Expand Down
Loading
Loading