From b59784bbcf9f57a49e5e4803a3db13d74df1bbf4 Mon Sep 17 00:00:00 2001 From: Suhas Karanth Date: Fri, 11 Nov 2022 15:19:24 +0530 Subject: [PATCH] refactor: add decode hook function for anypb in structmap - Eliminate the need for special handling of asset proto by introducing the decode hook function for anypb type. - Instantiate concrete type for the anypb field using the @type field in given map. This makes it possible into an empty asset proto. - Verify data type and presense of data field in given map while decoding into an asset proto. - Remove unused type structmap.AssetWrapper. Port the tests for it into structmap_test.go. --- .../tengoutil/structmap/asset_wrapper.go | 59 ---- .../tengoutil/structmap/asset_wrapper_test.go | 319 ------------------ .../internal/tengoutil/structmap/structmap.go | 68 +++- .../tengoutil/structmap/structmap_test.go | 295 +++++++++++++++- plugins/processors/script/tengo_script.go | 9 +- 5 files changed, 356 insertions(+), 394 deletions(-) delete mode 100644 plugins/internal/tengoutil/structmap/asset_wrapper.go delete mode 100644 plugins/internal/tengoutil/structmap/asset_wrapper_test.go diff --git a/plugins/internal/tengoutil/structmap/asset_wrapper.go b/plugins/internal/tengoutil/structmap/asset_wrapper.go deleted file mode 100644 index ca7cc273d..000000000 --- a/plugins/internal/tengoutil/structmap/asset_wrapper.go +++ /dev/null @@ -1,59 +0,0 @@ -package structmap - -import ( - "fmt" - - v1beta2 "github.com/odpf/meteor/models/odpf/assets/v1beta2" - "google.golang.org/protobuf/reflect/protoregistry" - "google.golang.org/protobuf/types/known/anypb" -) - -type AssetWrapper struct { - A *v1beta2.Asset -} - -func (w AssetWrapper) AsMap() (map[string]interface{}, error) { - v, err := AsMap(w.A) - if err != nil { - return nil, fmt.Errorf("structmap: asset as map: %w", err) - } - - m, ok := v.(map[string]interface{}) - if !ok { - return nil, fmt.Errorf("structmap: asset as map: unexpected type for asset map: %T", v) - } - - return m, err -} - -func (w *AssetWrapper) OverwriteWith(m map[string]interface{}) error { - dataMap, ok := m["data"].(map[string]interface{}) - if !ok { - return fmt.Errorf("structmap: overwrite asset: unexpected type for asset data: %T", m["data"]) - } - - mt, err := protoregistry.GlobalTypes.FindMessageByName(w.A.Data.MessageName()) - if err != nil { - return fmt.Errorf("structmap: overwrite asset: resolve type by full name %s: %w", w.A.Data.MessageName(), err) - } - - msg := mt.New().Interface() - delete(dataMap, "@type") - if err := AsStruct(m["data"], &msg); err != nil { - return fmt.Errorf("structmap: overwrite asset: decode asset data: %w", err) - } - - delete(m, "data") - if err := AsStruct(m, w.A); err != nil { - return fmt.Errorf("structmap: overwrite asset: decode asset: %w", err) - } - - data, err := anypb.New(msg) - if err != nil { - return fmt.Errorf("structmap: overwrite asset: marshal data as any: %w", err) - } - - w.A.Data = data - - return nil -} diff --git a/plugins/internal/tengoutil/structmap/asset_wrapper_test.go b/plugins/internal/tengoutil/structmap/asset_wrapper_test.go deleted file mode 100644 index 11a9f8363..000000000 --- a/plugins/internal/tengoutil/structmap/asset_wrapper_test.go +++ /dev/null @@ -1,319 +0,0 @@ -//go:build plugins -// +build plugins - -package structmap - -import ( - "testing" - "time" - - v1beta2 "github.com/odpf/meteor/models/odpf/assets/v1beta2" - testutils "github.com/odpf/meteor/test/utils" - "github.com/stretchr/testify/assert" - "google.golang.org/protobuf/types/known/anypb" - "google.golang.org/protobuf/types/known/structpb" - "google.golang.org/protobuf/types/known/timestamppb" -) - -func TestAssetWrapper_AsMap(t *testing.T) { - cases := []struct { - name string - w AssetWrapper - expected map[string]interface{} - }{ - { - name: "AssetWithFeatureTable", - w: AssetWrapper{A: &v1beta2.Asset{ - Urn: "urn:caramlstore:test-caramlstore:feature_table:avg_dispatch_arrival_time_10_mins", - Name: "avg_dispatch_arrival_time_10_mins", - Service: "caramlstore", - Type: "feature_table", - Data: testutils.BuildAny(t, &v1beta2.FeatureTable{ - Namespace: "sauron", - Entities: []*v1beta2.FeatureTable_Entity{ - {Name: "merchant_uuid", Labels: map[string]string{ - "description": "merchant uuid", - "value_type": "STRING", - }}, - }, - Features: []*v1beta2.Feature{ - {Name: "ongoing_placed_and_waiting_acceptance_orders", DataType: "INT64"}, - {Name: "ongoing_orders", DataType: "INT64"}, - {Name: "merchant_avg_dispatch_arrival_time_10m", DataType: "FLOAT"}, - {Name: "ongoing_accepted_orders", DataType: "INT64"}, - }, - CreateTime: timestamppb.New(time.Date(2022, time.September, 19, 22, 42, 0o4, 0, time.UTC)), - UpdateTime: timestamppb.New(time.Date(2022, time.September, 21, 13, 23, 0o2, 0, time.UTC)), - }), - Lineage: &v1beta2.Lineage{ - Upstreams: []*v1beta2.Resource{ - { - Urn: "urn:kafka:int-dagstream-kafka.yonkou.io:topic:GO_FOOD-delay-allocation-merchant-feature-10m-log", - Service: "kafka", - Type: "topic", - }, - }, - }, - }}, - expected: map[string]interface{}{ - "data": map[string]interface{}{ - "@type": "type.googleapis.com/odpf.assets.v1beta2.FeatureTable", - "create_time": "2022-09-19T22:42:04Z", - "entities": []interface{}{ - map[string]interface{}{ - "labels": map[string]interface{}{"description": "merchant uuid", "value_type": "STRING"}, - "name": "merchant_uuid", - }, - }, - "features": []interface{}{ - map[string]interface{}{"data_type": "INT64", "name": "ongoing_placed_and_waiting_acceptance_orders"}, - map[string]interface{}{"data_type": "INT64", "name": "ongoing_orders"}, - map[string]interface{}{"data_type": "FLOAT", "name": "merchant_avg_dispatch_arrival_time_10m"}, - map[string]interface{}{"data_type": "INT64", "name": "ongoing_accepted_orders"}, - }, - "namespace": "sauron", - "update_time": "2022-09-21T13:23:02Z", - }, - "lineage": map[string]interface{}{ - "upstreams": []interface{}{ - map[string]interface{}{ - "service": "kafka", - "type": "topic", - "urn": "urn:kafka:int-dagstream-kafka.yonkou.io:topic:GO_FOOD-delay-allocation-merchant-feature-10m-log", - }, - }, - }, - "name": "avg_dispatch_arrival_time_10_mins", - "service": "caramlstore", - "type": "feature_table", - "urn": "urn:caramlstore:test-caramlstore:feature_table:avg_dispatch_arrival_time_10_mins", - }, - }, - { - name: "AssetWithTable", - w: AssetWrapper{A: &v1beta2.Asset{ - Urn: "urn:cassandra:test-cassandra:table:cassandra_meteor_test.applicant", - Name: "applicant", - Type: "table", - Service: "cassandra", - Data: testutils.BuildAny(t, &v1beta2.Table{ - Columns: []*v1beta2.Column{ - {Name: "applicantid", DataType: "int"}, - {Name: "first_name", DataType: "text"}, - {Name: "last_name", DataType: "text"}, - }, - Attributes: &structpb.Struct{ - Fields: map[string]*structpb.Value{ - "id": structpb.NewStringValue("test-id"), - "name": structpb.NewStringValue("test-name"), - }, - }, - }), - }}, - expected: map[string]interface{}{ - "data": map[string]interface{}{ - "@type": "type.googleapis.com/odpf.assets.v1beta2.Table", - "columns": []interface{}{ - map[string]interface{}{"data_type": "int", "name": "applicantid"}, - map[string]interface{}{"data_type": "text", "name": "first_name"}, - map[string]interface{}{"data_type": "text", "name": "last_name"}, - }, - "attributes": map[string]interface{}{ - "id": "test-id", - "name": "test-name", - }, - }, - "name": "applicant", - "service": "cassandra", - "type": "table", - "urn": "urn:cassandra:test-cassandra:table:cassandra_meteor_test.applicant", - }, - }, - } - for _, tc := range cases { - t.Run(tc.name, func(t *testing.T) { - res, err := tc.w.AsMap() - assert.NoError(t, err) - assert.Equal(t, tc.expected, res) - }) - } -} - -func TestAssetWrapper_OverwriteWith(t *testing.T) { - cases := []struct { - name string - w AssetWrapper - input map[string]interface{} - expected *v1beta2.Asset - expectedErr bool - }{ - { - name: "AssetWithFeatureTable", - w: AssetWrapper{A: &v1beta2.Asset{ - Data: &anypb.Any{TypeUrl: "type.googleapis.com/odpf.assets.v1beta2.FeatureTable"}, - }}, - input: map[string]interface{}{ - "data": map[string]interface{}{ - "@type": "type.googleapis.com/odpf.assets.v1beta2.FeatureTable", - "create_time": "2022-09-19T22:42:04Z", - "entities": []interface{}{ - map[string]interface{}{ - "labels": map[string]interface{}{"description": "merchant uuid", "value_type": "STRING"}, - "name": "merchant_uuid", - }, - }, - "features": []interface{}{ - map[string]interface{}{"data_type": "INT64", "name": "ongoing_placed_and_waiting_acceptance_orders"}, - map[string]interface{}{"data_type": "INT64", "name": "ongoing_orders"}, - map[string]interface{}{"data_type": "FLOAT", "name": "merchant_avg_dispatch_arrival_time_10m"}, - map[string]interface{}{"data_type": "INT64", "name": "ongoing_accepted_orders"}, - }, - "namespace": "sauron", - "update_time": "2022-09-21T13:23:02Z", - }, - "lineage": map[string]interface{}{ - "upstreams": []interface{}{ - map[string]interface{}{ - "service": "kafka", - "type": "topic", - "urn": "urn:kafka:int-dagstream-kafka.yonkou.io:topic:GO_FOOD-delay-allocation-merchant-feature-10m-log", - }, - }, - }, - "create_time": "2022-10-19T22:42:04Z", - "name": "avg_dispatch_arrival_time_10_mins", - "service": "caramlstore", - "type": "feature_table", - "urn": "urn:caramlstore:test-caramlstore:feature_table:avg_dispatch_arrival_time_10_mins", - }, - expected: &v1beta2.Asset{ - Urn: "urn:caramlstore:test-caramlstore:feature_table:avg_dispatch_arrival_time_10_mins", - Name: "avg_dispatch_arrival_time_10_mins", - Service: "caramlstore", - Type: "feature_table", - Data: testutils.BuildAny(t, &v1beta2.FeatureTable{ - Namespace: "sauron", - Entities: []*v1beta2.FeatureTable_Entity{ - { - Name: "merchant_uuid", - Labels: map[string]string{"description": "merchant uuid", "value_type": "STRING"}, - }, - }, - Features: []*v1beta2.Feature{ - {Name: "ongoing_placed_and_waiting_acceptance_orders", DataType: "INT64"}, - {Name: "ongoing_orders", DataType: "INT64"}, - {Name: "merchant_avg_dispatch_arrival_time_10m", DataType: "FLOAT"}, - {Name: "ongoing_accepted_orders", DataType: "INT64"}, - }, - CreateTime: timestamppb.New(time.Date(2022, time.September, 19, 22, 42, 4, 0, time.UTC)), - UpdateTime: timestamppb.New(time.Date(2022, time.September, 21, 13, 23, 2, 0, time.UTC)), - }), - Lineage: &v1beta2.Lineage{ - Upstreams: []*v1beta2.Resource{ - { - Urn: "urn:kafka:int-dagstream-kafka.yonkou.io:topic:GO_FOOD-delay-allocation-merchant-feature-10m-log", - Service: "kafka", - Type: "topic", - }, - }, - }, - CreateTime: timestamppb.New(time.Date(2022, time.October, 19, 22, 42, 4, 0, time.UTC)), - }, - }, - { - name: "AssetWithTable", - w: AssetWrapper{A: &v1beta2.Asset{ - Data: &anypb.Any{TypeUrl: "type.googleapis.com/odpf.assets.v1beta2.Table"}, - }}, - input: map[string]interface{}{ - "data": map[string]interface{}{ - "@type": "type.googleapis.com/odpf.assets.v1beta2.Table", - "columns": []interface{}{ - map[string]interface{}{"data_type": "int", "name": "applicantid"}, - map[string]interface{}{"data_type": "text", "name": "first_name"}, - map[string]interface{}{"data_type": "text", "name": "last_name"}, - }, - "attributes": map[string]interface{}{"id": "test-id", "name": "test-name"}, - }, - "name": "applicant", - "service": "cassandra", - "type": "table", - "urn": "urn:cassandra:test-cassandra:table:cassandra_meteor_test.applicant", - }, - expected: &v1beta2.Asset{ - Urn: "urn:cassandra:test-cassandra:table:cassandra_meteor_test.applicant", - Name: "applicant", - Type: "table", - Service: "cassandra", - Data: testutils.BuildAny(t, &v1beta2.Table{ - Columns: []*v1beta2.Column{ - {Name: "applicantid", DataType: "int"}, - {Name: "first_name", DataType: "text"}, - {Name: "last_name", DataType: "text"}, - }, - Attributes: &structpb.Struct{ - Fields: map[string]*structpb.Value{ - "id": structpb.NewStringValue("test-id"), - "name": structpb.NewStringValue("test-name"), - }, - }, - }), - }, - }, - { - name: "WithoutData", - w: AssetWrapper{A: &v1beta2.Asset{ - Data: &anypb.Any{TypeUrl: "type.googleapis.com/odpf.assets.v1beta2.Table"}, - }}, - input: map[string]interface{}{ - "name": "applicant", - "service": "cassandra", - "type": "table", - "urn": "urn:cassandra:test-cassandra:table:cassandra_meteor_test.applicant", - }, - expected: &v1beta2.Asset{ - Data: &anypb.Any{TypeUrl: "type.googleapis.com/odpf.assets.v1beta2.Table"}, - }, - expectedErr: true, - }, - { - name: "UnknownKeys", - w: AssetWrapper{A: &v1beta2.Asset{ - Data: &anypb.Any{TypeUrl: "type.googleapis.com/odpf.assets.v1beta2.Table"}, - }}, - input: map[string]interface{}{ - "does-not-exist": "value", - "urn": "urn:cassandra:test-cassandra:table:cassandra_meteor_test.applicant", - "type": "table", - "data": map[string]interface{}{}, - }, - expected: &v1beta2.Asset{ - Urn: "urn:cassandra:test-cassandra:table:cassandra_meteor_test.applicant", - Type: "table", - Data: &anypb.Any{TypeUrl: "type.googleapis.com/odpf.assets.v1beta2.Table"}, - }, - expectedErr: true, - }, - { - name: "UnknownMessageName", - w: AssetWrapper{A: &v1beta2.Asset{ - Data: &anypb.Any{TypeUrl: "type.googleapis.com/odpf.assets.v1beta2.DoesNotExist"}, - }}, - input: map[string]interface{}{ - "data": map[string]interface{}{}, - }, - expected: &v1beta2.Asset{ - Data: &anypb.Any{TypeUrl: "type.googleapis.com/odpf.assets.v1beta2.DoesNotExist"}, - }, - expectedErr: true, - }, - } - for _, tc := range cases { - t.Run(tc.name, func(t *testing.T) { - err := tc.w.OverwriteWith(tc.input) - assert.Equalf(t, tc.expectedErr, (err != nil), - "AssetWrapper.OverwriteWith() err = %v,\nexpectedErr %v", err, tc.expectedErr) - testutils.AssertEqualProto(t, tc.expected, tc.w.A) - }) - } -} diff --git a/plugins/internal/tengoutil/structmap/structmap.go b/plugins/internal/tengoutil/structmap/structmap.go index ff48b4237..60a813bd8 100644 --- a/plugins/internal/tengoutil/structmap/structmap.go +++ b/plugins/internal/tengoutil/structmap/structmap.go @@ -7,8 +7,11 @@ import ( "time" "github.com/mitchellh/mapstructure" + v1beta2 "github.com/odpf/meteor/models/odpf/assets/v1beta2" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/reflect/protoregistry" + "google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/structpb" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -41,10 +44,12 @@ func AsMap(v interface{}) (interface{}, error) { func AsStruct(input, output interface{}) error { dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ DecodeHook: mapstructure.ComposeDecodeHookFunc( + checkAssetDataHookFunc(), stringToTimestampHookFunc(time.RFC3339), timeToTimestampHookFunc(), mapstructure.StringToTimeHookFunc(time.RFC3339), mapToAttributesHookFunc(), + mapToAnyPBHookFunc(), ), WeaklyTypedInput: true, ErrorUnused: true, @@ -57,12 +62,31 @@ func AsStruct(input, output interface{}) error { } if err := dec.Decode(input); err != nil { - return fmt.Errorf("structmap: decode into %T: %w", output, err) + return fmt.Errorf("structmap: decode as struct: %w", err) } return nil } +func checkAssetDataHookFunc() mapstructure.DecodeHookFuncType { + return func(_ reflect.Type, t reflect.Type, data interface{}) (interface{}, error) { + if t != reflect.TypeOf(v1beta2.Asset{}) && t != reflect.TypeOf(&v1beta2.Asset{}) { + return data, nil + } + + m, ok := data.(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("mapstructure check asset: unexpected type: %T", data) + } + + if _, ok := m["data"].(map[string]interface{}); !ok { + return nil, fmt.Errorf("mapstructure check asset data: unexpected type: %T", m["data"]) + } + + return data, nil + } +} + // stringToTimestampHookFunc returns a DecodeHookFunc that converts // strings to timestamppb.Timestamp. func stringToTimestampHookFunc(layout string) mapstructure.DecodeHookFuncType { @@ -78,7 +102,7 @@ func stringToTimestampHookFunc(layout string) mapstructure.DecodeHookFuncType { // Convert it by parsing ts, err := time.Parse(layout, s) if err != nil { - return nil, fmt.Errorf("structmap: mapstructure string to timestamp hook: %w", err) + return nil, fmt.Errorf("mapstructure string to timestamp hook: %w", err) } return timestamppb.New(ts), nil @@ -113,3 +137,43 @@ func mapToAttributesHookFunc() mapstructure.DecodeHookFuncType { return structpb.NewStruct(m) } } + +func mapToAnyPBHookFunc() mapstructure.DecodeHookFuncType { + failure := func(step string, err error) (interface{}, error) { + return nil, fmt.Errorf("mapstructure map to anypb hook: %s: %w", step, err) + } + + return func(_ reflect.Type, t reflect.Type, data interface{}) (interface{}, error) { + m, ok := data.(map[string]interface{}) + if !ok { + return data, nil + } + + if t != reflect.TypeOf(anypb.Any{}) && t != reflect.TypeOf(&anypb.Any{}) { + return data, nil + } + + typ, ok := m["@type"].(string) + if !ok { + return data, nil + } + + msgtyp, err := protoregistry.GlobalTypes.FindMessageByURL(typ) + if err != nil { + return failure("resolve type", err) + } + + msg := msgtyp.New().Interface() + delete(m, "@type") + if err := AsStruct(m, &msg); err != nil { + return failure("decode", err) + } + + dataAny, err := anypb.New(msg) + if err != nil { + return failure("marshal as any", err) + } + + return dataAny, nil + } +} diff --git a/plugins/internal/tengoutil/structmap/structmap_test.go b/plugins/internal/tengoutil/structmap/structmap_test.go index 8914d7b0a..81df0d082 100644 --- a/plugins/internal/tengoutil/structmap/structmap_test.go +++ b/plugins/internal/tengoutil/structmap/structmap_test.go @@ -8,8 +8,10 @@ import ( "time" v1beta2 "github.com/odpf/meteor/models/odpf/assets/v1beta2" + testutils "github.com/odpf/meteor/test/utils" "github.com/odpf/meteor/utils" "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/structpb" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -37,7 +39,7 @@ func TestAsMap(t *testing.T) { expected: []interface{}{"s1", "s2"}, }, { - name: "WithProtoMessage", + name: "ProtoMessage", input: &v1beta2.Job{ Attributes: utils.TryParseMapToProto(map[string]interface{}{ "id": "test-id", @@ -55,6 +57,114 @@ func TestAsMap(t *testing.T) { "create_time": "2022-09-19T22:42:04Z", }, }, + { + name: "AssetWithFeatureTable", + input: &v1beta2.Asset{ + Urn: "urn:caramlstore:test-caramlstore:feature_table:avg_dispatch_arrival_time_10_mins", + Name: "avg_dispatch_arrival_time_10_mins", + Service: "caramlstore", + Type: "feature_table", + Data: testutils.BuildAny(t, &v1beta2.FeatureTable{ + Namespace: "sauron", + Entities: []*v1beta2.FeatureTable_Entity{ + {Name: "merchant_uuid", Labels: map[string]string{ + "description": "merchant uuid", + "value_type": "STRING", + }}, + }, + Features: []*v1beta2.Feature{ + {Name: "ongoing_placed_and_waiting_acceptance_orders", DataType: "INT64"}, + {Name: "ongoing_orders", DataType: "INT64"}, + {Name: "merchant_avg_dispatch_arrival_time_10m", DataType: "FLOAT"}, + {Name: "ongoing_accepted_orders", DataType: "INT64"}, + }, + CreateTime: timestamppb.New(time.Date(2022, time.September, 19, 22, 42, 0o4, 0, time.UTC)), + UpdateTime: timestamppb.New(time.Date(2022, time.September, 21, 13, 23, 0o2, 0, time.UTC)), + }), + Lineage: &v1beta2.Lineage{ + Upstreams: []*v1beta2.Resource{ + { + Urn: "urn:kafka:int-dagstream-kafka.yonkou.io:topic:GO_FOOD-delay-allocation-merchant-feature-10m-log", + Service: "kafka", + Type: "topic", + }, + }, + }, + }, + expected: map[string]interface{}{ + "data": map[string]interface{}{ + "@type": "type.googleapis.com/odpf.assets.v1beta2.FeatureTable", + "create_time": "2022-09-19T22:42:04Z", + "entities": []interface{}{ + map[string]interface{}{ + "labels": map[string]interface{}{"description": "merchant uuid", "value_type": "STRING"}, + "name": "merchant_uuid", + }, + }, + "features": []interface{}{ + map[string]interface{}{"data_type": "INT64", "name": "ongoing_placed_and_waiting_acceptance_orders"}, + map[string]interface{}{"data_type": "INT64", "name": "ongoing_orders"}, + map[string]interface{}{"data_type": "FLOAT", "name": "merchant_avg_dispatch_arrival_time_10m"}, + map[string]interface{}{"data_type": "INT64", "name": "ongoing_accepted_orders"}, + }, + "namespace": "sauron", + "update_time": "2022-09-21T13:23:02Z", + }, + "lineage": map[string]interface{}{ + "upstreams": []interface{}{ + map[string]interface{}{ + "service": "kafka", + "type": "topic", + "urn": "urn:kafka:int-dagstream-kafka.yonkou.io:topic:GO_FOOD-delay-allocation-merchant-feature-10m-log", + }, + }, + }, + "name": "avg_dispatch_arrival_time_10_mins", + "service": "caramlstore", + "type": "feature_table", + "urn": "urn:caramlstore:test-caramlstore:feature_table:avg_dispatch_arrival_time_10_mins", + }, + }, + { + name: "AssetWithTable", + input: &v1beta2.Asset{ + Urn: "urn:cassandra:test-cassandra:table:cassandra_meteor_test.applicant", + Name: "applicant", + Type: "table", + Service: "cassandra", + Data: testutils.BuildAny(t, &v1beta2.Table{ + Columns: []*v1beta2.Column{ + {Name: "applicantid", DataType: "int"}, + {Name: "first_name", DataType: "text"}, + {Name: "last_name", DataType: "text"}, + }, + Attributes: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + "id": structpb.NewStringValue("test-id"), + "name": structpb.NewStringValue("test-name"), + }, + }, + }), + }, + expected: map[string]interface{}{ + "data": map[string]interface{}{ + "@type": "type.googleapis.com/odpf.assets.v1beta2.Table", + "columns": []interface{}{ + map[string]interface{}{"data_type": "int", "name": "applicantid"}, + map[string]interface{}{"data_type": "text", "name": "first_name"}, + map[string]interface{}{"data_type": "text", "name": "last_name"}, + }, + "attributes": map[string]interface{}{ + "id": "test-id", + "name": "test-name", + }, + }, + "name": "applicant", + "service": "cassandra", + "type": "table", + "urn": "urn:cassandra:test-cassandra:table:cassandra_meteor_test.applicant", + }, + }, { name: "MarshalFailure", input: make(chan int), @@ -97,6 +207,30 @@ func TestAsStruct(t *testing.T) { output: []string{}, expected: []string{"s1", "s2"}, }, + { + name: "MismatchedType", + input: []interface{}{"s1"}, + output: map[string]interface{}{}, + expected: map[string]interface{}{}, + expectedErr: true, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + err := AsStruct(tc.input, &tc.output) + assert.Equalf(t, tc.expectedErr, (err != nil), + "AsStruct() err = %v,\nexpectedErr %v", err, tc.expectedErr) + assert.Equal(t, tc.expected, tc.output) + }) + } + + protoCases := []struct { + name string + input interface{} + output proto.Message + expected proto.Message + expectedErr string + }{ { name: "WithProtoMessage", input: map[string]interface{}{ @@ -120,19 +254,160 @@ func TestAsStruct(t *testing.T) { }, }, { - name: "MismatchedType", - input: []interface{}{"s1"}, - output: map[string]interface{}{}, - expected: map[string]interface{}{}, - expectedErr: true, + name: "AssetWithFeatureTable", + output: &v1beta2.Asset{}, + input: map[string]interface{}{ + "data": map[string]interface{}{ + "@type": "type.googleapis.com/odpf.assets.v1beta2.FeatureTable", + "create_time": "2022-09-19T22:42:04Z", + "entities": []interface{}{ + map[string]interface{}{ + "labels": map[string]interface{}{"description": "merchant uuid", "value_type": "STRING"}, + "name": "merchant_uuid", + }, + }, + "features": []interface{}{ + map[string]interface{}{"data_type": "INT64", "name": "ongoing_placed_and_waiting_acceptance_orders"}, + map[string]interface{}{"data_type": "INT64", "name": "ongoing_orders"}, + map[string]interface{}{"data_type": "FLOAT", "name": "merchant_avg_dispatch_arrival_time_10m"}, + map[string]interface{}{"data_type": "INT64", "name": "ongoing_accepted_orders"}, + }, + "namespace": "sauron", + "update_time": "2022-09-21T13:23:02Z", + }, + "lineage": map[string]interface{}{ + "upstreams": []interface{}{ + map[string]interface{}{ + "service": "kafka", + "type": "topic", + "urn": "urn:kafka:int-dagstream-kafka.yonkou.io:topic:GO_FOOD-delay-allocation-merchant-feature-10m-log", + }, + }, + }, + "create_time": "2022-10-19T22:42:04Z", + "name": "avg_dispatch_arrival_time_10_mins", + "service": "caramlstore", + "type": "feature_table", + "urn": "urn:caramlstore:test-caramlstore:feature_table:avg_dispatch_arrival_time_10_mins", + }, + expected: &v1beta2.Asset{ + Urn: "urn:caramlstore:test-caramlstore:feature_table:avg_dispatch_arrival_time_10_mins", + Name: "avg_dispatch_arrival_time_10_mins", + Service: "caramlstore", + Type: "feature_table", + Data: testutils.BuildAny(t, &v1beta2.FeatureTable{ + Namespace: "sauron", + Entities: []*v1beta2.FeatureTable_Entity{ + { + Name: "merchant_uuid", + Labels: map[string]string{"description": "merchant uuid", "value_type": "STRING"}, + }, + }, + Features: []*v1beta2.Feature{ + {Name: "ongoing_placed_and_waiting_acceptance_orders", DataType: "INT64"}, + {Name: "ongoing_orders", DataType: "INT64"}, + {Name: "merchant_avg_dispatch_arrival_time_10m", DataType: "FLOAT"}, + {Name: "ongoing_accepted_orders", DataType: "INT64"}, + }, + CreateTime: timestamppb.New(time.Date(2022, time.September, 19, 22, 42, 4, 0, time.UTC)), + UpdateTime: timestamppb.New(time.Date(2022, time.September, 21, 13, 23, 2, 0, time.UTC)), + }), + Lineage: &v1beta2.Lineage{ + Upstreams: []*v1beta2.Resource{ + { + Urn: "urn:kafka:int-dagstream-kafka.yonkou.io:topic:GO_FOOD-delay-allocation-merchant-feature-10m-log", + Service: "kafka", + Type: "topic", + }, + }, + }, + CreateTime: timestamppb.New(time.Date(2022, time.October, 19, 22, 42, 4, 0, time.UTC)), + }, + }, + { + name: "AssetWithTable", + output: &v1beta2.Asset{}, + input: map[string]interface{}{ + "data": map[string]interface{}{ + "@type": "type.googleapis.com/odpf.assets.v1beta2.Table", + "columns": []interface{}{ + map[string]interface{}{"data_type": "int", "name": "applicantid"}, + map[string]interface{}{"data_type": "text", "name": "first_name"}, + map[string]interface{}{"data_type": "text", "name": "last_name"}, + }, + "attributes": map[string]interface{}{"id": "test-id", "name": "test-name"}, + }, + "name": "applicant", + "service": "cassandra", + "type": "table", + "urn": "urn:cassandra:test-cassandra:table:cassandra_meteor_test.applicant", + }, + expected: &v1beta2.Asset{ + Urn: "urn:cassandra:test-cassandra:table:cassandra_meteor_test.applicant", + Name: "applicant", + Type: "table", + Service: "cassandra", + Data: testutils.BuildAny(t, &v1beta2.Table{ + Columns: []*v1beta2.Column{ + {Name: "applicantid", DataType: "int"}, + {Name: "first_name", DataType: "text"}, + {Name: "last_name", DataType: "text"}, + }, + Attributes: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + "id": structpb.NewStringValue("test-id"), + "name": structpb.NewStringValue("test-name"), + }, + }, + }), + }, + }, + { + name: "WithoutData", + output: &v1beta2.Asset{}, + input: map[string]interface{}{ + "name": "applicant", + "service": "cassandra", + "type": "table", + "urn": "urn:cassandra:test-cassandra:table:cassandra_meteor_test.applicant", + }, + expected: &v1beta2.Asset{}, + expectedErr: "mapstructure check asset data: unexpected type: ", + }, + { + name: "UnknownKeys", + output: &v1beta2.Asset{}, + input: map[string]interface{}{ + "does-not-exist": "value", + "urn": "urn:cassandra:test-cassandra:table:cassandra_meteor_test.applicant", + "type": "table", + "data": map[string]interface{}{}, + }, + expected: &v1beta2.Asset{}, + expectedErr: "invalid keys: does-not-exist", + }, + { + name: "UnknownMessageName", + output: &v1beta2.Asset{}, + input: map[string]interface{}{ + "data": map[string]interface{}{ + "@type": "type.googleapis.com/odpf.assets.v1beta2.DoesNotExist", + }, + }, + expected: &v1beta2.Asset{}, + expectedErr: "mapstructure map to anypb hook: resolve type: proto", }, } - for _, tc := range cases { + for _, tc := range protoCases { t.Run(tc.name, func(t *testing.T) { err := AsStruct(tc.input, &tc.output) - assert.Equalf(t, tc.expectedErr, (err != nil), - "AsStruct() err = %v,\nexpectedErr %v", err, tc.expectedErr) - assert.Equal(t, tc.expected, tc.output) + if tc.expectedErr == "" { + assert.NoError(t, err) + } else { + assert.ErrorContains(t, err, tc.expectedErr) + } + + testutils.AssertEqualProto(t, tc.expected, tc.output) }) } } diff --git a/plugins/processors/script/tengo_script.go b/plugins/processors/script/tengo_script.go index 70ec50bd0..0e3563f91 100644 --- a/plugins/processors/script/tengo_script.go +++ b/plugins/processors/script/tengo_script.go @@ -8,6 +8,7 @@ import ( "github.com/MakeNowJust/heredoc" "github.com/d5/tengo/v2" "github.com/odpf/meteor/models" + v1beta2 "github.com/odpf/meteor/models/odpf/assets/v1beta2" "github.com/odpf/meteor/plugins" "github.com/odpf/meteor/plugins/internal/tengoutil" "github.com/odpf/meteor/plugins/internal/tengoutil/structmap" @@ -86,8 +87,7 @@ func (p *Processor) Init(ctx context.Context, config plugins.Config) error { // Process processes the data func (p *Processor) Process(ctx context.Context, src models.Record) (models.Record, error) { - astWrapper := structmap.AssetWrapper{A: src.Data()} - m, err := astWrapper.AsMap() + m, err := structmap.AsMap(src.Data()) if err != nil { return models.Record{}, fmt.Errorf("script processor: %w", err) } @@ -101,11 +101,12 @@ func (p *Processor) Process(ctx context.Context, src models.Record) (models.Reco return models.Record{}, fmt.Errorf("script processor: run script: %w", err) } - if err := astWrapper.OverwriteWith(c.Get("asset").Map()); err != nil { + var transformed *v1beta2.Asset + if err := structmap.AsStruct(c.Get("asset").Map(), &transformed); err != nil { return models.Record{}, fmt.Errorf("script processor: overwrite asset: %w", err) } - return models.NewRecord(astWrapper.A), nil + return models.NewRecord(transformed), nil } func (p *Processor) declareGlobals(s *tengo.Script) error {