Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 29 additions & 124 deletions utils/custom_properties.go
Original file line number Diff line number Diff line change
@@ -1,146 +1,64 @@
package utils

import (
"errors"
"fmt"
"reflect"

v1beta2 "github.com/odpf/meteor/models/odpf/assets/v1beta2"
"github.com/pkg/errors"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/structpb"
)

// GetAttributes returns custom properties of the given asset
func GetAttributes(asset *v1beta2.Asset) map[string]interface{} {
dataAny := asset.GetData()

var table v1beta2.Table
if err := anypb.UnmarshalTo(dataAny, &table, proto.UnmarshalOptions{}); err == nil {
return getMap(&table)
}
var topic v1beta2.Topic
if err := anypb.UnmarshalTo(dataAny, &topic, proto.UnmarshalOptions{}); err == nil {
return getMap(&topic)
}
var dashboard v1beta2.Dashboard
if err := anypb.UnmarshalTo(dataAny, &dashboard, proto.UnmarshalOptions{}); err == nil {
return getMap(&dashboard)
}
var job v1beta2.Job
if err := anypb.UnmarshalTo(dataAny, &job, proto.UnmarshalOptions{}); err == nil {
return getMap(&job)
}
var user v1beta2.User
if err := anypb.UnmarshalTo(dataAny, &user, proto.UnmarshalOptions{}); err == nil {
return getMap(&user)
}
var bucket v1beta2.Bucket
if err := anypb.UnmarshalTo(dataAny, &bucket, proto.UnmarshalOptions{}); err == nil {
return getMap(&bucket)
msg, err := anypb.UnmarshalNew(asset.Data, proto.UnmarshalOptions{})
if err != nil {
return make(map[string]interface{})
}
var group v1beta2.Group
if err := anypb.UnmarshalTo(dataAny, &group, proto.UnmarshalOptions{}); err == nil {
return getMap(&group)

attrMsg, ok := msg.(interface{ GetAttributes() *structpb.Struct })
if !ok {
return make(map[string]interface{})
}

return make(map[string]interface{})
return attrMsg.GetAttributes().AsMap()
}

// SetAttributes sets custom properties of the given asset
func SetAttributes(asset *v1beta2.Asset, customFields map[string]interface{}) (*v1beta2.Asset, error) {
dataAny := asset.GetData()

attr, err := structpb.NewStruct(customFields)
func SetAttributes(asset *v1beta2.Asset, customFields map[string]interface{}) (res *v1beta2.Asset, err error) {
msg, err := anypb.UnmarshalNew(asset.Data, proto.UnmarshalOptions{})
if err != nil {
return asset, fmt.Errorf("error transforming map into structpb: %w", err)
}

var table v1beta2.Table
if err := anypb.UnmarshalTo(dataAny, &table, proto.UnmarshalOptions{}); err == nil {
table.Attributes = attr

data, err := anypb.New(&table)
if err != nil {
return asset, fmt.Errorf("error transforming table into anypb: %w", err)
}
asset.Data = data

return asset, nil
}
var topic v1beta2.Topic
if err := anypb.UnmarshalTo(dataAny, &topic, proto.UnmarshalOptions{}); err == nil {
topic.Attributes = attr

data, err := anypb.New(&topic)
if err != nil {
return asset, fmt.Errorf("error transforming topic into anypb: %w", err)
}
asset.Data = data

return asset, nil
return nil, fmt.Errorf("unmarshal asset data: %w", err)
}
var dashboard v1beta2.Dashboard
if err := anypb.UnmarshalTo(dataAny, &dashboard, proto.UnmarshalOptions{}); err == nil {
dashboard.Attributes = attr

data, err := anypb.New(&dashboard)
if err != nil {
return asset, fmt.Errorf("error transforming table dashboard anypb: %w", err)
}
asset.Data = data

return asset, nil
}
var job v1beta2.Job
if err := anypb.UnmarshalTo(dataAny, &job, proto.UnmarshalOptions{}); err == nil {
job.Attributes = attr

data, err := anypb.New(&job)
if err != nil {
return asset, fmt.Errorf("error job table into anypb: %w", err)
}
asset.Data = data

return asset, nil
newAttrs, err := structpb.NewStruct(customFields)
if err != nil {
return nil, fmt.Errorf("error transforming map into structpb: %w", err)
}
var user v1beta2.User
if err := anypb.UnmarshalTo(dataAny, &user, proto.UnmarshalOptions{}); err == nil {
user.Attributes = attr

data, err := anypb.New(&user)
if err != nil {
return asset, fmt.Errorf("error transforming user into anypb: %w", err)
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("could not find matching model: %v", r)
}
asset.Data = data
}()

return asset, nil
f := reflect.ValueOf(msg).Elem().FieldByName("Attributes")
if !f.IsValid() || !f.CanSet() {
return nil, errors.New("could not find matching model")
}
var bucket v1beta2.Bucket
if err := anypb.UnmarshalTo(dataAny, &bucket, proto.UnmarshalOptions{}); err == nil {
bucket.Attributes = attr

data, err := anypb.New(&bucket)
if err != nil {
return asset, fmt.Errorf("error transforming bucket into anypb: %w", err)
}
asset.Data = data
f.Set(reflect.ValueOf(newAttrs))

return asset, nil
data, err := anypb.New(msg)
if err != nil {
return nil, fmt.Errorf("error transforming msg into anypb: %w", err)
}
var group v1beta2.Group
if err := anypb.UnmarshalTo(dataAny, &group, proto.UnmarshalOptions{}); err == nil {
group.Attributes = attr

data, err := anypb.New(&group)
if err != nil {
return asset, fmt.Errorf("error transforming group into anypb: %w", err)
}
asset.Data = data
asset.Data = data

return asset, nil
}

return asset, errors.New("could not find matching model")
return asset, nil
}

// TryParseMapToProto parses given map to proto struct
Expand All @@ -153,19 +71,6 @@ func TryParseMapToProto(src map[string]interface{}) *structpb.Struct {
return res
}

type hasAttributes interface {
GetAttributes() *structpb.Struct
}

func getMap(model hasAttributes) map[string]interface{} {
attr := model.GetAttributes()
if attr == nil {
return make(map[string]interface{})
}

return attr.AsMap()
}

func parseMapToProto(src map[string]interface{}) (*structpb.Struct, error) {
if src == nil {
return nil, nil
Expand Down
Loading