Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ brew install odpf/taps/optimus
optimus version
```

### Stability and Compatibility
Optimus is currently undergoing heavy development with frequent, breaking API changes.

> ⭐ Current major version is zero (v0.x.x) to accommodate rapid development and fast iteration while getting early feedback from users (feedback on APIs are appreciated).
> The public API could change without a major version update before v1.0.0 release.

## Credits

This project exists thanks to all the [contributors](https://github.com/odpf/optimus/graphs/contributors).
Expand Down
34 changes: 28 additions & 6 deletions api/handler/v1/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,22 @@ func (adapt *Adapter) FromJobProto(spec *pb.JobSpecification) (models.JobSpec, e
retryDelay := time.Duration(0)
retryCount := 0
retryExponentialBackoff := false
if spec.Behavior != nil && spec.Behavior.Retry != nil {
retryCount = int(spec.Behavior.Retry.Count)
retryExponentialBackoff = spec.Behavior.Retry.ExponentialBackoff
if spec.Behavior.Retry.Delay != nil && spec.Behavior.Retry.Delay.IsValid() {
retryDelay = spec.Behavior.Retry.Delay.AsDuration()
var notifiers []models.JobSpecNotifier
if spec.Behavior != nil {
if spec.Behavior.Retry != nil {
retryCount = int(spec.Behavior.Retry.Count)
retryExponentialBackoff = spec.Behavior.Retry.ExponentialBackoff
if spec.Behavior.Retry.Delay != nil && spec.Behavior.Retry.Delay.IsValid() {
retryDelay = spec.Behavior.Retry.Delay.AsDuration()
}
}

for _, notify := range spec.Behavior.Notify {
notifiers = append(notifiers, models.JobSpecNotifier{
On: models.JobEventType(strings.ToLower(notify.On.String())),
Config: notify.Config,
Channels: notify.Channels,
})
}
}
return models.JobSpec{
Expand All @@ -100,6 +111,7 @@ func (adapt *Adapter) FromJobProto(spec *pb.JobSpecification) (models.JobSpec, e
Delay: retryDelay,
ExponentialBackoff: retryExponentialBackoff,
},
Notify: notifiers,
},
Task: models.JobSpecTask{
Unit: execUnit,
Expand Down Expand Up @@ -150,6 +162,15 @@ func (adapt *Adapter) ToJobProto(spec models.JobSpec) (*pb.JobSpecification, err
return nil, err
}

var notifyProto []*pb.JobSpecification_Behavior_Notifiers
for _, notify := range spec.Behavior.Notify {
notifyProto = append(notifyProto, &pb.JobSpecification_Behavior_Notifiers{
On: pb.JobEvent_Type(pb.JobEvent_Type_value[strings.ToUpper(string(notify.On))]),
Channels: notify.Channels,
Config: notify.Config,
})
}

conf := &pb.JobSpecification{
Version: int32(spec.Version),
Name: spec.Name,
Expand All @@ -173,6 +194,7 @@ func (adapt *Adapter) ToJobProto(spec models.JobSpec) (*pb.JobSpecification, err
Delay: ptypes.DurationProto(spec.Behavior.Retry.Delay),
ExponentialBackoff: spec.Behavior.Retry.ExponentialBackoff,
},
Notify: notifyProto,
},
}
if spec.Schedule.EndDate != nil {
Expand All @@ -185,7 +207,7 @@ func (adapt *Adapter) ToJobProto(spec models.JobSpec) (*pb.JobSpecification, err
})
}

taskConfigs := []*pb.JobConfigItem{}
var taskConfigs []*pb.JobConfigItem
for _, c := range spec.Task.Config {
taskConfigs = append(taskConfigs, &pb.JobConfigItem{
Name: strings.ToUpper(c.Name),
Expand Down
32 changes: 31 additions & 1 deletion api/handler/v1/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,16 @@ func TestAdapter(t *testing.T) {
allTasksRepo.On("GetByName", "sample-task").Return(execUnit1, nil)
defer allTasksRepo.AssertExpectations(t)

hookUnit1 := new(mock.HookPlugin)
hookUnit1.On("GetHookSchema", context.Background(), models.GetHookSchemaRequest{}).Return(models.GetHookSchemaResponse{
Name: "sample-hook",
}, nil)
defer hookUnit1.AssertExpectations(t)

allHookRepo := new(mock.SupportedHookRepo)
allHookRepo.On("GetByName", "sample-hook").Return(hookUnit1, nil)
defer allHookRepo.AssertExpectations(t)

jobSpec := models.JobSpec{
Name: "test-job",
Schedule: models.JobSpecSchedule{
Expand All @@ -55,6 +65,15 @@ func TestAdapter(t *testing.T) {
Delay: 0,
ExponentialBackoff: true,
},
Notify: []models.JobSpecNotifier{
{
On: models.JobEventTypeFailure,
Config: map[string]string{
"key": "val",
},
Channels: []string{"slack://@devs"},
},
},
},
Task: models.JobSpecTask{
Unit: execUnit1,
Expand All @@ -79,9 +98,20 @@ func TestAdapter(t *testing.T) {
},
),
Dependencies: map[string]models.JobSpecDependency{},
Hooks: []models.JobSpecHook{
{
Config: models.JobSpecConfigs{
{
Name: "PROJECT",
Value: "this",
},
},
Unit: hookUnit1,
},
},
}

adapter := v1.NewAdapter(allTasksRepo, nil, nil)
adapter := v1.NewAdapter(allTasksRepo, allHookRepo, nil)
inProto, err := adapter.ToJobProto(jobSpec)
assert.Nil(t, err)
original, err := adapter.FromJobProto(inProto)
Expand Down
Loading