Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
3630b8e
refactor: update config and resource deployment mechanism
irainia Mar 2, 2022
2d03c54
refactor: restructure deployment of project, namespaces, job, and res…
irainia Mar 8, 2022
3bebfd3
refactor: fix bug when sending request and response to the server dur…
irainia Mar 8, 2022
76baef9
refactor: config to load multiple namespaces
Mar 8, 2022
b8169bf
test: load config for optimus
Mar 8, 2022
caffe70
test: load config for namespaces
Mar 8, 2022
eb2e8b6
fix: load duplicated namespaces
Mar 9, 2022
8422305
fix: error when testing job deployment
irainia Mar 9, 2022
4359922
fix: requirement to set the namespace on secret
irainia Mar 10, 2022
50da107
fix: generated mock by mockery
deryrahman Mar 14, 2022
1eb63c6
fix: bug command cannot be properly initialized
irainia Mar 14, 2022
92cceee
refactor: rework config loader to minimise side-effect
irainia Mar 14, 2022
ee79753
fix: bug when deploying job and resource from client and server side
irainia Mar 14, 2022
1b6a486
fix: linter
deryrahman Mar 15, 2022
5841ad0
refactor: test on test deploy job specification when success
deryrahman Mar 16, 2022
abe7b2b
test: add test when there's grpc recv error
deryrahman Mar 16, 2022
4328d4e
test: add test when namespace get error
deryrahman Mar 16, 2022
539e6ed
test: add test when adapt from proto or job spec save is failed
deryrahman Mar 16, 2022
e4c5c70
test: add test when job service keep only error
deryrahman Mar 16, 2022
e6e29fa
test: add test when job service sync error
deryrahman Mar 16, 2022
c09f288
fix: linter context in struct
deryrahman Mar 16, 2022
8b1d9d6
refactor: job spec deploy test
deryrahman Mar 16, 2022
03a1ebf
refactor: remove unused code
deryrahman Mar 16, 2022
3f3c70a
refactor: new runtime service server suite on runtime_test
deryrahman Mar 16, 2022
745d742
test: add mock for resource specification server
deryrahman Mar 16, 2022
a4e8dd9
test: add test when deploy resource specification success
deryrahman Mar 17, 2022
1a06548
test: add test when deploy resource specification error on grpc receiver
deryrahman Mar 17, 2022
3f810db
test: add test when deploy resource specification error on namespaceS…
deryrahman Mar 17, 2022
0919ed3
test: add test when deploy resource specification error on adapt from…
deryrahman Mar 17, 2022
45084d4
test: add test when deploy resource specification error on update res…
deryrahman Mar 17, 2022
e47a8cf
test: add test when get job specification success and failed
deryrahman Mar 17, 2022
6ab4a39
test: add test when get job specification failed & remove unused test
deryrahman Mar 17, 2022
ec4ff2d
refactor: simplify how to store and load config for multiple namespaces
irainia Mar 21, 2022
a503334
fix: linter issue for namespace pointer
irainia Mar 21, 2022
a6c8038
refactor: add placeholder for config structure
deryrahman Mar 21, 2022
922b9a9
refactor: version command to not accept conf directly
deryrahman Mar 22, 2022
2a143af
feat: implement load project and server config
deryrahman Mar 22, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 44 additions & 34 deletions .optimus.sample.yaml
Original file line number Diff line number Diff line change
@@ -1,46 +1,19 @@
version: 1

########################################
# COMMON CONFIG (APPLIED FOR SERVER & CLIENT)
########################################
# logging configuration
log:
# debug, info, warning, error, fatal - default 'info'
level: info

#
# cli configurations
#

# used to connect optimus service
#host: localhost:9100

# for configuring optimus project
#project:
# name: sample_project
# # project variables usable in specifications
# config:
# environment: integration
# scheduler_host: http://example.io/
# # storage_path is used for storing compiled job specifications that can be
# # consumed by schedulers like Airflow
# # it supports multiple schemes like: file://, gcs://
# storage_path: file://absolute_path_to_a_directory

# for configuring optimus namespace
#namespace:
# name: sample_namespace
# jobs:
# # folder where job specifications are stored
# path: "job"
# datastore:
# # optimus is capable of supporting multiple datastores
# type: bigquery
# # path where resource spec for BQ are stored
# path: "bq"
# # namespace variables usable in specifications
# config: {}

#
# server configurations
#
########################################
# SERVER CONFIG
########################################

# for configuring optimus service
#serve:
Expand Down Expand Up @@ -83,4 +56,41 @@ log:
# profile_addr: ":9110"
#
# # jaeger collector address to send application traces
# jaeger_addr: "http://localhost:14268/api/traces"
# jaeger_addr: "http://localhost:14268/api/traces"




########################################
# PROJECT CONFIG
########################################

# used to connect optimus service
#host: localhost:9100

# for configuring optimus project
#project:
# name: sample_project
# # project variables usable in specifications
# config:
# environment: integration
# scheduler_host: http://example.io/
# # storage_path is used for storing compiled job specifications that can be
# # consumed by schedulers like Airflow
# # it supports multiple schemes like: file://, gcs://
# storage_path: file://absolute_path_to_a_directory

# for configuring optimus namespace
#namespace:
# name: sample_namespace
# job:
# # relative path pointing to folder where job specifications are stored
# path: "job"
# datastore:
# # optimus is capable of supporting multiple datastores
# type: bigquery
# # relative path where resource spec for BQ are stored
# path: "bq"
# # namespace variables usable in specifications
# config: {}

2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ NAME = "github.com/odpf/optimus"
LAST_COMMIT := $(shell git rev-parse --short HEAD)
LAST_TAG := "$(shell git rev-list --tags --max-count=1)"
OPMS_VERSION := "$(shell git describe --tags ${LAST_TAG})-next"
PROTON_COMMIT := "2a30976f7b40884ddd90e1792576c0941426e8bc"
PROTON_COMMIT := "a8eb38041e37c152daba7fbb8eb9721f56e902e6"

.PHONY: build test test-ci generate-proto unit-test-ci smoke-test integration-test vet coverage clean install lint

Expand Down
120 changes: 94 additions & 26 deletions api/handler/v1beta1/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package v1beta1

import (
"context"
"errors"
"fmt"
"io"
"sync"
"time"

Expand All @@ -13,48 +15,114 @@ import (
"google.golang.org/grpc/status"
)

func (sv *RuntimeServiceServer) DeployJobSpecification(req *pb.DeployJobSpecificationRequest, respStream pb.RuntimeService_DeployJobSpecificationServer) error {
func (sv *RuntimeServiceServer) DeployJobSpecification(stream pb.RuntimeService_DeployJobSpecificationServer) error {
startTime := time.Now()
errNamespaces := []string{}

namespaceSpec, err := sv.namespaceService.Get(respStream.Context(), req.GetProjectName(), req.GetNamespaceName())
if err != nil {
return mapToGRPCErr(sv.l, err, "unable to get namespace")
for {
req, err := stream.Recv()
if err != nil {
if err == io.EOF {
break
}
stream.Send(&pb.DeployJobSpecificationResponse{
Success: false,
Ack: true,
Message: err.Error(),
})
return err // immediate error returned (grpc error level)
}
namespaceSpec, err := sv.namespaceService.Get(stream.Context(), req.GetProjectName(), req.GetNamespaceName())
if err != nil {
stream.Send(&pb.DeployJobSpecificationResponse{
Success: false,
Ack: true,
Message: err.Error(),
})
errNamespaces = append(errNamespaces, req.NamespaceName)
continue
}

jobsToKeep, err := sv.getJobsToKeep(stream.Context(), namespaceSpec, req)
if err != nil {
stream.Send(&pb.DeployJobSpecificationResponse{
Success: false,
Ack: true,
Message: err.Error(),
})
errNamespaces = append(errNamespaces, req.NamespaceName)
continue
}

observers := new(progress.ObserverChain)
observers.Join(sv.progressObserver)
observers.Join(&jobSyncObserver{
stream: stream,
log: sv.l,
mu: new(sync.Mutex),
})

// delete specs not sent for deployment from internal repository
if err := sv.jobSvc.KeepOnly(stream.Context(), namespaceSpec, jobsToKeep, observers); err != nil {
stream.Send(&pb.DeployJobSpecificationResponse{
Success: false,
Ack: true,
Message: fmt.Sprintf("failed to delete jobs: \n%s", err.Error()),
})
errNamespaces = append(errNamespaces, req.NamespaceName)
continue
}
if err := sv.jobSvc.Sync(stream.Context(), namespaceSpec, observers); err != nil {
stream.Send(&pb.DeployJobSpecificationResponse{
Success: false,
Ack: true,
Message: fmt.Sprintf("failed to sync jobs: \n%s", err.Error()),
})
errNamespaces = append(errNamespaces, req.NamespaceName)
continue
}
runtimeDeployJobSpecificationCounter.Add(float64(len(req.Jobs)))
stream.Send(&pb.DeployJobSpecificationResponse{
Success: true,
Ack: true,
Message: "success",
})
}
sv.l.Info("finished job deployment", "time", time.Since(startTime))
if len(errNamespaces) > 0 {
sv.l.Warn("there's error while deploying namespaces: %v", errNamespaces)
return fmt.Errorf("error when deploying: %v", errNamespaces)
}
return nil
}

func (sv *RuntimeServiceServer) getJobsToKeep(ctx context.Context, namespaceSpec models.NamespaceSpec, req *pb.DeployJobSpecificationRequest) ([]models.JobSpec, error) {
jobs := req.GetJobs()
if len(jobs) == 0 {
return []models.JobSpec{}, nil
}

var jobsToKeep []models.JobSpec
for _, reqJob := range req.GetJobs() {
for _, reqJob := range jobs {
adaptJob, err := sv.adapter.FromJobProto(reqJob)
if err != nil {
return status.Errorf(codes.Internal, "%s: cannot adapt job %s", err.Error(), reqJob.GetName())
sv.l.Error(fmt.Sprintf("%s: cannot adapt job %s", err.Error(), reqJob.GetName()))
continue
}

err = sv.jobSvc.Create(respStream.Context(), namespaceSpec, adaptJob)
err = sv.jobSvc.Create(ctx, namespaceSpec, adaptJob)
if err != nil {
return status.Errorf(codes.Internal, "%s: failed to save %s", err.Error(), adaptJob.Name)
sv.l.Error(fmt.Sprintf("%s: failed to save %s", err.Error(), adaptJob.Name))
continue
}
jobsToKeep = append(jobsToKeep, adaptJob)
}

observers := new(progress.ObserverChain)
observers.Join(sv.progressObserver)
observers.Join(&jobSyncObserver{
stream: respStream,
log: sv.l,
mu: new(sync.Mutex),
})

// delete specs not sent for deployment from internal repository
if err := sv.jobSvc.KeepOnly(respStream.Context(), namespaceSpec, jobsToKeep, observers); err != nil {
return status.Errorf(codes.Internal, "failed to delete jobs: \n%s", err.Error())
if jobsToKeep == nil {
return nil, errors.New("job spec creation is failed")
}

if err := sv.jobSvc.Sync(respStream.Context(), namespaceSpec, observers); err != nil {
return status.Errorf(codes.Internal, "failed to sync jobs: \n%s", err.Error())
}

runtimeDeployJobSpecificationCounter.Add(float64(len(req.Jobs)))
sv.l.Info("finished job deployment", "time", time.Since(startTime))
return nil
return jobsToKeep, nil
}

func (sv *RuntimeServiceServer) ListJobSpecification(ctx context.Context, req *pb.ListJobSpecificationRequest) (*pb.ListJobSpecificationResponse, error) {
Expand Down
Loading