Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
NAME="github.com/odpf/compass"
VERSION=$(shell git describe --always --tags 2>/dev/null)
COVERFILE="/tmp/compass.coverprofile"
PROTON_COMMIT := "9eaca223dfd5ca0297abdb593fa73e8522d8ef62"
PROTON_COMMIT := "9d1d2af4192b0b31e95d417bbd17752825776a08"
Comment thread
sudo-suhas marked this conversation as resolved.
Outdated
.PHONY: all build test clean install proto

all: build
Expand Down
2 changes: 1 addition & 1 deletion core/asset/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (s *Service) GetAllAssets(ctx context.Context, flt Filter, withTotal bool)
return assets, totalCount, nil
}

func (s *Service) UpsertPatchAsset(ctx context.Context, ast *Asset, upstreams, downstreams []LineageNode) (string, error) {
func (s *Service) UpsertAsset(ctx context.Context, ast *Asset, upstreams, downstreams []LineageNode) (string, error) {
var assetID string
var err error

Expand Down
2 changes: 1 addition & 1 deletion core/asset/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func TestService_UpsertAsset(t *testing.T) {
}

svc := asset.NewService(mockAssetRepo, mockDiscoveryRepo, mockLineageRepo)
rid, err := svc.UpsertPatchAsset(ctx, tc.Asset, tc.Upstreams, tc.Downstreams)
rid, err := svc.UpsertAsset(ctx, tc.Asset, tc.Upstreams, tc.Downstreams)
if err != nil && errors.Is(tc.Err, err) {
t.Fatalf("got error %v, expected error was %v", err, tc.Err)
}
Expand Down
113 changes: 84 additions & 29 deletions internal/server/v1beta1/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type AssetService interface {
GetAssetByID(ctx context.Context, id string) (asset.Asset, error)
GetAssetByVersion(ctx context.Context, id string, version string) (asset.Asset, error)
GetAssetVersionHistory(ctx context.Context, flt asset.Filter, id string) ([]asset.Asset, error)
UpsertPatchAsset(context.Context, *asset.Asset, []asset.LineageNode, []asset.LineageNode) (string, error)
UpsertAsset(context.Context, *asset.Asset, []asset.LineageNode, []asset.LineageNode) (string, error)
DeleteAsset(context.Context, string) error

GetLineage(ctx context.Context, node asset.LineageNode, query asset.LineageQuery) (asset.LineageGraph, error)
Expand Down Expand Up @@ -211,6 +211,58 @@ func (server *APIServer) GetAssetByVersion(ctx context.Context, req *compassv1be
}, nil
}

func (server *APIServer) UpsertAsset(ctx context.Context, req *compassv1beta1.UpsertAssetRequest) (*compassv1beta1.UpsertAssetResponse, error) {
userID, err := server.validateUserInCtx(ctx)
if err != nil {
return nil, err
}

baseAsset := req.GetAsset()
if baseAsset == nil {
return nil, status.Error(codes.InvalidArgument, "asset cannot be empty")
}

ast := server.buildAsset(baseAsset)
ast.UpdatedBy.ID = userID
Comment thread
sudo-suhas marked this conversation as resolved.

if err := server.validateAsset(ast); err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}

upstreams := []asset.LineageNode{}
for _, pb := range req.GetUpstreams() {
upstreams = append(upstreams, lineageNodeFromProto(pb))
}
downstreams := []asset.LineageNode{}
for _, pb := range req.GetDownstreams() {
downstreams = append(downstreams, lineageNodeFromProto(pb))
}

assetID, err := server.assetService.UpsertAsset(ctx, &ast, upstreams, downstreams)
if err != nil {
if errors.As(err, new(asset.InvalidError)) {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
if errors.As(err, new(asset.DiscoveryError)) {
server.sendStatsDCounterMetric("discovery_error",
map[string]string{
"method": "upsert",
})
}
return nil, internalServerError(server.logger, err.Error())
}

server.sendStatsDCounterMetric("asset_upsert",
map[string]string{
"type": ast.Type.String(),
"service": ast.Service,
})

return &compassv1beta1.UpsertAssetResponse{
Comment thread
sudo-suhas marked this conversation as resolved.
Id: assetID,
}, nil
}

func (server *APIServer) UpsertPatchAsset(ctx context.Context, req *compassv1beta1.UpsertPatchAssetRequest) (*compassv1beta1.UpsertPatchAssetResponse, error) {
userID, err := server.validateUserInCtx(ctx)
if err != nil {
Expand Down Expand Up @@ -249,21 +301,21 @@ func (server *APIServer) UpsertPatchAsset(ctx context.Context, req *compassv1bet
downstreams = append(downstreams, lineageNodeFromProto(pb))
}

assetID, err := server.assetService.UpsertPatchAsset(ctx, &ast, upstreams, downstreams)
assetID, err := server.assetService.UpsertAsset(ctx, &ast, upstreams, downstreams)
if err != nil {
if errors.As(err, new(asset.InvalidError)) {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
if errors.As(err, new(asset.DiscoveryError)) {
server.sendStatsDCounterMetric("discovery_error",
map[string]string{
"method": "upsert",
"method": "upsert_patch",
})
}
return nil, internalServerError(server.logger, err.Error())
}

server.sendStatsDCounterMetric("asset_upsert",
server.sendStatsDCounterMetric("asset_upsert_patch",
map[string]string{
"type": ast.Type.String(),
"service": ast.Service,
Expand Down Expand Up @@ -299,6 +351,31 @@ func (server *APIServer) DeleteAsset(ctx context.Context, req *compassv1beta1.De
return &compassv1beta1.DeleteAssetResponse{}, nil
}

func (server *APIServer) buildAsset(baseAsset *compassv1beta1.UpsertAssetRequest_BaseAsset) asset.Asset {
ast := asset.Asset{
URN: baseAsset.GetUrn(),
Service: baseAsset.GetService(),
Type: asset.Type(baseAsset.GetType()),
Name: baseAsset.GetName(),
Description: baseAsset.GetDescription(),
Data: baseAsset.GetData().AsMap(),
Labels: baseAsset.GetLabels(),
}

var owners []user.User
for _, owner := range baseAsset.GetOwners() {
owners = append(owners, user.User{
ID: owner.Id,
UUID: owner.Uuid,
Email: owner.Email,
Provider: owner.Provider,
})
}
ast.Owners = owners

return ast
}

func (server *APIServer) validateAsset(ast asset.Asset) error {
if ast.URN == "" {
return fmt.Errorf("urn is required")
Expand Down Expand Up @@ -361,7 +438,7 @@ func decodePatchAssetToMap(pb *compassv1beta1.UpsertPatchAssetRequest_BaseAsset)
m["data"] = pb.GetData().AsMap()
}
if pb.GetLabels() != nil {
m["labels"] = pb.GetLabels().AsMap()
m["labels"] = pb.GetLabels()
}
if len(pb.GetOwners()) > 0 {
ownersMap := []map[string]interface{}{}
Expand Down Expand Up @@ -398,18 +475,6 @@ func assetToProto(a asset.Asset, withChangelog bool) (assetPB *compassv1beta1.As
}
}

var labels *structpb.Struct
if len(a.Labels) > 0 {
labelsMapInterface := make(map[string]interface{}, len(a.Labels))
for k, v := range a.Labels {
labelsMapInterface[k] = v
}
labels, err = structpb.NewStruct(labelsMapInterface)
if err != nil {
return
}
}

owners := []*compassv1beta1.User{}
for _, o := range a.Owners {
owners = append(owners, userToProto(o))
Expand Down Expand Up @@ -441,7 +506,7 @@ func assetToProto(a asset.Asset, withChangelog bool) (assetPB *compassv1beta1.As
Name: a.Name,
Description: a.Description,
Data: data,
Labels: labels,
Labels: a.Labels,
Owners: owners,
Version: a.Version,
UpdatedBy: userToProto(a.UpdatedBy),
Expand Down Expand Up @@ -498,16 +563,6 @@ func assetFromProto(pb *compassv1beta1.Asset) asset.Asset {
assetOwners = append(assetOwners, userFromProto(op))
}

var labels map[string]string
if pb.GetLabels() != nil {
labels = make(map[string]string)
for key, value := range pb.GetLabels().AsMap() {
strKey := fmt.Sprintf("%v", key)
strValue := fmt.Sprintf("%v", value)
labels[strKey] = strValue
}
}

var dataValue map[string]interface{}
if pb.GetData() != nil {
dataValue = pb.GetData().AsMap()
Expand Down Expand Up @@ -546,7 +601,7 @@ func assetFromProto(pb *compassv1beta1.Asset) asset.Asset {
Name: pb.GetName(),
Description: pb.GetDescription(),
Data: dataValue,
Labels: labels,
Labels: pb.GetLabels(),
Owners: assetOwners,
CreatedAt: createdAt,
UpdatedAt: updatedAt,
Expand Down
Loading