diff --git a/agent/agent_test.go b/agent/agent_test.go index fdbfeb83b..9fe62ee52 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/MakeNowJust/heredoc" "github.com/odpf/meteor/agent" "github.com/odpf/meteor/models" "github.com/odpf/meteor/plugins" @@ -15,8 +16,12 @@ import ( "github.com/odpf/meteor/test/utils" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/types/known/timestamppb" v1beta2 "github.com/odpf/meteor/models/odpf/assets/v1beta2" + _ "github.com/odpf/meteor/plugins/extractors" // populate extractors registry + _ "github.com/odpf/meteor/plugins/processors" // populate processors registry ) var ( @@ -764,6 +769,99 @@ func TestAgentRun(t *testing.T) { assert.Equal(t, validRecipe, run.Recipe) }) + t.Run("should work correctly with multiple processors", func(t *testing.T) { + var records []models.Record + sink := mocks.NewSink() + sink.On("Init", mockCtx, mock.AnythingOfType("plugins.Config")). + Return(nil) + sink.On("Sink", mockCtx, mock.Anything). + Run(func(args mock.Arguments) { + records = append(records, args.Get(1).([]models.Record)...) + }). + Return(nil) + sink.On("Close").Return(nil) + sf := registry.NewSinkFactory() + if err := sf.Register("test-sink", newSink(sink)); err != nil { + t.Fatal(err) + } + + r := agent.NewAgent(agent.Config{ + ExtractorFactory: registry.Extractors, + ProcessorFactory: registry.Processors, + SinkFactory: sf, + Logger: utils.Logger, + }) + run := r.Run(ctx, recipe.Recipe{ + Name: "sample", + Version: "v1beta1", + Source: recipe.PluginRecipe{ + Name: "application_yaml", + Scope: "application-test", + Config: map[string]interface{}{ + "file": "../plugins/extractors/application_yaml/testdata/application.detailed.yaml", + }, + }, + Processors: []recipe.PluginRecipe{ + { + Name: "script", + Config: map[string]interface{}{ + "engine": "tengo", + "script": heredoc.Doc(` + text := import("text") + + for u in asset.lineage.upstreams { + u.urn = text.contains(u.urn, "kafka") ? text.replace(u.urn, ".yonkou.io", "", -1) : u.urn + } + for u in asset.lineage.downstreams { + u.urn = text.contains(u.urn, "kafka") ? text.replace(u.urn, ".company.com", "", -1) : u.urn + } + `), + }, + }, + { + Name: "labels", + Config: map[string]interface{}{ + "labels": map[string]string{"field_a": "1"}, + }, + }, + }, + Sinks: []recipe.PluginRecipe{ + {Name: "test-sink", Config: map[string]interface{}{}}, + }, + }) + assert.NoError(t, run.Error) + expected := &v1beta2.Asset{ + Urn: "urn:application_yaml:application-test:application:test", + Name: "test", + Service: "application_yaml", + Type: "application", + Url: "http://company.com/myteam/test", + Description: "My incredible project", + Data: utils.BuildAny(t, &v1beta2.Application{ + Id: "test-id", + Version: "c23sdf6", + CreateTime: ts(t, "2006-01-02T15:04:05Z"), + UpdateTime: ts(t, "2006-01-02T15:04:05Z"), + }), + Owners: []*v1beta2.Owner{{ + Urn: "123", + Name: "myteam", + Email: "myteam@company.com", + }}, + Lineage: &v1beta2.Lineage{ + Upstreams: []*v1beta2.Resource{ + {Urn: "urn:bigquery:bq-raw-internal:table:bq-raw-internal:dagstream.production_feast09_s2id13_30min_demand"}, + {Urn: "urn:kafka:int-dagstream-kafka:topic:staging_feast09_s2id13_30min_demand"}, + }, + Downstreams: []*v1beta2.Resource{ + {Urn: "urn:kafka:1-my-kafka,2-my-kafka:topic:staging_feast09_mixed_granularity_demand_forecast_3es"}, + }, + }, + Labels: map[string]string{"field_a": "1", "x": "y"}, + } + utils.AssertEqualProto(t, expected, records[0].Data()) + }) + } func TestAgentRunMultiple(t *testing.T) { @@ -975,3 +1073,9 @@ func enrichInvalidConfigError(err error, pluginName string, pluginType plugins.P func buildPluginConfig(pr recipe.PluginRecipe) plugins.Config { return plugins.Config{RawConfig: pr.Config, URNScope: pr.Scope} } + +func ts(t *testing.T, s string) *timestamppb.Timestamp { + ts, err := time.Parse(time.RFC3339, s) + require.NoError(t, err) + return timestamppb.New(ts) +} diff --git a/agent/stream.go b/agent/stream.go index a6fc1f4f8..75c9370e0 100644 --- a/agent/stream.go +++ b/agent/stream.go @@ -130,14 +130,15 @@ func (s *stream) Close() { } } -func (s *stream) runMiddlewares(d models.Record) (res models.Record, err error) { - res = d +func (s *stream) runMiddlewares(d models.Record) (models.Record, error) { + res := d for _, middleware := range s.middlewares { - res, err = middleware(d) + var err error + res, err = middleware(res) if err != nil { - return + return models.Record{}, err } } - return + return res, nil }