diff --git a/client/go/internal/cli/cmd/prod.go b/client/go/internal/cli/cmd/prod.go
index 5373cbf5f498..c585515dbcb3 100644
--- a/client/go/internal/cli/cmd/prod.go
+++ b/client/go/internal/cli/cmd/prod.go
@@ -12,6 +12,7 @@ import (
 	"path/filepath"
 	"strconv"
 	"strings"
+	"time"
 
 	"github.com/fatih/color"
 	"github.com/spf13/cobra"
@@ -110,6 +111,7 @@ type prodDeployOptions struct {
 	description string
 	authorEmail string
 	sourceURL   string
+	waitSecs    int
 }
 
 func newProdDeployCmd(cli *CLI) *cobra.Command {
@@ -170,10 +172,20 @@ $ vespa prod deploy`,
 			build, err := vespa.Submit(deployment, submission)
 			if err != nil {
 				return fmt.Errorf("could not deploy application: %w", err)
-			} else {
-				cli.printSuccess(fmt.Sprintf("Deployed '%s' with build number %s", color.CyanString(pkg.Path), color.CyanString(strconv.FormatInt(build, 10))))
-				log.Printf("See %s for deployment progress\n", color.CyanString(fmt.Sprintf("%s/tenant/%s/application/%s/prod/deployment",
-					deployment.Target.Deployment().System.ConsoleURL, deployment.Target.Deployment().Application.Tenant, deployment.Target.Deployment().Application.Application)))
+			}
+			cli.printSuccess(fmt.Sprintf("Deployed '%s' with build number %s", color.CyanString(pkg.Path), color.CyanString(strconv.FormatInt(build, 10))))
+			log.Printf("See %s for deployment progress\n", color.CyanString(fmt.Sprintf("%s/tenant/%s/application/%s/prod/deployment",
+				deployment.Target.Deployment().System.ConsoleURL, deployment.Target.Deployment().Application.Tenant, deployment.Target.Deployment().Application.Application)))
+			if options.waitSecs > 0 {
+				skipped, err := vespa.AwaitBuild(target, build, time.Duration(options.waitSecs)*time.Second, cli.Stderr)
+				if err != nil {
+					return err
+				}
+				if skipped {
+					cli.printSuccess(fmt.Sprintf("Build %d completed", build))
+				} else {
+					cli.printSuccess(fmt.Sprintf("Build %d deployed to production", build))
+				}
 			}
 			return nil
 		},
@@ -184,6 +196,7 @@ $ vespa prod deploy`,
 	cmd.Flags().StringVarP(&options.description, "description", "", "", "Description of the source code being deployed. For example a git commit message")
 	cmd.Flags().StringVarP(&options.authorEmail, "author-email", "", "", "Email of the author of the commit being deployed")
 	cmd.Flags().StringVarP(&options.sourceURL, "source-url", "", "", "URL which points to the source code being deployed. For example the build job running the submission")
+	cmd.Flags().IntVarP(&options.waitSecs, "wait", "", 0, "Seconds to wait for the build to complete before returning (0 to return immediately)")
 	return cmd
 }
 
diff --git a/client/go/internal/cli/cmd/prod_test.go b/client/go/internal/cli/cmd/prod_test.go
index 075f8533228d..eeef08ffd449 100644
--- a/client/go/internal/cli/cmd/prod_test.go
+++ b/client/go/internal/cli/cmd/prod_test.go
@@ -263,6 +263,53 @@ func TestProdDeployInvalidZip(t *testing.T) {
 	assert.Equal(t, "Error: found invalid path inside zip: ../../../../../../../tmp/foo\n", stderr.String())
 }
 
+func TestProdDeployWithWait(t *testing.T) {
+	pkgDir := filepath.Join(t.TempDir(), "app")
+	createApplication(t, pkgDir, false, false)
+
+	httpClient := &mock.HTTPClient{}
+	cli, stdout, stderr := newTestCLI(t, "CI=true")
+	cli.httpClient = httpClient
+	assert.Nil(t, cli.Run("config", "set", "application", "t1.a1.i1"))
+	assert.Nil(t, cli.Run("config", "set", "target", "cloud"))
+	assert.Nil(t, cli.Run("auth", "api-key"))
+	assert.Nil(t, cli.Run("auth", "cert", "--no-add"))
+	cli.Environment["VESPA_CLI_API_KEY_FILE"] = filepath.Join(cli.config.homeDir, "t1.api-key.pem")
+
+	if cwd, err := os.Getwd(); err != nil {
+		t.Fatal(err)
+	} else {
+		defer os.Chdir(cwd)
+	}
+	if err := os.Chdir(pkgDir); err != nil {
+		t.Fatal(err)
+	}
+
+	// Deployed successfully
+	stdout.Reset()
+	httpClient.NextResponseString(200, `{"build": 42}`)
+	httpClient.NextResponseString(200, `{"deployed": true, "jobs": []}`)
+	assert.Nil(t, cli.Run("prod", "deploy", "--wait", "5", "--add-cert"))
+	assert.Contains(t, stdout.String(), "Success: Deployed '.' with build number 42")
+	assert.Contains(t, stdout.String(), "Success: Build 42 deployed to production")
+
+	// Skipped due to no changes
+	stdout.Reset()
+	httpClient.NextResponseString(200, `{"build": 43}`)
+	httpClient.NextResponseString(200, `{"skipReason": "no changes detected"}`)
+	assert.Nil(t, cli.Run("prod", "deploy", "--wait", "5", "--add-cert"))
+	assert.Contains(t, stdout.String(), "Success: Build 43 completed")
+
+	// Job failure (runId=0 avoids goroutine log streaming so test is deterministic)
+	stdout.Reset()
+	stderr.Reset()
+	httpClient.NextResponseString(200, `{"build": 44}`)
+	httpClient.NextResponseString(200, `{"jobs": [{"jobName": "production-aws-us-east-1c", "runStatus": "failure", "runId": 0, "instance": "default"}]}`)
+	assert.NotNil(t, cli.Run("prod", "deploy", "--wait", "5", "--add-cert"))
+	assert.Contains(t, stderr.String(), "Deployment failed")
+	assert.Contains(t, stderr.String(), "production-aws-us-east-1c")
+}
+
 func TestProdDeployWarnsOnInstance(t *testing.T) {
 	pkgDir := filepath.Join(t.TempDir(), "app")
 	createApplication(t, pkgDir, false, false)
diff --git a/client/go/internal/vespa/system.go b/client/go/internal/vespa/system.go
index 0fad8d8059de..805255390d7f 100644
--- a/client/go/internal/vespa/system.go
+++ b/client/go/internal/vespa/system.go
@@ -83,6 +83,12 @@ func (s System) SubmitURL(deployment Deployment) string {
 	return fmt.Sprintf("%s/application/v4/tenant/%s/application/%s/submit", s.URL, deployment.Application.Tenant, deployment.Application.Application)
 }
 
+// BuildStatusURL returns the API URL for the build status of a submitted production build.
+func (s System) BuildStatusURL(deployment Deployment, build int64) string {
+	return fmt.Sprintf("%s/application/v4/tenant/%s/application/%s/build-status/%d",
+		s.URL, deployment.Application.Tenant, deployment.Application.Application, build)
+}
+
 // DeploymentURL returns the API URL of given deployment.
 func (s System) DeploymentURL(deployment Deployment) string {
 	return fmt.Sprintf("%s/application/v4/tenant/%s/application/%s/instance/%s/environment/%s/region/%s",
diff --git a/client/go/internal/vespa/target_cloud.go b/client/go/internal/vespa/target_cloud.go
index 37113b5135f8..72797adc34a6 100644
--- a/client/go/internal/vespa/target_cloud.go
+++ b/client/go/internal/vespa/target_cloud.go
@@ -3,11 +3,14 @@ package vespa
 
 import (
 	"encoding/json"
+	"errors"
 	"fmt"
+	"io"
 	"net/http"
 	"sort"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/vespa-engine/vespa/client/go/internal/httputil"
@@ -90,6 +93,20 @@ type logMessage struct {
 	Message string `json:"message"`
 }
 
+type buildStatusResponse struct {
+	Deployed   bool             `json:"deployed"`
+	Status     string           `json:"status"`
+	SkipReason string           `json:"skipReason,omitempty"`
+	Jobs       []buildStatusJob `json:"jobs"`
+}
+
+type buildStatusJob struct {
+	JobName   string `json:"jobName"`
+	RunStatus string `json:"runStatus"`
+	RunID     int64  `json:"runId"`
+	Instance  string `json:"instance"`
+}
+
 // CloudTarget creates a Target for the Vespa Cloud or hosted Vespa platform.
 func CloudTarget(httpClient httputil.Client, apiAuth Authenticator, deploymentAuth Authenticator,
 	apiOptions APIOptions, deploymentOptions CloudDeploymentOptions,
@@ -359,6 +376,160 @@ func (t *cloudTarget) printLog(response runResponse, last int64, muteStep string
 	return response.LastID
 }
 
+// printBuildJobLog writes the non-debug messages in response to writer, ordered by timestamp and
+// prefixed with prefix. It returns response.LastID, or last unchanged when response.LastID is 0.
+func printBuildJobLog(response runResponse, last int64, writer io.Writer, prefix string) int64 {
+	if response.LastID == 0 {
+		return last
+	}
+	var msgs []logMessage
+	for _, stepMsgs := range response.Log {
+		for _, msg := range stepMsgs {
+			if LogLevel(msg.Type) == 3 { // skip debug
+				continue
+			}
+			msgs = append(msgs, msg)
+		}
+	}
+	sort.Slice(msgs, func(i, j int) bool { return msgs[i].At < msgs[j].At })
+	for _, msg := range msgs {
+		tm := time.UnixMilli(msg.At)
+		fmt.Fprintf(writer, "%s[%s] %-7s %s\n", prefix, tm.Format("15:04:05"), msg.Type, msg.Message)
+	}
+	return response.LastID
+}
+
+// syncWriter serializes writes to w so that it can be shared between goroutines.
+type syncWriter struct {
+	mu sync.Mutex
+	w  io.Writer
+}
+
+// Write writes p to the underlying writer while holding the mutex.
+func (s *syncWriter) Write(p []byte) (n int, err error) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.w.Write(p)
+}
+
+// streamBuildJobLogs polls the run of job and writes its log to writer until the run is no longer
+// active. It returns an error wrapping ErrDeployment if the run ends with a status other than success.
+func streamBuildJobLogs(target Target, job buildStatusJob, timeout time.Duration, writer io.Writer, retryInterval time.Duration) error {
+	d := target.Deployment()
+	runURL := fmt.Sprintf("%s/application/v4/tenant/%s/application/%s/instance/%s/job/%s/run/%d",
+		d.System.URL, d.Application.Tenant, d.Application.Application,
+		job.Instance, job.JobName, job.RunID)
+	req, err := http.NewRequest("GET", runURL, nil)
+	if err != nil {
+		return err
+	}
+	prefix := fmt.Sprintf("[%s] ", job.JobName)
+	lastID := int64(-1)
+	logFunc := func(status int, response []byte) (bool, error) {
+		if ok, err := isOK(status); !ok {
+			return ok, err
+		}
+		var resp runResponse
+		if err := json.Unmarshal(response, &resp); err != nil {
+			return false, err
+		}
+		lastID = printBuildJobLog(resp, lastID, writer, prefix)
+		if resp.Active {
+			return false, nil
+		}
+		if resp.Status != "success" {
+			return false, fmt.Errorf("%w: %s failed with status %s", ErrDeployment, job.JobName, resp.Status)
+		}
+		return true, nil
+	}
+	requestFunc := func() *http.Request {
+		q := req.URL.Query()
+		q.Set("after", strconv.FormatInt(lastID, 10))
+		req.URL.RawQuery = q.Encode()
+		return req
+	}
+	_, err = deployRequest(target, logFunc, requestFunc, timeout, retryInterval)
+	return err
+}
+
+// AwaitBuild waits for a production build to deploy by polling the build-status endpoint
+// and streaming per-job run logs. Returns skipped=true if the build was skipped due to no
+// changes. logWriter may be nil to suppress log output.
+func AwaitBuild(target Target, buildID int64, timeout time.Duration, logWriter io.Writer) (skipped bool, _ error) {
+	if !target.IsCloud() {
+		return false, fmt.Errorf("AwaitBuild is only supported for cloud targets")
+	}
+	d := target.Deployment()
+	buildStatusURL := d.System.BuildStatusURL(d, buildID)
+	req, err := http.NewRequest("GET", buildStatusURL, nil)
+	if err != nil {
+		return false, err
+	}
+	// Only stream job logs when the caller supplied a writer; sw stays nil otherwise.
+	var sw io.Writer
+	if logWriter != nil {
+		sw = &syncWriter{w: logWriter}
+	}
+	var (
+		mutex       sync.Mutex
+		waitGroup   sync.WaitGroup
+		trackedJobs = make(map[string]bool)
+		jobErrors   []error
+		isSkipped   bool
+	)
+	retryInterval := 2 * time.Second
+	statusFunc := func(status int, response []byte) (bool, error) {
+		if ok, err := isOK(status); !ok {
+			return ok, err
+		}
+		var resp buildStatusResponse
+		if err := json.Unmarshal(response, &resp); err != nil {
+			return false, err
+		}
+		if resp.SkipReason != "" {
+			isSkipped = true
+			return true, nil
+		}
+		if resp.Status == "cancelled" {
+			return false, fmt.Errorf("%w: build %d was cancelled", ErrDeployment, buildID)
+		}
+		for _, job := range resp.Jobs {
+			if job.RunStatus == "failure" || job.RunStatus == "error" || job.RunStatus == "aborted" {
+				return false, fmt.Errorf("%w: %s failed with status %s", ErrDeployment, job.JobName, job.RunStatus)
+			}
+			if sw != nil && job.RunID > 0 {
+				mutex.Lock()
+				if !trackedJobs[job.JobName] {
+					trackedJobs[job.JobName] = true
+					mutex.Unlock()
+					waitGroup.Go(func() {
+						if err := streamBuildJobLogs(target, job, timeout, sw, retryInterval); err != nil && !errors.Is(err, ErrWaitTimeout) {
+							mutex.Lock()
+							jobErrors = append(jobErrors, err)
+							mutex.Unlock()
+						}
+					})
+				} else {
+					mutex.Unlock()
+				}
+			}
+		}
+		return resp.Deployed, nil
+	}
+	_, mainErr := deployRequest(target, statusFunc, func() *http.Request { return req }, timeout, retryInterval)
+	if mainErr != nil {
+		return false, mainErr
+	}
+	waitGroup.Wait()
+	mutex.Lock()
+	errs := jobErrors
+	mutex.Unlock()
+	if len(errs) > 0 {
+		return false, errs[0]
+	}
+	return isSkipped, nil
+}
+
 func (t *cloudTarget) discoverPrivateServices(timeout time.Duration) (map[string]*PrivateServiceInfo, error) {
 	deploymentURL := t.apiOptions.System.PrivateServicesURL(t.deploymentOptions.Deployment)
 	req, err := http.NewRequest("GET", deploymentURL.String(), nil)
diff --git a/client/go/internal/vespa/target_test.go b/client/go/internal/vespa/target_test.go
index 996cdf4ed7de..0b2e1511b267 100644
--- a/client/go/internal/vespa/target_test.go
+++ b/client/go/internal/vespa/target_test.go
@@ -3,6 +3,8 @@ package vespa
 
 import (
 	"bytes"
+	"errors"
+	"fmt"
 	"io"
 	"net/http"
 	"strings"
@@ -420,6 +422,154 @@ func TestCloudTargetPrivateServicesError(t *testing.T) {
 	assert.Nil(t, services[0].PrivateService, "Should not have private service info when endpoint fails")
 }
 
+func TestAwaitBuild(t *testing.T) {
+	target, client := createCloudTarget(t, io.Discard)
+	buildStatusURI := "/application/v4/tenant/t1/application/a1/build-status/42"
+
+	// Deployed successfully
+	client.NextResponse(mock.HTTPResponse{
+		URI:    buildStatusURI,
+		Status: 200,
+		Body:   []byte(`{"deployed": true, "status": "success", "jobs": []}`),
+	})
+	skipped, err := AwaitBuild(target, 42, time.Second, nil)
+	assert.Nil(t, err)
+	assert.False(t, skipped)
+
+	// Skipped due to no changes
+	client.NextResponse(mock.HTTPResponse{
+		URI:    buildStatusURI,
+		Status: 200,
+		Body:   []byte(`{"skipReason": "no changes detected"}`),
+	})
+	skipped, err = AwaitBuild(target, 42, time.Second, nil)
+	assert.Nil(t, err)
+	assert.True(t, skipped)
+
+	// Nil logWriter suppresses log streaming even when a job has a run ID
+	client.NextResponse(mock.HTTPResponse{
+		URI:    buildStatusURI,
+		Status: 200,
+		Body:   []byte(`{"deployed": true, "jobs": [{"jobName": "production-aws-us-east-1c", "runStatus": "running", "runId": 7, "instance": "default"}]}`),
+	})
+	skipped, err = AwaitBuild(target, 42, time.Second, nil)
+	assert.Nil(t, err)
+	assert.False(t, skipped)
+
+	// Build cancelled
+	client.NextResponse(mock.HTTPResponse{
+		URI:    buildStatusURI,
+		Status: 200,
+		Body:   []byte(`{"status": "cancelled"}`),
+	})
+	_, err = AwaitBuild(target, 42, time.Second, nil)
+	require.NotNil(t, err)
+	assert.True(t, errors.Is(err, ErrDeployment))
+	assert.Contains(t, err.Error(), "cancelled")
+
+	// Production job failure
+	client.NextResponse(mock.HTTPResponse{
+		URI:    buildStatusURI,
+		Status: 200,
+		Body:   []byte(`{"jobs": [{"jobName": "production-aws-us-east-1c", "runStatus": "failure", "runId": 0, "instance": "default"}]}`),
+	})
+	_, err = AwaitBuild(target, 42, time.Second, nil)
+	require.NotNil(t, err)
+	assert.True(t, errors.Is(err, ErrDeployment))
+	assert.Contains(t, err.Error(), "production-aws-us-east-1c")
+
+	// Test job failure (system-test job name)
+	client.NextResponse(mock.HTTPResponse{
+		URI:    buildStatusURI,
+		Status: 200,
+		Body:   []byte(`{"jobs": [{"jobName": "system-test.aws-us-east-1c", "runStatus": "failure", "runId": 0, "instance": "i1"}]}`),
+	})
+	_, err = AwaitBuild(target, 42, time.Second, nil)
+	require.NotNil(t, err)
+	assert.True(t, errors.Is(err, ErrDeployment))
+	assert.Contains(t, err.Error(), "system-test.aws-us-east-1c")
+
+	// Timeout
+	client.NextResponse(mock.HTTPResponse{
+		URI:    buildStatusURI,
+		Status: 200,
+		Body:   []byte(`{"deployed": false, "status": "running"}`),
+	})
+	_, err = AwaitBuild(target, 42, time.Millisecond, nil)
+	assert.True(t, errors.Is(err, ErrWaitTimeout))
+}
+
+func TestStreamBuildJobLogs(t *testing.T) {
+	target, client := createCloudTarget(t, io.Discard)
+	job := buildStatusJob{JobName: "production-aws-us-east-1c", RunID: 100, Instance: "i1"}
+	runURI := "/application/v4/tenant/t1/application/a1/instance/i1/job/production-aws-us-east-1c/run/100?after=-1"
+
+	// Happy path: log is written and job succeeds
+	var buf bytes.Buffer
+	client.NextResponse(mock.HTTPResponse{
+		URI:    runURI,
+		Status: 200,
+		Body:   []byte(`{"active": false, "status": "success", "lastId": 10, "log": {"step": [{"at": 1631707708431, "type": "info", "message": "Deploying"}]}}`),
+	})
+	err := streamBuildJobLogs(target, job, time.Second, &buf, 0)
+	assert.Nil(t, err)
+	assert.Contains(t, buf.String(), "[production-aws-us-east-1c]")
+	assert.Contains(t, buf.String(), "Deploying")
+
+	// Job failure is reported as ErrDeployment
+	client.NextResponse(mock.HTTPResponse{
+		URI:    runURI,
+		Status: 200,
+		Body:   []byte(`{"active": false, "status": "error", "lastId": 5}`),
+	})
+	err = streamBuildJobLogs(target, job, time.Second, io.Discard, 0)
+	require.NotNil(t, err)
+	assert.True(t, errors.Is(err, ErrDeployment))
+	assert.Contains(t, err.Error(), "production-aws-us-east-1c")
+}
+
+func TestPrintBuildJobLog(t *testing.T) {
+	// Empty response (LastID == 0): returns last unchanged, no output
+	var buf bytes.Buffer
+	last := printBuildJobLog(runResponse{}, 10, &buf, "[job] ")
+	assert.Equal(t, int64(10), last)
+	assert.Empty(t, buf.String())
+
+	// Messages sorted by timestamp and formatted with prefix
+	resp := runResponse{
+		LastID: 7,
+		Log: map[string][]logMessage{
+			"step": {
+				{At: 1631707708432, Type: "warning", Message: "second"},
+				{At: 1631707708431, Type: "info", Message: "first"},
+			},
+		},
+	}
+	last = printBuildJobLog(resp, -1, &buf, "[job] ")
+	assert.Equal(t, int64(7), last)
+	tm1 := time.UnixMilli(1631707708431)
+	tm2 := time.UnixMilli(1631707708432)
+	expected := fmt.Sprintf("[job] [%s] %-7s %s\n[job] [%s] %-7s %s\n",
+		tm1.Format("15:04:05"), "info", "first",
+		tm2.Format("15:04:05"), "warning", "second")
+	assert.Equal(t, expected, buf.String())
+
+	// Debug messages are filtered out
+	buf.Reset()
+	resp = runResponse{
+		LastID: 8,
+		Log: map[string][]logMessage{
+			"step": {
+				{At: 1631707708431, Type: "debug", Message: "should be filtered"},
+				{At: 1631707708432, Type: "info", Message: "visible"},
+			},
+		},
+	}
+	printBuildJobLog(resp, -1, &buf, "")
+	assert.NotContains(t, buf.String(), "should be filtered")
+	assert.Contains(t, buf.String(), "visible")
+}
+
 func createCloudTarget(t *testing.T, logWriter io.Writer) (Target, *mock.HTTPClient) {
 	apiKey, err := CreateAPIKey()
 	require.Nil(t, err)