diff --git a/api/handler/v1/runtime.go b/api/handler/v1/runtime.go index 69882e3b55..1821288f52 100644 --- a/api/handler/v1/runtime.go +++ b/api/handler/v1/runtime.go @@ -754,19 +754,12 @@ func (sv *RuntimeServiceServer) ListResourceSpecification(ctx context.Context, r } func (sv *RuntimeServiceServer) ReplayDryRun(ctx context.Context, req *pb.ReplayDryRunRequest) (*pb.ReplayDryRunResponse, error) { - replayWorkerRequest, err := sv.parseReplayRequest(&pb.ReplayRequest{ - ProjectName: req.ProjectName, - JobName: req.JobName, - Namespace: req.Namespace, - StartDate: req.StartDate, - EndDate: req.EndDate, - Force: false, - }) + replayRequest, err := sv.parseReplayRequest(req.ProjectName, req.Namespace, req.JobName, req.StartDate, req.EndDate, false) if err != nil { return nil, err } - rootNode, err := sv.jobSvc.ReplayDryRun(replayWorkerRequest) + rootNode, err := sv.jobSvc.ReplayDryRun(replayRequest) if err != nil { return nil, status.Errorf(codes.Internal, "error while processing replay dry run: %v", err) } @@ -782,7 +775,7 @@ func (sv *RuntimeServiceServer) ReplayDryRun(ctx context.Context, req *pb.Replay } func (sv *RuntimeServiceServer) Replay(ctx context.Context, req *pb.ReplayRequest) (*pb.ReplayResponse, error) { - replayWorkerRequest, err := sv.parseReplayRequest(req) + replayWorkerRequest, err := sv.parseReplayRequest(req.ProjectName, req.Namespace, req.JobName, req.StartDate, req.EndDate, req.Force) if err != nil { return nil, err } @@ -808,7 +801,7 @@ func (sv *RuntimeServiceServer) GetReplayStatus(ctx context.Context, req *pb.Get return nil, err } - replayState, err := sv.jobSvc.GetStatus(ctx, replayRequest) + replayState, err := sv.jobSvc.GetReplayStatus(ctx, replayRequest) if err != nil { return nil, status.Errorf(codes.Internal, "error while getting replay: %v", err) } @@ -842,37 +835,66 @@ func (sv *RuntimeServiceServer) parseReplayStatusRequest(req *pb.GetReplayStatus return replayRequest, nil } -func (sv *RuntimeServiceServer) parseReplayRequest(req *pb.ReplayRequest) (models.ReplayRequest, error) { +func (sv *RuntimeServiceServer) ListReplays(ctx context.Context, req *pb.ListReplaysRequest) (*pb.ListReplaysResponse, error) { projSpec, err := sv.getProjectSpec(req.ProjectName) + if err != nil { + return nil, err + } + + replays, err := sv.jobSvc.GetReplayList(projSpec.ID) + if err != nil { + return nil, status.Errorf(codes.Internal, "error while getting replay list: %v", err) + } + + var replaySpecs []*pb.ReplaySpec + for _, replaySpec := range replays { + replaySpecs = append(replaySpecs, &pb.ReplaySpec{ + Id: replaySpec.ID.String(), + JobName: replaySpec.Job.GetName(), + StartDate: timestamppb.New(replaySpec.StartDate), + EndDate: timestamppb.New(replaySpec.EndDate), + State: replaySpec.Status, + CreatedAt: timestamppb.New(replaySpec.CreatedAt), + }) + } + + return &pb.ListReplaysResponse{ + ReplayList: replaySpecs, + }, nil +} + +func (sv *RuntimeServiceServer) parseReplayRequest(projectName string, namespace string, jobName string, startDate string, + endDate string, forceFlag bool) (models.ReplayRequest, error) { + projSpec, err := sv.getProjectSpec(projectName) if err != nil { return models.ReplayRequest{}, err } - jobSpec, err := sv.getJobSpec(projSpec, req.Namespace, req.JobName) + jobSpec, err := sv.getJobSpec(projSpec, namespace, jobName) if err != nil { return models.ReplayRequest{}, err } - startDate, err := time.Parse(job.ReplayDateFormat, req.StartDate) + windowStart, err := time.Parse(job.ReplayDateFormat, startDate) if err != nil { return models.ReplayRequest{}, status.Errorf(codes.InvalidArgument, "unable to parse replay start date(e.g. %s): %v", job.ReplayDateFormat, err) } - endDate := startDate - if req.EndDate != "" { - if endDate, err = time.Parse(job.ReplayDateFormat, req.EndDate); err != nil { + windowEnd := windowStart + if endDate != "" { + if windowEnd, err = time.Parse(job.ReplayDateFormat, endDate); err != nil { return models.ReplayRequest{}, status.Errorf(codes.InvalidArgument, "unable to parse replay end date(e.g. %s): %v", job.ReplayDateFormat, err) } } - if endDate.Before(startDate) { + if windowEnd.Before(windowStart) { return models.ReplayRequest{}, status.Errorf(codes.InvalidArgument, "replay end date cannot be before start date") } replayRequest := models.ReplayRequest{ Job: jobSpec, - Start: startDate, - End: endDate, + Start: windowStart, + End: windowEnd, Project: projSpec, - Force: req.Force, + Force: forceFlag, } return replayRequest, nil } diff --git a/api/handler/v1/runtime_test.go b/api/handler/v1/runtime_test.go index c60ff22b44..389e5df002 100644 --- a/api/handler/v1/runtime_test.go +++ b/api/handler/v1/runtime_test.go @@ -2273,7 +2273,7 @@ func TestRuntimeServiceServer(t *testing.T) { jobService := new(mock.JobService) defer jobService.AssertExpectations(t) - jobService.On("GetStatus", context.TODO(), replayRequest).Return(replayState, nil) + jobService.On("GetReplayStatus", context.TODO(), replayRequest).Return(replayState, nil) adapter := v1.NewAdapter(nil, nil) @@ -2316,7 +2316,7 @@ func TestRuntimeServiceServer(t *testing.T) { errMessage := "internal error" jobService := new(mock.JobService) defer jobService.AssertExpectations(t) - jobService.On("GetStatus", context.TODO(), replayRequest).Return(models.ReplayState{}, errors.New(errMessage)) + jobService.On("GetReplayStatus", context.TODO(), replayRequest).Return(models.ReplayState{}, errors.New(errMessage)) adapter := v1.NewAdapter(nil, nil) @@ -2345,4 +2345,148 @@ func TestRuntimeServiceServer(t *testing.T) { assert.Nil(t, replayStatusResponse) }) }) + + t.Run("ListReplays", func(t *testing.T) { + projectName := "a-data-project" + projectSpec := models.ProjectSpec{ + ID: uuid.Must(uuid.NewRandom()), + Name: projectName, + } + + t.Run("should get list of replay for a project", func(t *testing.T) { + jobName := "a-data-job" + jobSpec := models.JobSpec{ + ID: uuid.Must(uuid.NewRandom()), + Name: jobName, + Task: models.JobSpecTask{ + Config: models.JobSpecConfigs{ + { + Name: "do", + Value: "this", + }, + }, + }, + Assets: *models.JobAssets{}.New( + []models.JobSpecAsset{ + { + Name: "query.sql", + Value: "select * from 1", + }, + }), + } + + replaySpecs := []models.ReplaySpec{ + { + ID: uuid.Must(uuid.NewRandom()), + Job: jobSpec, + StartDate: time.Date(2020, 11, 25, 0, 0, 0, 0, time.UTC), + EndDate: time.Date(2020, 11, 28, 0, 0, 0, 0, time.UTC), + Status: models.ReplayStatusReplayed, + CreatedAt: time.Date(2021, 8, 1, 0, 0, 0, 0, time.UTC), + }, + { + ID: uuid.Must(uuid.NewRandom()), + Job: jobSpec, + StartDate: time.Date(2020, 12, 25, 0, 0, 0, 0, time.UTC), + EndDate: time.Date(2020, 12, 28, 0, 0, 0, 0, time.UTC), + Status: models.ReplayStatusInProgress, + CreatedAt: time.Date(2021, 8, 2, 0, 0, 0, 0, time.UTC), + }, + } + expectedReplayList := &pb.ListReplaysResponse{ + ReplayList: []*pb.ReplaySpec{ + { + Id: replaySpecs[0].ID.String(), + JobName: jobSpec.Name, + StartDate: timestamppb.New(time.Date(2020, 11, 25, 0, 0, 0, 0, time.UTC)), + EndDate: timestamppb.New(time.Date(2020, 11, 28, 0, 0, 0, 0, time.UTC)), + State: models.ReplayStatusReplayed, + CreatedAt: timestamppb.New(time.Date(2021, 8, 1, 0, 0, 0, 0, time.UTC)), + }, + { + Id: replaySpecs[1].ID.String(), + JobName: jobSpec.Name, + StartDate: timestamppb.New(time.Date(2020, 12, 25, 0, 0, 0, 0, time.UTC)), + EndDate: timestamppb.New(time.Date(2020, 12, 28, 0, 0, 0, 0, time.UTC)), + State: models.ReplayStatusInProgress, + CreatedAt: timestamppb.New(time.Date(2021, 8, 2, 0, 0, 0, 0, time.UTC)), + }, + }, + } + + projectRepository := new(mock.ProjectRepository) + projectRepository.On("GetByName", projectName).Return(projectSpec, nil) + defer projectRepository.AssertExpectations(t) + + projectRepoFactory := new(mock.ProjectRepoFactory) + projectRepoFactory.On("New").Return(projectRepository) + defer projectRepoFactory.AssertExpectations(t) + + jobService := new(mock.JobService) + defer jobService.AssertExpectations(t) + jobService.On("GetReplayList", projectSpec.ID).Return(replaySpecs, nil) + + adapter := v1.NewAdapter(nil, nil) + + runtimeServiceServer := v1.NewRuntimeServiceServer( + log, + "Version", + jobService, nil, + nil, + projectRepoFactory, + nil, + nil, + adapter, + nil, + nil, + nil, + ) + + replayRequestPb := pb.ListReplaysRequest{ + ProjectName: projectName, + } + replayStatusResponse, err := runtimeServiceServer.ListReplays(context.Background(), &replayRequestPb) + + assert.Nil(t, err) + assert.Equal(t, expectedReplayList, replayStatusResponse) + }) + t.Run("should failed when unable to get status of a replay", func(t *testing.T) { + projectRepository := new(mock.ProjectRepository) + projectRepository.On("GetByName", projectName).Return(projectSpec, nil) + defer projectRepository.AssertExpectations(t) + + projectRepoFactory := new(mock.ProjectRepoFactory) + projectRepoFactory.On("New").Return(projectRepository) + defer projectRepoFactory.AssertExpectations(t) + + errMessage := "internal error" + jobService := new(mock.JobService) + defer jobService.AssertExpectations(t) + jobService.On("GetReplayList", projectSpec.ID).Return([]models.ReplaySpec{}, errors.New(errMessage)) + + adapter := v1.NewAdapter(nil, nil) + + runtimeServiceServer := v1.NewRuntimeServiceServer( + log, + "Version", + jobService, nil, + nil, + projectRepoFactory, + nil, + nil, + adapter, + nil, + nil, + nil, + ) + + replayRequestPb := pb.ListReplaysRequest{ + ProjectName: projectName, + } + replayListResponse, err := runtimeServiceServer.ListReplays(context.Background(), &replayRequestPb) + + assert.Contains(t, err.Error(), errMessage) + assert.Nil(t, replayListResponse) + }) + }) } diff --git a/cmd/replay.go b/cmd/replay.go index ec4d5e929b..5582147634 100644 --- a/cmd/replay.go +++ b/cmd/replay.go @@ -67,6 +67,7 @@ func replayCommand(l log.Logger, conf config.Provider) *cli.Command { } cmd.AddCommand(replayRunSubCommand(l, conf)) cmd.AddCommand(replayStatusSubCommand(l, conf)) + cmd.AddCommand(replayListSubCommand(l, conf)) return cmd } @@ -349,3 +350,77 @@ func printStatusTree(instance *pb.ReplayStatusTreeNode, tree treeprint.Tree) tre } return tree } + +func replayListSubCommand(l log.Logger, conf config.Provider) *cli.Command { + var ( + replayProject string + ) + + reCmd := &cli.Command{ + Use: "list", + Short: "get list of a replay using project ID", + Example: "optimus replay status replay-id", + Long: ` +The list command is used to fetch the recent replay in one project. + `, + } + reCmd.Flags().StringVarP(&replayProject, "project", "p", "", "project name of optimus managed ocean repository") + reCmd.MarkFlagRequired("project") + reCmd.RunE = func(cmd *cli.Command, args []string) error { + dialTimeoutCtx, dialCancel := context.WithTimeout(context.Background(), OptimusDialTimeout) + defer dialCancel() + + conn, err := createConnection(dialTimeoutCtx, conf.GetHost()) + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + l.Info("can't reach optimus service") + } + return err + } + defer conn.Close() + + replayRequestTimeout, replayRequestCancel := context.WithTimeout(context.Background(), replayTimeout) + defer replayRequestCancel() + + runtime := pb.NewRuntimeServiceClient(conn) + replayStatusRequest := &pb.ListReplaysRequest{ + ProjectName: replayProject, + } + replayResponse, err := runtime.ListReplays(replayRequestTimeout, replayStatusRequest) + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + l.Info("replay request took too long, timing out") + } + return errors.Wrapf(err, "failed to get replay requests") + } + if len(replayResponse.ReplayList) == 0 { + l.Info(fmt.Sprintf("no replays were found in %s project.", replayProject)) + } else { + printReplayListResponse(l, replayResponse) + } + return nil + } + return reCmd +} + +func printReplayListResponse(l log.Logger, replayListResponse *pb.ListReplaysResponse) { + l.Info(coloredNotice("LATEST REPLAY")) + table := tablewriter.NewWriter(l.Writer()) + table.SetBorder(false) + table.SetHeader([]string{ + "ID", + "Job", + "Start Date", + "End Date", + "Requested At", + "Status", + }) + + for _, replaySpec := range replayListResponse.ReplayList { + table.Append([]string{replaySpec.Id, replaySpec.JobName, replaySpec.StartDate.AsTime().Format(models.JobDatetimeLayout), + replaySpec.EndDate.AsTime().Format(models.JobDatetimeLayout), replaySpec.CreatedAt.AsTime().Format(time.RFC3339), + replaySpec.State}) + } + + table.Render() +} diff --git a/cmd/server/server.go b/cmd/server/server.go index 6f71ceb0fa..073be12ad3 100644 --- a/cmd/server/server.go +++ b/cmd/server/server.go @@ -84,11 +84,11 @@ func (fac *replaySpecRepoRepository) New() store.ReplaySpecRepository { type replayWorkerFact struct { replaySpecRepoFac job.ReplaySpecRepoFactory scheduler models.SchedulerUnit - jsonLog log.Logger + logger log.Logger } func (fac *replayWorkerFact) New() job.ReplayWorker { - return job.NewReplayWorker(fac.jsonLog, fac.replaySpecRepoFac, fac.scheduler) + return job.NewReplayWorker(fac.logger, fac.replaySpecRepoFac, fac.scheduler) } // jobSpecRepoFactory stores raw specifications @@ -414,6 +414,7 @@ func Initialize(l log.Logger, conf config.Provider) error { replayWorkerFactory := &replayWorkerFact{ replaySpecRepoFac: replaySpecRepoFac, scheduler: models.BatchScheduler, + logger: l, } replayValidator := job.NewReplayValidator(models.BatchScheduler) replaySyncer := job.NewReplaySyncer( diff --git a/ext/third_party/OpenAPI/odpf/optimus/runtime_service.swagger.json b/ext/third_party/OpenAPI/odpf/optimus/runtime_service.swagger.json index 079e798eae..380b515e72 100644 --- a/ext/third_party/OpenAPI/odpf/optimus/runtime_service.swagger.json +++ b/ext/third_party/OpenAPI/odpf/optimus/runtime_service.swagger.json @@ -229,136 +229,6 @@ ] } }, - "/v1/project/{projectName}/job/{jobName}/replay": { - "post": { - "operationId": "RuntimeService_Replay", - "responses": { - "200": { - "description": "A successful response.", - "schema": { - "$ref": "#/definitions/optimusReplayResponse" - } - }, - "default": { - "description": "An unexpected error response.", - "schema": { - "$ref": "#/definitions/rpcStatus" - } - } - }, - "parameters": [ - { - "name": "projectName", - "in": "path", - "required": true, - "type": "string" - }, - { - "name": "jobName", - "in": "path", - "required": true, - "type": "string" - }, - { - "name": "body", - "in": "body", - "required": true, - "schema": { - "$ref": "#/definitions/optimusReplayRequest" - } - } - ], - "tags": [ - "RuntimeService" - ] - } - }, - "/v1/project/{projectName}/job/{jobName}/replay-dry-run": { - "post": { - "operationId": "RuntimeService_ReplayDryRun", - "responses": { - "200": { - "description": "A successful response.", - "schema": { - "$ref": "#/definitions/optimusReplayDryRunResponse" - } - }, - "default": { - "description": "An unexpected error response.", - "schema": { - "$ref": "#/definitions/rpcStatus" - } - } - }, - "parameters": [ - { - "name": "projectName", - "in": "path", - "required": true, - "type": "string" - }, - { - "name": "jobName", - "in": "path", - "required": true, - "type": "string" - }, - { - "name": "body", - "in": "body", - "required": true, - "schema": { - "$ref": "#/definitions/optimusReplayRequest" - } - } - ], - "tags": [ - "RuntimeService" - ] - } - }, - "/v1/project/{projectName}/job/{jobName}/replay/{id}": { - "get": { - "operationId": "RuntimeService_GetReplayStatus", - "responses": { - "200": { - "description": "A successful response.", - "schema": { - "$ref": "#/definitions/optimusGetReplayStatusResponse" - } - }, - "default": { - "description": "An unexpected error response.", - "schema": { - "$ref": "#/definitions/rpcStatus" - } - } - }, - "parameters": [ - { - "name": "projectName", - "in": "path", - "required": true, - "type": "string" - }, - { - "name": "jobName", - "in": "path", - "required": true, - "type": "string" - }, - { - "name": "id", - "in": "path", - "required": true, - "type": "string" - } - ], - "tags": [ - "RuntimeService" - ] - } - }, "/v1/project/{projectName}/job/{jobName}/status": { "get": { "summary": "JobStatus returns the current and past run status of jobs", @@ -832,6 +702,152 @@ ] } }, + "/v1/project/{projectName}/replay": { + "get": { + "operationId": "RuntimeService_ListReplays", + "responses": { + "200": { + "description": "A successful response.", + "schema": { + "$ref": "#/definitions/optimusListReplaysResponse" + } + }, + "default": { + "description": "An unexpected error response.", + "schema": { + "$ref": "#/definitions/rpcStatus" + } + } + }, + "parameters": [ + { + "name": "projectName", + "in": "path", + "required": true, + "type": "string" + } + ], + "tags": [ + "RuntimeService" + ] + }, + "post": { + "operationId": "RuntimeService_Replay", + "responses": { + "200": { + "description": "A successful response.", + "schema": { + "$ref": "#/definitions/optimusReplayResponse" + } + }, + "default": { + "description": "An unexpected error response.", + "schema": { + "$ref": "#/definitions/rpcStatus" + } + } + }, + "parameters": [ + { + "name": "projectName", + "in": "path", + "required": true, + "type": "string" + }, + { + "name": "body", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/optimusReplayRequest" + } + } + ], + "tags": [ + "RuntimeService" + ] + } + }, + "/v1/project/{projectName}/replay/dryrun": { + "post": { + "operationId": "RuntimeService_DryRunReplay", + "responses": { + "200": { + "description": "A successful response.", + "schema": { + "$ref": "#/definitions/optimusDryRunReplayResponse" + } + }, + "default": { + "description": "An unexpected error response.", + "schema": { + "$ref": "#/definitions/rpcStatus" + } + } + }, + "parameters": [ + { + "name": "projectName", + "in": "path", + "required": true, + "type": "string" + }, + { + "name": "body", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/optimusDryRunReplayRequest" + } + } + ], + "tags": [ + "RuntimeService" + ] + } + }, + "/v1/project/{projectName}/replay/{id}": { + "get": { + "operationId": "RuntimeService_GetReplayStatus", + "responses": { + "200": { + "description": "A successful response.", + "schema": { + "$ref": "#/definitions/optimusGetReplayStatusResponse" + } + }, + "default": { + "description": "An unexpected error response.", + "schema": { + "$ref": "#/definitions/rpcStatus" + } + } + }, + "parameters": [ + { + "name": "projectName", + "in": "path", + "required": true, + "type": "string" + }, + { + "name": "id", + "in": "path", + "required": true, + "type": "string" + }, + { + "name": "jobName", + "in": "query", + "required": false, + "type": "string" + } + ], + "tags": [ + "RuntimeService" + ] + } + }, "/v1/project/{projectName}/secret/{secretName}": { "post": { "summary": "RegisterSecret creates a new secret of a project", @@ -1150,6 +1166,37 @@ } } }, + "optimusDryRunReplayRequest": { + "type": "object", + "properties": { + "projectName": { + "type": "string" + }, + "jobName": { + "type": "string" + }, + "namespace": { + "type": "string" + }, + "startDate": { + "type": "string" + }, + "endDate": { + "type": "string" + } + } + }, + "optimusDryRunReplayResponse": { + "type": "object", + "properties": { + "success": { + "type": "boolean" + }, + "response": { + "$ref": "#/definitions/optimusReplayExecutionTreeNode" + } + } + }, "optimusDumpJobSpecificationResponse": { "type": "object", "properties": { @@ -1447,6 +1494,17 @@ } } }, + "optimusListReplaysResponse": { + "type": "object", + "properties": { + "replayList": { + "type": "array", + "items": { + "$ref": "#/definitions/optimusReplaySpec" + } + } + } + }, "optimusListResourceSpecificationResponse": { "type": "object", "properties": { @@ -1644,17 +1702,6 @@ } } }, - "optimusReplayDryRunResponse": { - "type": "object", - "properties": { - "success": { - "type": "boolean" - }, - "response": { - "$ref": "#/definitions/optimusReplayExecutionTreeNode" - } - } - }, "optimusReplayExecutionTreeNode": { "type": "object", "properties": { @@ -1707,6 +1754,32 @@ } } }, + "optimusReplaySpec": { + "type": "object", + "properties": { + "id": { + "type": "string" + }, + "jobName": { + "type": "string" + }, + "startDate": { + "type": "string", + "format": "date-time" + }, + "endDate": { + "type": "string", + "format": "date-time" + }, + "state": { + "type": "string" + }, + "createdAt": { + "type": "string", + "format": "date-time" + } + } + }, "optimusReplayStatusRun": { "type": "object", "properties": { diff --git a/job/replay.go b/job/replay.go index e2b85b110f..72881f88cb 100644 --- a/job/replay.go +++ b/job/replay.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + "github.com/google/uuid" "github.com/odpf/optimus/core/cron" "github.com/odpf/optimus/core/set" "github.com/odpf/optimus/core/tree" @@ -201,7 +202,7 @@ func getRunsBetweenDates(start time.Time, end time.Time, schedule string) ([]tim return runs, nil } -func (srv *Service) GetStatus(ctx context.Context, replayRequest models.ReplayRequest) (models.ReplayState, error) { +func (srv *Service) GetReplayStatus(ctx context.Context, replayRequest models.ReplayRequest) (models.ReplayState, error) { // Get replay replaySpec, err := srv.replayManager.GetReplay(replayRequest.ID) if err != nil { @@ -265,3 +266,7 @@ func (srv *Service) populateDownstreamRunsWithStatus(ctx context.Context, projec } return parentNode, nil } + +func (srv *Service) GetReplayList(projectUUID uuid.UUID) ([]models.ReplaySpec, error) { + return srv.replayManager.GetReplayList(projectUUID) +} diff --git a/job/replay_manager.go b/job/replay_manager.go index 46161fcb93..12f99759f9 100644 --- a/job/replay_manager.go +++ b/job/replay_manager.go @@ -28,6 +28,8 @@ var ( TimestampLogFormat = "2006-01-02T15:04:05+00:00" // schedulerBatchSize number of run instances to be checked per request schedulerBatchSize = 100 + //replayListWindow window interval to fetch recent replays + replayListWindow = -3 * 30 * 24 * time.Hour ) const ( @@ -155,6 +157,25 @@ func (m *Manager) GetReplay(replayUUID uuid.UUID) (models.ReplaySpec, error) { return m.replaySpecRepoFac.New().GetByID(replayUUID) } +// GetReplayList using Project ID +func (m *Manager) GetReplayList(projectUUID uuid.UUID) ([]models.ReplaySpec, error) { + replays, err := m.replaySpecRepoFac.New().GetByProjectID(projectUUID) + if err != nil { + if err == store.ErrResourceNotFound { + return []models.ReplaySpec{}, nil + } + return []models.ReplaySpec{}, err + } + + var recentReplays []models.ReplaySpec + for _, replay := range replays { + if replay.CreatedAt.After(time.Now().UTC().Add(replayListWindow)) { + recentReplays = append(recentReplays, replay) + } + } + return recentReplays, nil +} + // GetRunsStatus func (m *Manager) GetRunStatus(ctx context.Context, projectSpec models.ProjectSpec, startDate time.Time, endDate time.Time, jobName string) ([]models.JobStatus, error) { diff --git a/job/replay_manager_test.go b/job/replay_manager_test.go index e845420a18..2eb272f2eb 100644 --- a/job/replay_manager_test.go +++ b/job/replay_manager_test.go @@ -336,6 +336,128 @@ func TestReplayManager(t *testing.T) { assert.Nil(t, err) }) }) + t.Run("GetReplayList", func(t *testing.T) { + t.Run("should return replay list given a valid project UUID", func(t *testing.T) { + projectUUID := uuid.Must(uuid.NewRandom()) + replayJob := models.JobSpec{ + Name: "sample-job", + } + startDate := time.Date(2020, time.Month(8), 5, 0, 0, 0, 0, time.UTC) + endDate := time.Date(2020, time.Month(8), 7, 0, 0, 0, 0, time.UTC) + replaySpecs := []models.ReplaySpec{ + { + ID: uuid.Must(uuid.NewRandom()), + Job: replayJob, + StartDate: startDate, + EndDate: endDate, + Status: models.ReplayStatusReplayed, + CreatedAt: time.Now().UTC().Add(time.Hour * -1), + }, + } + + replayRepository := new(mock.ReplayRepository) + defer replayRepository.AssertExpectations(t) + replayRepository.On("GetByProjectID", projectUUID).Return(replaySpecs, nil) + + replaySpecRepoFac := new(mock.ReplaySpecRepoFactory) + defer replaySpecRepoFac.AssertExpectations(t) + replaySpecRepoFac.On("New").Return(replayRepository) + + replayManager := job.NewManager(log, nil, replaySpecRepoFac, nil, job.ReplayManagerConfig{}, nil, nil, nil) + replayListResult, err := replayManager.GetReplayList(projectUUID) + + assert.Nil(t, err) + assert.Equal(t, replaySpecs, replayListResult) + + err = replayManager.Close() + assert.Nil(t, err) + }) + t.Run("should only return recent replays given a valid project UUID", func(t *testing.T) { + projectUUID := uuid.Must(uuid.NewRandom()) + replayJob := models.JobSpec{ + Name: "sample-job", + } + startDate := time.Date(2020, time.Month(8), 5, 0, 0, 0, 0, time.UTC) + endDate := time.Date(2020, time.Month(8), 7, 0, 0, 0, 0, time.UTC) + replaySpecs := []models.ReplaySpec{ + { + ID: uuid.Must(uuid.NewRandom()), + Job: replayJob, + StartDate: startDate, + EndDate: endDate, + Status: models.ReplayStatusReplayed, + CreatedAt: time.Now().UTC().Add(time.Hour * -1), + }, + { + ID: uuid.Must(uuid.NewRandom()), + Job: replayJob, + StartDate: startDate, + EndDate: endDate, + Status: models.ReplayStatusReplayed, + CreatedAt: time.Now().UTC().Add(time.Hour * -24 * 100), + }, + } + + replayRepository := new(mock.ReplayRepository) + defer replayRepository.AssertExpectations(t) + replayRepository.On("GetByProjectID", projectUUID).Return(replaySpecs, nil) + + replaySpecRepoFac := new(mock.ReplaySpecRepoFactory) + defer replaySpecRepoFac.AssertExpectations(t) + replaySpecRepoFac.On("New").Return(replayRepository) + + replayManager := job.NewManager(log, nil, replaySpecRepoFac, nil, job.ReplayManagerConfig{}, nil, nil, nil) + replayListResult, err := replayManager.GetReplayList(projectUUID) + + expectedReplaySpecs := []models.ReplaySpec{replaySpecs[0]} + assert.Nil(t, err) + assert.Equal(t, expectedReplaySpecs, replayListResult) + + err = replayManager.Close() + assert.Nil(t, err) + }) + t.Run("should not return error when replay is not found", func(t *testing.T) { + projectUUID := uuid.Must(uuid.NewRandom()) + + replayRepository := new(mock.ReplayRepository) + defer replayRepository.AssertExpectations(t) + replayRepository.On("GetByProjectID", projectUUID).Return([]models.ReplaySpec{}, store.ErrResourceNotFound) + + replaySpecRepoFac := new(mock.ReplaySpecRepoFactory) + defer replaySpecRepoFac.AssertExpectations(t) + replaySpecRepoFac.On("New").Return(replayRepository) + + replayManager := job.NewManager(log, nil, replaySpecRepoFac, nil, job.ReplayManagerConfig{}, nil, nil, nil) + replayResult, err := replayManager.GetReplayList(projectUUID) + + assert.Nil(t, err) + assert.Equal(t, []models.ReplaySpec{}, replayResult) + + err = replayManager.Close() + assert.Nil(t, err) + }) + t.Run("should return error when unable to get replay list", func(t *testing.T) { + projectUUID := uuid.Must(uuid.NewRandom()) + + replayRepository := new(mock.ReplayRepository) + defer replayRepository.AssertExpectations(t) + errorMsg := "unable to get list of replays" + replayRepository.On("GetByProjectID", projectUUID).Return([]models.ReplaySpec{}, errors.New(errorMsg)) + + replaySpecRepoFac := new(mock.ReplaySpecRepoFactory) + defer replaySpecRepoFac.AssertExpectations(t) + replaySpecRepoFac.On("New").Return(replayRepository) + + replayManager := job.NewManager(log, nil, replaySpecRepoFac, nil, job.ReplayManagerConfig{}, nil, nil, nil) + replayResult, err := replayManager.GetReplayList(projectUUID) + + assert.Equal(t, errorMsg, err.Error()) + assert.Equal(t, []models.ReplaySpec{}, replayResult) + + err = replayManager.Close() + assert.Nil(t, err) + }) + }) t.Run("GetRunStatus", func(t *testing.T) { projectSpec := models.ProjectSpec{ Name: "project-name", diff --git a/job/replay_test.go b/job/replay_test.go index 8a6f1f46e6..e569abac76 100644 --- a/job/replay_test.go +++ b/job/replay_test.go @@ -443,7 +443,7 @@ func TestReplay(t *testing.T) { } jobSvc := job.NewService(nil, nil, nil, dumpAssets, nil, nil, nil, nil, replayManager) - _, err := jobSvc.GetStatus(ctx, replayRequest) + _, err := jobSvc.GetReplayStatus(ctx, replayRequest) assert.NotNil(t, err) assert.Equal(t, errorMsg, err.Error()) @@ -478,7 +478,7 @@ func TestReplay(t *testing.T) { } jobSvc := job.NewService(nil, nil, nil, dumpAssets, nil, nil, nil, nil, replayManager) - _, err := jobSvc.GetStatus(ctx, replayRequest) + _, err := jobSvc.GetReplayStatus(ctx, replayRequest) assert.Equal(t, errorMsg, err.Error()) }) @@ -545,7 +545,7 @@ func TestReplay(t *testing.T) { } jobSvc := job.NewService(nil, nil, nil, dumpAssets, nil, nil, nil, nil, replayManager) - _, err := jobSvc.GetStatus(ctx, replayRequest) + _, err := jobSvc.GetReplayStatus(ctx, replayRequest) assert.Equal(t, errorMsg, err.Error()) }) @@ -612,9 +612,48 @@ func TestReplay(t *testing.T) { } jobSvc := job.NewService(nil, nil, nil, dumpAssets, nil, nil, nil, nil, replayManager) - _, err := jobSvc.GetStatus(ctx, replayRequest) + _, err := jobSvc.GetReplayStatus(ctx, replayRequest) assert.Equal(t, errorMsg, err.Error()) }) }) + + t.Run("GetReplayList", func(t *testing.T) { + t.Run("should return list of replay given project id", func(t *testing.T) { + replayID := uuid.Must(uuid.NewRandom()) + startDate := time.Date(2020, time.Month(8), 5, 0, 0, 0, 0, time.UTC) + endDate := time.Date(2020, time.Month(8), 7, 0, 0, 0, 0, time.UTC) + replaySpecs := []models.ReplaySpec{ + { + ID: replayID, + Job: specs[spec1], + StartDate: startDate, + EndDate: endDate, + Status: models.ReplayStatusReplayed, + }, + } + + replayManager := new(mock.ReplayManager) + defer replayManager.AssertExpectations(t) + replayManager.On("GetReplayList", projSpec.ID).Return(replaySpecs, nil) + + jobSvc := job.NewService(nil, nil, nil, dumpAssets, nil, nil, nil, nil, replayManager) + replayList, err := jobSvc.GetReplayList(projSpec.ID) + + assert.Nil(t, err) + assert.Equal(t, replaySpecs, replayList) + }) + t.Run("should return error when unable to get replay list", func(t *testing.T) { + replayManager := new(mock.ReplayManager) + defer replayManager.AssertExpectations(t) + errorMsg := "unable to get replay list" + replayManager.On("GetReplayList", projSpec.ID).Return([]models.ReplaySpec{}, errors.New(errorMsg)) + + jobSvc := job.NewService(nil, nil, nil, dumpAssets, nil, nil, nil, nil, replayManager) + replayList, err := jobSvc.GetReplayList(projSpec.ID) + + assert.Equal(t, errorMsg, err.Error()) + assert.Equal(t, []models.ReplaySpec{}, replayList) + }) + }) } diff --git a/job/service.go b/job/service.go index 0a810b4688..26284ce9d0 100644 --- a/job/service.go +++ b/job/service.go @@ -61,6 +61,7 @@ type ReplayManager interface { Init() Replay(context.Context, models.ReplayRequest) (string, error) GetReplay(uuid.UUID) (models.ReplaySpec, error) + GetReplayList(projectID uuid.UUID) ([]models.ReplaySpec, error) GetRunStatus(ctx context.Context, projectSpec models.ProjectSpec, startDate time.Time, endDate time.Time, jobName string) ([]models.JobStatus, error) } diff --git a/mock/job.go b/mock/job.go index 9018874963..6dee7f2c44 100644 --- a/mock/job.go +++ b/mock/job.go @@ -3,6 +3,8 @@ package mock import ( "context" + "github.com/google/uuid" + "github.com/odpf/optimus/job" "github.com/odpf/optimus/core/tree" @@ -161,11 +163,16 @@ func (j *JobService) Replay(ctx context.Context, replayRequest models.ReplayRequ return args.Get(0).(string), args.Error(1) } -func (j *JobService) GetStatus(ctx context.Context, replayRequest models.ReplayRequest) (models.ReplayState, error) { +func (j *JobService) GetReplayStatus(ctx context.Context, replayRequest models.ReplayRequest) (models.ReplayState, error) { args := j.Called(ctx, replayRequest) return args.Get(0).(models.ReplayState), args.Error(1) } +func (j *JobService) GetReplayList(projectUUID uuid.UUID) ([]models.ReplaySpec, error) { + args := j.Called(projectUUID) + return args.Get(0).([]models.ReplaySpec), args.Error(1) +} + func (j *JobService) Run(ctx context.Context, ns models.NamespaceSpec, js []models.JobSpec, obs progress.Observer) error { args := j.Called(ctx, ns, js, obs) return args.Error(0) diff --git a/mock/replay.go b/mock/replay.go index a97e2d73db..b6f24eada4 100644 --- a/mock/replay.go +++ b/mock/replay.go @@ -54,6 +54,11 @@ func (repo *ReplayRepository) GetByProjectIDAndStatus(projectID uuid.UUID, statu return args.Get(0).([]models.ReplaySpec), args.Error(1) } +func (repo *ReplayRepository) GetByProjectID(projectID uuid.UUID) ([]models.ReplaySpec, error) { + args := repo.Called(projectID) + return args.Get(0).([]models.ReplaySpec), args.Error(1) +} + type ReplaySpecRepoFactory struct { mock.Mock } @@ -81,6 +86,11 @@ func (rm *ReplayManager) GetReplay(uuid uuid.UUID) (models.ReplaySpec, error) { return args.Get(0).(models.ReplaySpec), args.Error(1) } +func (rm *ReplayManager) GetReplayList(projectUUID uuid.UUID) ([]models.ReplaySpec, error) { + args := rm.Called(projectUUID) + return args.Get(0).([]models.ReplaySpec), args.Error(1) +} + func (rm *ReplayManager) GetRunStatus(ctx context.Context, projectSpec models.ProjectSpec, startDate time.Time, endDate time.Time, jobName string) ([]models.JobStatus, error) { args := rm.Called(ctx, projectSpec, startDate, endDate, jobName) diff --git a/models/job.go b/models/job.go index 6c1ffa2f05..982ebfc235 100644 --- a/models/job.go +++ b/models/job.go @@ -318,8 +318,10 @@ type JobService interface { ReplayDryRun(ReplayRequest) (*tree.TreeNode, error) // Replay replays the jobSpec and its dependencies between start and endDate Replay(context.Context, ReplayRequest) (string, error) - // GetStatus of a replay using its ID - GetStatus(context.Context, ReplayRequest) (ReplayState, error) + // GetReplayStatus of a replay using its ID + GetReplayStatus(context.Context, ReplayRequest) (ReplayState, error) + //GetReplayList of a project + GetReplayList(projectID uuid.UUID) ([]ReplaySpec, error) } // JobCompiler takes template file of a scheduler and after applying diff --git a/store/postgres/replay_repository.go b/store/postgres/replay_repository.go index 5b5568701b..153192d112 100644 --- a/store/postgres/replay_repository.go +++ b/store/postgres/replay_repository.go @@ -265,3 +265,29 @@ func (repo *replayRepository) GetByProjectIDAndStatus(projectID uuid.UUID, statu } return replaySpecs, nil } + +func (repo *replayRepository) GetByProjectID(projectID uuid.UUID) ([]models.ReplaySpec, error) { + var replays []Replay + if err := repo.DB.Preload("Job").Joins("JOIN job ON replay.job_id = job.id"). + Where("job.project_id = ?", projectID).Order("created_at DESC").Find(&replays).Error; err != nil { + return []models.ReplaySpec{}, err + } + + if len(replays) == 0 { + return []models.ReplaySpec{}, store.ErrResourceNotFound + } + + var replaySpecs []models.ReplaySpec + for _, r := range replays { + jobSpec, err := repo.adapter.ToSpec(r.Job) + if err != nil { + return []models.ReplaySpec{}, err + } + replaySpec, err := r.ToSpec(jobSpec) + if err != nil { + return []models.ReplaySpec{}, err + } + replaySpecs = append(replaySpecs, replaySpec) + } + return replaySpecs, nil +} diff --git a/store/postgres/replay_repository_test.go b/store/postgres/replay_repository_test.go index fd324cb75f..211061d1ab 100644 --- a/store/postgres/replay_repository_test.go +++ b/store/postgres/replay_repository_test.go @@ -8,6 +8,8 @@ import ( "testing" "time" + "github.com/odpf/optimus/store" + "github.com/odpf/optimus/core/tree" "github.com/google/uuid" @@ -402,4 +404,114 @@ func TestReplayRepository(t *testing.T) { assert.ElementsMatch(t, []uuid.UUID{testModels[0].ID, testModels[2].ID}, []uuid.UUID{replays[0].ID, replays[1].ID}) }) }) + t.Run("GetByProjectID", func(t *testing.T) { + t.Run("should return list of replay specs given project_id", func(t *testing.T) { + db := DBSetup() + defer db.Close() + var testModels []*models.ReplaySpec + testModels = append(testModels, testConfigs...) + expectedUUIDs := []uuid.UUID{testModels[0].ID, testModels[1].ID, testModels[2].ID} + + execUnit1 := new(mock.BasePlugin) + defer execUnit1.AssertExpectations(t) + execUnit1.On("PluginInfo").Return(&models.PluginInfoResponse{ + Name: gTask, + }, nil) + depMod1 := new(mock.DependencyResolverMod) + defer depMod1.AssertExpectations(t) + for idx, jobConfig := range jobConfigs { + jobConfig.Task = models.JobSpecTask{Unit: &models.Plugin{Base: execUnit1, DependencyMod: depMod1}} + testConfigs[idx].Job = jobConfig + } + + pluginRepo := new(mock.SupportedPluginRepo) + defer pluginRepo.AssertExpectations(t) + pluginRepo.On("GetByName", gTask).Return(&models.Plugin{Base: execUnit1, DependencyMod: depMod1}, nil) + adapter := NewAdapter(pluginRepo) + + unitData := models.GenerateDestinationRequest{ + Config: models.PluginConfigs{}.FromJobSpec(jobConfigs[0].Task.Config), + Assets: models.PluginAssets{}.FromJobSpec(jobConfigs[0].Assets), + } + depMod1.On("GenerateDestination", context.TODO(), unitData).Return(&models.GenerateDestinationResponse{Destination: "p.d.t"}, nil) + + projectJobSpecRepo := NewProjectJobSpecRepository(db, projectSpec, adapter) + jobRepo := NewJobSpecRepository(db, namespaceSpec, projectJobSpecRepo, adapter) + projectRepo := NewProjectRepository(db, hash) + + err := projectRepo.Insert(projectSpec) + assert.Nil(t, err) + err = jobRepo.Insert(testModels[0].Job) + assert.Nil(t, err) + err = jobRepo.Insert(testModels[1].Job) + assert.Nil(t, err) + err = jobRepo.Insert(testModels[2].Job) + assert.Nil(t, err) + + repo := NewReplayRepository(db, adapter) + err = repo.Insert(testModels[0]) + assert.Nil(t, err) + err = repo.Insert(testModels[1]) + assert.Nil(t, err) + err = repo.Insert(testModels[2]) + assert.Nil(t, err) + + replays, err := repo.GetByProjectID(projectSpec.ID) + assert.Nil(t, err) + assert.ElementsMatch(t, expectedUUIDs, []uuid.UUID{replays[0].ID, replays[1].ID, replays[2].ID}) + }) + t.Run("should return not found if no recent replay is found", func(t *testing.T) { + db := DBSetup() + defer db.Close() + var testModels []*models.ReplaySpec + testModels = append(testModels, testConfigs...) + + execUnit1 := new(mock.BasePlugin) + defer execUnit1.AssertExpectations(t) + execUnit1.On("PluginInfo").Return(&models.PluginInfoResponse{ + Name: gTask, + }, nil) + depMod1 := new(mock.DependencyResolverMod) + defer depMod1.AssertExpectations(t) + for idx, jobConfig := range jobConfigs { + jobConfig.Task = models.JobSpecTask{Unit: &models.Plugin{Base: execUnit1, DependencyMod: depMod1}} + testConfigs[idx].Job = jobConfig + } + + pluginRepo := new(mock.SupportedPluginRepo) + defer pluginRepo.AssertExpectations(t) + adapter := NewAdapter(pluginRepo) + + unitData := models.GenerateDestinationRequest{ + Config: models.PluginConfigs{}.FromJobSpec(jobConfigs[0].Task.Config), + Assets: models.PluginAssets{}.FromJobSpec(jobConfigs[0].Assets), + } + depMod1.On("GenerateDestination", context.TODO(), unitData).Return(&models.GenerateDestinationResponse{Destination: "p.d.t"}, nil) + + projectJobSpecRepo := NewProjectJobSpecRepository(db, projectSpec, adapter) + jobRepo := NewJobSpecRepository(db, namespaceSpec, projectJobSpecRepo, adapter) + projectRepo := NewProjectRepository(db, hash) + + err := projectRepo.Insert(projectSpec) + assert.Nil(t, err) + err = jobRepo.Insert(testModels[0].Job) + assert.Nil(t, err) + err = jobRepo.Insert(testModels[1].Job) + assert.Nil(t, err) + err = jobRepo.Insert(testModels[2].Job) + assert.Nil(t, err) + + repo := NewReplayRepository(db, adapter) + err = repo.Insert(testModels[0]) + assert.Nil(t, err) + err = repo.Insert(testModels[1]) + assert.Nil(t, err) + err = repo.Insert(testModels[2]) + assert.Nil(t, err) + + replays, err := repo.GetByProjectID(uuid.Must(uuid.NewRandom())) + assert.Equal(t, store.ErrResourceNotFound, err) + assert.Equal(t, []models.ReplaySpec{}, replays) + }) + }) } diff --git a/store/store.go b/store/store.go index 01b5529f5c..cb10bdb22b 100644 --- a/store/store.go +++ b/store/store.go @@ -92,4 +92,5 @@ type ReplaySpecRepository interface { GetByStatus(status []string) ([]models.ReplaySpec, error) GetByJobIDAndStatus(jobID uuid.UUID, status []string) ([]models.ReplaySpec, error) GetByProjectIDAndStatus(projectID uuid.UUID, status []string) ([]models.ReplaySpec, error) + GetByProjectID(projectID uuid.UUID) ([]models.ReplaySpec, error) }