From e31d56abd0782b01a3f01422086d5ce3286295cb Mon Sep 17 00:00:00 2001 From: Aditya Singh Sisodiya Date: Wed, 29 Sep 2021 12:42:34 +0530 Subject: [PATCH 1/9] feat: add e2e recipe testing --- agent/stream.go | 2 +- e2e/e2e_test.go | 356 ++++++++++++++++++++++++++++++++++++++++++++ e2e/mysql_kafka.yml | 15 ++ 3 files changed, 372 insertions(+), 1 deletion(-) create mode 100644 e2e/e2e_test.go create mode 100644 e2e/mysql_kafka.yml diff --git a/agent/stream.go b/agent/stream.go index dc3b04dd7..ac3f4e9e5 100644 --- a/agent/stream.go +++ b/agent/stream.go @@ -55,7 +55,7 @@ func (s *stream) broadcast() error { batch := newBatch(l.batchSize) // listen to channel and emit data to subscriber callback if batch is full for d := range l.channel { - if err := batch.add(d); err != nil { + if err := batch.add(d); err != nil { s.closeWithError(err) } if batch.isFull() { diff --git a/e2e/e2e_test.go b/e2e/e2e_test.go new file mode 100644 index 000000000..f9758bcf6 --- /dev/null +++ b/e2e/e2e_test.go @@ -0,0 +1,356 @@ +// +build integration + +package e2e_test + +import ( + "context" + "database/sql" + "fmt" + "log" + "net" + "os" + "strconv" + "strings" + "testing" + "time" + + "github.com/odpf/meteor/cmd" + "github.com/odpf/meteor/config" + "github.com/odpf/meteor/models/odpf/assets" + "github.com/odpf/meteor/models/odpf/assets/common" + "github.com/odpf/meteor/models/odpf/assets/facets" + _ "github.com/odpf/meteor/plugins/extractors" + _ "github.com/odpf/meteor/plugins/processors" + _ "github.com/odpf/meteor/plugins/sinks" + "github.com/odpf/meteor/test" + "github.com/ory/dockertest/v3" + "github.com/ory/dockertest/v3/docker" + "github.com/pkg/errors" + "github.com/segmentio/kafka-go" + "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/proto" +) + +var ( + db *sql.DB + conn *kafka.Conn + broker kafka.Broker +) + +const ( + testDB = "test_db" + user = "test_user" + pass = "pass" + mysqlHost = "localhost:3306" + brokerHost = "localhost:9093" + testTopic = "topic-a" + partition = 0 +) + +func TestMain(m *testing.M) { + // generate purge function + mysqlPurgeContainer, err := mysqlDockerSetup() + if err != nil { + return + } + kafkaPurgeContainer, err := kafkaDockerSetup() + if err != nil { + return + } + + // setup and populate data for testing + if err := setupMySQL(); err != nil { + log.Fatal(err) + } + if err := setupKafka(); err != nil { + log.Fatal(err) + } + + // run tests + code := m.Run() + + // clean tests + if err = conn.Close(); err != nil { + return + } + if err = db.Close(); err != nil { + return + } + + // purge containers + if err := mysqlPurgeContainer(); err != nil { + log.Fatal(err) + } + if err := kafkaPurgeContainer(); err != nil { + log.Fatal(err) + } + + os.Exit(code) +} + +// kafkaDockerSetup sets up a kafka docker container +func kafkaDockerSetup() (purge func() error, err error) { + // kafka setup test + kafkaOpts := dockertest.RunOptions{ + Repository: "moeenz/docker-kafka-kraft", + Tag: "latest", + Env: []string{ + "KRAFT_CONTAINER_HOST_NAME=kafka", + }, + ExposedPorts: []string{"9093"}, + PortBindings: map[docker.Port][]docker.PortBinding{ + "9093": { + {HostIP: "localhost", HostPort: "9093"}, + }, + }, + } + // exponential backoff-retry, because the application in the container might not be ready to accept connections yet + kafkaRetryFn := func(resource *dockertest.Resource) (err error) { + // create client + if conn, err = kafka.Dial("tcp", brokerHost); err != nil { + return errors.Wrap(err, "failed to kafka create client") + } + if broker, err = conn.Controller(); err != nil { + return errors.Wrap(err, "failed to generate broker request") + } + return + } + purgeContainer, err := test.CreateContainer(kafkaOpts, kafkaRetryFn) + if err != nil { + log.Fatal(err) + } + + return purgeContainer, nil +} + +// mysqlDockerSetup sets up a mysql docker container +func mysqlDockerSetup() (purge func() error, err error) { + // mysql setup test + mysqlOpts := dockertest.RunOptions{ + Repository: "mysql", + Tag: "latest", + Env: []string{ + "MYSQL_ROOT_PASSWORD=" + pass, + }, + ExposedPorts: []string{"3306"}, + PortBindings: map[docker.Port][]docker.PortBinding{ + "3306": { + {HostIP: "0.0.0.0", HostPort: "3306"}, + }, + }, + } + // exponential backoff-retry, because the application in the container might not be ready to accept connections yet + mysqlRetryFn := func(resource *dockertest.Resource) (err error) { + db, err = sql.Open("mysql", fmt.Sprintf("root:%s@tcp(%s)/", pass, mysqlHost)) + if err != nil { + return errors.Wrap(err, "failed to create mysql client") + } + return db.Ping() + } + purgeContainer, err := test.CreateContainer(mysqlOpts, mysqlRetryFn) + if err != nil { + log.Fatal(err) + } + + return purgeContainer, nil +} + +// TestRecipe tests the recipe from source to sink completely +func TestRecipe(t *testing.T) { + err := setupKafka() + if err != nil { + t.Fatal(err) + } + + var sinkData []*assets.Table + ctx, cancel := context.WithCancel(context.TODO()) + go func() { + err = listenToTopic(ctx, testTopic, &sinkData) + if err != nil { + t.Error(err) + } + }() + + // run mysql_kafka.yml file + cfg, err := config.Load() + if err != nil { + t.Error(err) + } + command := cmd.New(test.Logger, nil, cfg) + command.SetArgs([]string{"run", "mysql_kafka.yml"}) + if err := command.Execute(); err != nil { + if strings.HasPrefix(err.Error(), "unknown command ") { + if !strings.HasSuffix(err.Error(), "\n") { + t.Fatal(err) + } + t.Fatal(err) + } else { + t.Fatal(err) + } + } + + time.Sleep(2 * time.Second) // this is to wait consumer to finish adding data to sinkData + cancel() // cancel will cancel context, hinting the consumer to end + time.Sleep(100 * time.Millisecond) // this is to give time for the consumer to closing all its connections + + expected := getExpectedTables() + assert.Equal(t, len(getExpectedTables()), len(sinkData)) + for tableNum := 0; tableNum < len(getExpectedTables()); tableNum++ { + assert.Equal(t, expected[tableNum].Resource.Urn, sinkData[tableNum].Resource.Urn) + assert.Equal(t, expected[tableNum].Resource.Name, sinkData[tableNum].Resource.Name) + assert.Equal(t, len(expected[tableNum].Schema.Columns), len(sinkData[tableNum].Schema.Columns)) + } +} + +// listenToTopic listens to a topic and stores the data in sinkData +func listenToTopic(ctx context.Context, topic string, data *[]*assets.Table) error { + reader := kafka.NewReader(kafka.ReaderConfig{ + Brokers: []string{brokerHost}, + Topic: topic, + }) + defer func(reader *kafka.Reader) { + if err := reader.Close(); err != nil { + return + } + }(reader) + + for { + msg, err := reader.ReadMessage(ctx) + if err != nil { + break + } + var convertMsg assets.Table + if err := proto.Unmarshal(msg.Value, &convertMsg); err != nil { + return errors.Wrap(err, "failed to parse kafka message") + } + *data = append(*data, &convertMsg) + } + + return nil +} + +// setupKafka intializes kafka broker with topic and partition +func setupKafka() error { + conn, err := kafka.DialLeader(context.TODO(), "tcp", net.JoinHostPort(broker.Host, strconv.Itoa(broker.Port)), testTopic, partition) + if err != nil { + return errors.Wrap(err, "failed to setup kafka connection") + } + defer func(conn *kafka.Conn) { + if err := conn.Close(); err != nil { + return + } + }(conn) + + if err := conn.DeleteTopics(testTopic); err != nil { + return errors.Wrap(err, "failed to delete topic") + } + if err := conn.CreateTopics(kafka.TopicConfig{ + Topic: testTopic, + NumPartitions: 1, + ReplicationFactor: 1, + }); err != nil { + return errors.Wrap(err, "failed to create topic") + } + + return nil +} + +// setupMySQL initializes mysql database +func setupMySQL() (err error) { + // create database, user and grant access + if err = execute(db, []string{ + fmt.Sprintf("DROP DATABASE IF EXISTS %s", testDB), + fmt.Sprintf("CREATE DATABASE %s", testDB), + fmt.Sprintf("USE %s;", testDB), + fmt.Sprintf(`CREATE USER IF NOT EXISTS '%s'@'%%' IDENTIFIED BY '%s';`, user, pass), + fmt.Sprintf(`GRANT ALL PRIVILEGES ON *.* TO '%s'@'%%';`, user), + }); err != nil { + return errors.Wrap(err, "failed to create database") + } + + // create and populate tables + if err = execute(db, []string{ + "CREATE TABLE applicant (applicant_id int, last_name varchar(255), first_name varchar(255));", + "INSERT INTO applicant VALUES (1, 'test1', 'test11');", + "CREATE TABLE jobs (job_id int, job varchar(255), department varchar(255));", + "INSERT INTO jobs VALUES (2, 'test2', 'test22');", + }); err != nil { + return errors.Wrap(err, "failed to populate database") + } + + return +} + +// execute executes a list of sql statements +func execute(db *sql.DB, queries []string) (err error) { + for _, query := range queries { + _, err = db.Exec(query) + if err != nil { + return + } + } + + return +} + +// getExpectedTables returns the expected tables +func getExpectedTables() []*assets.Table { + return []*assets.Table{ + { + Resource: &common.Resource{ + Urn: testDB + ".applicant", + Name: "applicant", + }, + Schema: &facets.Columns{ + Columns: []*facets.Column{ + { + Name: "applicant_id", + DataType: "int", + IsNullable: true, + Length: 0, + }, + { + Name: "first_name", + DataType: "varchar", + IsNullable: true, + Length: 255, + }, + { + Name: "last_name", + DataType: "varchar", + IsNullable: true, + Length: 255, + }, + }, + }, + }, + { + Resource: &common.Resource{ + Urn: testDB + ".jobs", + Name: "jobs", + }, + Schema: &facets.Columns{ + Columns: []*facets.Column{ + { + Name: "department", + DataType: "varchar", + IsNullable: true, + Length: 255, + }, + { + Name: "job", + DataType: "varchar", + IsNullable: true, + Length: 255, + }, + { + Name: "job_id", + DataType: "int", + IsNullable: true, + Length: 0, + }, + }, + }, + }, + } +} diff --git a/e2e/mysql_kafka.yml b/e2e/mysql_kafka.yml new file mode 100644 index 000000000..26d7dff6a --- /dev/null +++ b/e2e/mysql_kafka.yml @@ -0,0 +1,15 @@ +name: sample +source: + type: mysql + config: + user_id: root + password: admin + host: 127.0.0.1:3306 +sinks: + - name: kafka + config: + brokers: "localhost:9093" + topic: "topic-a" +processors: + - name: enrich + config: From 4cf263ae9d81cc79dce3aff6b9cfa3a4f6e06636 Mon Sep 17 00:00:00 2001 From: Aditya Singh Sisodiya Date: Tue, 5 Oct 2021 11:49:06 +0530 Subject: [PATCH 2/9] refactor: shift e2e to test folder --- {e2e => test/e2e}/e2e_test.go | 0 {e2e => test/e2e}/mysql_kafka.yml | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename {e2e => test/e2e}/e2e_test.go (100%) rename {e2e => test/e2e}/mysql_kafka.yml (100%) diff --git a/e2e/e2e_test.go b/test/e2e/e2e_test.go similarity index 100% rename from e2e/e2e_test.go rename to test/e2e/e2e_test.go diff --git a/e2e/mysql_kafka.yml b/test/e2e/mysql_kafka.yml similarity index 100% rename from e2e/mysql_kafka.yml rename to test/e2e/mysql_kafka.yml From 361e2705de3214bfa2ee110f5db280de686fe174 Mon Sep 17 00:00:00 2001 From: Aditya Singh Sisodiya Date: Tue, 5 Oct 2021 13:57:08 +0530 Subject: [PATCH 3/9] refactor: restructure test folder --- agent/agent_test.go | 36 +++++++++---------- plugins/extractors/bigquery/bigquery_test.go | 6 ++-- plugins/extractors/bigtable/bigtable_test.go | 9 +++-- .../extractors/cassandra/cassandra_test.go | 10 +++--- .../extractors/clickhouse/clickhouse_test.go | 6 ++-- plugins/extractors/couchdb/couchdb_test.go | 8 ++--- plugins/extractors/csv/csv_test.go | 8 ++--- plugins/extractors/elastic/elastic_test.go | 6 ++-- plugins/extractors/gcs/gcs_test.go | 4 +-- plugins/extractors/grafana/grafana_test.go | 8 ++--- plugins/extractors/kafka/kafka_test.go | 6 ++-- plugins/extractors/metabase/metabase_test.go | 8 ++--- plugins/extractors/mongodb/mongodb_test.go | 8 ++--- plugins/extractors/mssql/mssql_test.go | 8 ++--- plugins/extractors/mysql/mysql_test.go | 8 ++--- plugins/extractors/postgres/postgres_test.go | 8 ++--- plugins/extractors/superset/superset_test.go | 8 ++--- plugins/sinks/columbus/sink_test.go | 14 ++++---- test/e2e/e2e_test.go | 8 ++--- test/{ => utils}/dockertest.go | 2 +- test/{ => utils}/logger.go | 2 +- 21 files changed, 90 insertions(+), 91 deletions(-) rename test/{ => utils}/dockertest.go (98%) rename test/{ => utils}/logger.go (91%) diff --git a/agent/agent_test.go b/agent/agent_test.go index dc1703379..684f920f4 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -3,7 +3,7 @@ package agent_test import ( "context" "errors" - "github.com/odpf/meteor/test" + "github.com/odpf/meteor/test/utils" "testing" "time" @@ -43,7 +43,7 @@ func TestRunnerRun(t *testing.T) { ExtractorFactory: registry.NewExtractorFactory(), ProcessorFactory: registry.NewProcessorFactory(), SinkFactory: registry.NewSinkFactory(), - Logger: test.Logger, + Logger: utils.Logger, }) run := r.Run(validRecipe) assert.IsType(t, agent.Run{}, run) @@ -67,7 +67,7 @@ func TestRunnerRun(t *testing.T) { ExtractorFactory: registry.NewExtractorFactory(), ProcessorFactory: pf, SinkFactory: sf, - Logger: test.Logger, + Logger: utils.Logger, }) run := r.Run(validRecipe) assert.Error(t, run.Error) @@ -92,7 +92,7 @@ func TestRunnerRun(t *testing.T) { ExtractorFactory: ef, ProcessorFactory: registry.NewProcessorFactory(), SinkFactory: sf, - Logger: test.Logger, + Logger: utils.Logger, }) run := r.Run(validRecipe) assert.Error(t, run.Error) @@ -119,7 +119,7 @@ func TestRunnerRun(t *testing.T) { ExtractorFactory: ef, ProcessorFactory: pf, SinkFactory: registry.NewSinkFactory(), - Logger: test.Logger, + Logger: utils.Logger, }) run := r.Run(validRecipe) assert.Error(t, run.Error) @@ -150,7 +150,7 @@ func TestRunnerRun(t *testing.T) { ExtractorFactory: ef, ProcessorFactory: pf, SinkFactory: sf, - Logger: test.Logger, + Logger: utils.Logger, }) run := r.Run(validRecipe) assert.Error(t, run.Error) @@ -183,7 +183,7 @@ func TestRunnerRun(t *testing.T) { ExtractorFactory: ef, ProcessorFactory: pf, SinkFactory: sf, - Logger: test.Logger, + Logger: utils.Logger, }) run := r.Run(validRecipe) assert.Error(t, run.Error) @@ -218,7 +218,7 @@ func TestRunnerRun(t *testing.T) { ExtractorFactory: ef, ProcessorFactory: pf, SinkFactory: sf, - Logger: test.Logger, + Logger: utils.Logger, }) run := r.Run(validRecipe) assert.Error(t, run.Error) @@ -253,7 +253,7 @@ func TestRunnerRun(t *testing.T) { ExtractorFactory: ef, ProcessorFactory: pf, SinkFactory: sf, - Logger: test.Logger, + Logger: utils.Logger, }) run := r.Run(validRecipe) assert.Error(t, run.Error) @@ -287,7 +287,7 @@ func TestRunnerRun(t *testing.T) { ExtractorFactory: ef, ProcessorFactory: pf, SinkFactory: sf, - Logger: test.Logger, + Logger: utils.Logger, }) run := r.Run(validRecipe) assert.Error(t, run.Error) @@ -328,7 +328,7 @@ func TestRunnerRun(t *testing.T) { ExtractorFactory: ef, ProcessorFactory: pf, SinkFactory: sf, - Logger: test.Logger, + Logger: utils.Logger, }) run := r.Run(validRecipe) assert.Error(t, run.Error) @@ -368,7 +368,7 @@ func TestRunnerRun(t *testing.T) { ExtractorFactory: ef, ProcessorFactory: pf, SinkFactory: sf, - Logger: test.Logger, + Logger: utils.Logger, }) run := r.Run(validRecipe) assert.Error(t, run.Error) @@ -410,7 +410,7 @@ func TestRunnerRun(t *testing.T) { ExtractorFactory: ef, ProcessorFactory: pf, SinkFactory: sf, - Logger: test.Logger, + Logger: utils.Logger, }) run := r.Run(validRecipe) assert.NoError(t, run.Error) @@ -452,7 +452,7 @@ func TestRunnerRun(t *testing.T) { ExtractorFactory: ef, ProcessorFactory: pf, SinkFactory: sf, - Logger: test.Logger, + Logger: utils.Logger, StopOnSinkError: true, }) run := r.Run(validRecipe) @@ -495,7 +495,7 @@ func TestRunnerRun(t *testing.T) { ExtractorFactory: ef, ProcessorFactory: pf, SinkFactory: sf, - Logger: test.Logger, + Logger: utils.Logger, }) run := r.Run(validRecipe) assert.NoError(t, run.Error) @@ -544,7 +544,7 @@ func TestRunnerRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Monitor: monitor, - Logger: test.Logger, + Logger: utils.Logger, }) run := r.Run(validRecipe) assert.NoError(t, run.Error) @@ -589,7 +589,7 @@ func TestRunnerRun(t *testing.T) { ExtractorFactory: ef, ProcessorFactory: pf, SinkFactory: sf, - Logger: test.Logger, + Logger: utils.Logger, MaxRetries: 2, // need to retry "at least" 2 times since Sink returns RetryError twice RetryInitialInterval: 1 * time.Millisecond, // this is to override default retry interval to reduce test time }) @@ -638,7 +638,7 @@ func TestRunnerRunMultiple(t *testing.T) { ExtractorFactory: ef, ProcessorFactory: pf, SinkFactory: sf, - Logger: test.Logger, + Logger: utils.Logger, }) runs := r.RunMultiple(recipeList) diff --git a/plugins/extractors/bigquery/bigquery_test.go b/plugins/extractors/bigquery/bigquery_test.go index b240d2304..520cc69e9 100644 --- a/plugins/extractors/bigquery/bigquery_test.go +++ b/plugins/extractors/bigquery/bigquery_test.go @@ -4,17 +4,17 @@ package bigquery_test import ( "context" + "github.com/odpf/meteor/test/utils" "testing" "github.com/odpf/meteor/plugins" "github.com/odpf/meteor/plugins/extractors/bigquery" - "github.com/odpf/meteor/test" "github.com/stretchr/testify/assert" ) func TestInit(t *testing.T) { t.Run("should return error if config is invalid", func(t *testing.T) { - extr := bigquery.New(test.Logger) + extr := bigquery.New(utils.Logger) ctx, cancel := context.WithCancel(context.Background()) defer cancel() err := extr.Init(ctx, map[string]interface{}{ @@ -24,7 +24,7 @@ func TestInit(t *testing.T) { assert.Equal(t, plugins.InvalidConfigError{}, err) }) t.Run("should not return invalid config error if config is valid", func(t *testing.T) { - extr := bigquery.New(test.Logger) + extr := bigquery.New(utils.Logger) ctx, cancel := context.WithCancel(context.Background()) defer cancel() err := extr.Init(ctx, map[string]interface{}{ diff --git a/plugins/extractors/bigtable/bigtable_test.go b/plugins/extractors/bigtable/bigtable_test.go index 67bb7d9ef..27cc35e2b 100644 --- a/plugins/extractors/bigtable/bigtable_test.go +++ b/plugins/extractors/bigtable/bigtable_test.go @@ -4,6 +4,7 @@ package bigtable_test import ( "context" + "github.com/odpf/meteor/test/utils" "log" "os" "testing" @@ -11,8 +12,6 @@ import ( "cloud.google.com/go/bigtable" "github.com/odpf/meteor/plugins" bt "github.com/odpf/meteor/plugins/extractors/bigtable" - "github.com/odpf/meteor/test" - "github.com/ory/dockertest/v3" "github.com/ory/dockertest/v3/docker" "github.com/stretchr/testify/assert" @@ -38,7 +37,7 @@ func TestMain(m *testing.M) { _, err = bigtable.NewAdminClient(context.Background(), "dev", "dev") return } - purgeFn, err := test.CreateContainer(opts, retryFn) + purgeFn, err := utils.CreateContainer(opts, retryFn) if err != nil { log.Fatal("", err) } @@ -54,7 +53,7 @@ func TestMain(m *testing.M) { func TestInit(t *testing.T) { t.Run("should return error if no project_id in config", func(t *testing.T) { - err := bt.New(test.Logger).Init(context.TODO(), map[string]interface{}{ + err := bt.New(utils.Logger).Init(context.TODO(), map[string]interface{}{ "wrong-config": "sample-project", }) @@ -62,7 +61,7 @@ func TestInit(t *testing.T) { }) t.Run("should return error if project_id is empty", func(t *testing.T) { - err := bt.New(test.Logger).Init(context.TODO(), map[string]interface{}{ + err := bt.New(utils.Logger).Init(context.TODO(), map[string]interface{}{ "project_id": "", }) diff --git a/plugins/extractors/cassandra/cassandra_test.go b/plugins/extractors/cassandra/cassandra_test.go index ead175dab..ea4a0b0b3 100644 --- a/plugins/extractors/cassandra/cassandra_test.go +++ b/plugins/extractors/cassandra/cassandra_test.go @@ -5,6 +5,7 @@ package cassandra_test import ( "context" "fmt" + "github.com/odpf/meteor/test/utils" "log" "os" "testing" @@ -16,7 +17,6 @@ import ( "github.com/odpf/meteor/models/odpf/assets/facets" "github.com/odpf/meteor/plugins" "github.com/odpf/meteor/plugins/extractors/cassandra" - "github.com/odpf/meteor/test" "github.com/odpf/meteor/test/mocks" "github.com/ory/dockertest/v3" "github.com/ory/dockertest/v3/docker" @@ -70,7 +70,7 @@ func TestMain(m *testing.M) { } return nil } - purgeFn, err := test.CreateContainer(opts, retryFn) + purgeFn, err := utils.CreateContainer(opts, retryFn) if err != nil { log.Fatal(err) } @@ -103,7 +103,7 @@ func TestEmptyHosts(t *testing.T) { // TestInit tests the configs func TestInit(t *testing.T) { t.Run("should return error for invalid configs", func(t *testing.T) { - err := cassandra.New(test.Logger).Init(context.TODO(), map[string]interface{}{ + err := cassandra.New(utils.Logger).Init(context.TODO(), map[string]interface{}{ "password": pass, "host": host, }) @@ -116,7 +116,7 @@ func TestInit(t *testing.T) { func TestExtract(t *testing.T) { t.Run("should extract and output tables metadata along with its columns", func(t *testing.T) { ctx := context.TODO() - extr := cassandra.New(test.Logger) + extr := cassandra.New(utils.Logger) err := extr.Init(ctx, map[string]interface{}{ "user_id": user, @@ -175,7 +175,7 @@ func execute(queries []string) (err error) { // newExtractor returns a new extractor func newExtractor() *cassandra.Extractor { - return cassandra.New(test.Logger) + return cassandra.New(utils.Logger) } // getExpected returns the expected result diff --git a/plugins/extractors/clickhouse/clickhouse_test.go b/plugins/extractors/clickhouse/clickhouse_test.go index 901f1076d..42b697d49 100644 --- a/plugins/extractors/clickhouse/clickhouse_test.go +++ b/plugins/extractors/clickhouse/clickhouse_test.go @@ -5,6 +5,7 @@ package clickhouse_test import ( "context" "fmt" + "github.com/odpf/meteor/test/utils" "log" "os" "testing" @@ -18,7 +19,6 @@ import ( "github.com/odpf/meteor/models/odpf/assets/facets" "github.com/odpf/meteor/plugins" "github.com/odpf/meteor/plugins/extractors/clickhouse" - "github.com/odpf/meteor/test" "github.com/odpf/meteor/test/mocks" "github.com/ory/dockertest/v3" "github.com/ory/dockertest/v3/docker" @@ -65,7 +65,7 @@ func TestMain(m *testing.M) { } return db.Ping() } - purgeFn, err := test.CreateContainer(opts, retryFn) + purgeFn, err := utils.CreateContainer(opts, retryFn) if err != nil { log.Fatal(err) } @@ -206,5 +206,5 @@ func execute(db *sql.DB, queries []string) (err error) { } func newExtractor() *clickhouse.Extractor { - return clickhouse.New(test.Logger) + return clickhouse.New(utils.Logger) } diff --git a/plugins/extractors/couchdb/couchdb_test.go b/plugins/extractors/couchdb/couchdb_test.go index 887d589f5..dbc442f9e 100644 --- a/plugins/extractors/couchdb/couchdb_test.go +++ b/plugins/extractors/couchdb/couchdb_test.go @@ -5,6 +5,7 @@ package couchdb_test import ( "context" "fmt" + "github.com/odpf/meteor/test/utils" "log" "os" "strconv" @@ -14,7 +15,6 @@ import ( "github.com/go-kivik/kivik" "github.com/odpf/meteor/plugins" "github.com/odpf/meteor/plugins/extractors/couchdb" - "github.com/odpf/meteor/test" "github.com/odpf/meteor/test/mocks" "github.com/ory/dockertest/v3" "github.com/ory/dockertest/v3/docker" @@ -60,7 +60,7 @@ func TestMain(m *testing.M) { _, err = client.Ping(context.TODO()) return } - purgeFn, err := test.CreateContainer(opts, retryFn) + purgeFn, err := utils.CreateContainer(opts, retryFn) if err != nil { log.Fatal(err) } @@ -81,7 +81,7 @@ func TestMain(m *testing.M) { func TestInit(t *testing.T) { t.Run("should return error for invalid configs", func(t *testing.T) { - err := couchdb.New(test.Logger).Init(context.TODO(), map[string]interface{}{ + err := couchdb.New(utils.Logger).Init(context.TODO(), map[string]interface{}{ "password": "pass", "host": host, }) @@ -93,7 +93,7 @@ func TestInit(t *testing.T) { func TestExtract(t *testing.T) { t.Run("should extract and output tables metadata along with its columns", func(t *testing.T) { ctx := context.TODO() - extr := couchdb.New(test.Logger) + extr := couchdb.New(utils.Logger) err := extr.Init(ctx, map[string]interface{}{ "user_id": user, diff --git a/plugins/extractors/csv/csv_test.go b/plugins/extractors/csv/csv_test.go index 72fdd4f8c..007b6e844 100644 --- a/plugins/extractors/csv/csv_test.go +++ b/plugins/extractors/csv/csv_test.go @@ -4,6 +4,7 @@ package csv_test import ( "context" + "github.com/odpf/meteor/test/utils" "testing" "github.com/odpf/meteor/models" @@ -12,7 +13,6 @@ import ( "github.com/odpf/meteor/models/odpf/assets/facets" "github.com/odpf/meteor/plugins" "github.com/odpf/meteor/plugins/extractors/csv" - "github.com/odpf/meteor/test" "github.com/odpf/meteor/test/mocks" "github.com/stretchr/testify/assert" ) @@ -20,7 +20,7 @@ import ( func TestInit(t *testing.T) { t.Run("should return error if fileName and directory both are empty", func(t *testing.T) { config := map[string]interface{}{} - err := csv.New(test.Logger).Init( + err := csv.New(utils.Logger).Init( context.TODO(), config) assert.Equal(t, plugins.InvalidConfigError{}, err) @@ -30,7 +30,7 @@ func TestInit(t *testing.T) { func TestExtract(t *testing.T) { t.Run("should extract data if path is a file", func(t *testing.T) { ctx := context.TODO() - extr := csv.New(test.Logger) + extr := csv.New(utils.Logger) err := extr.Init(ctx, map[string]interface{}{ "path": "./testdata/test.csv", }) @@ -64,7 +64,7 @@ func TestExtract(t *testing.T) { t.Run("should extract data from all files if path is a dir", func(t *testing.T) { ctx := context.TODO() - extr := csv.New(test.Logger) + extr := csv.New(utils.Logger) err := extr.Init(ctx, map[string]interface{}{ "path": "./testdata", }) diff --git a/plugins/extractors/elastic/elastic_test.go b/plugins/extractors/elastic/elastic_test.go index a936ed6fe..b893b2117 100644 --- a/plugins/extractors/elastic/elastic_test.go +++ b/plugins/extractors/elastic/elastic_test.go @@ -6,6 +6,7 @@ import ( "context" "encoding/json" "fmt" + "github.com/odpf/meteor/test/utils" "log" "net/http" "os" @@ -20,7 +21,6 @@ import ( "github.com/odpf/meteor/models/odpf/assets/facets" "github.com/odpf/meteor/plugins" "github.com/odpf/meteor/plugins/extractors/elastic" - "github.com/odpf/meteor/test" "github.com/odpf/meteor/test/mocks" "github.com/ory/dockertest/v3" "github.com/ory/dockertest/v3/docker" @@ -83,7 +83,7 @@ func TestMain(m *testing.M) { } return } - purgeFn, err := test.CreateContainer(opts, retryFn) + purgeFn, err := utils.CreateContainer(opts, retryFn) if err != nil { log.Fatal(err) } @@ -184,7 +184,7 @@ func jsonStruct(doc MeteorMockElasticDocs) string { } func newExtractor() *elastic.Extractor { - return elastic.New(test.Logger) + return elastic.New(utils.Logger) } func getExpectedVal() []models.Record { diff --git a/plugins/extractors/gcs/gcs_test.go b/plugins/extractors/gcs/gcs_test.go index 3735ac36c..fcfbeb7ba 100644 --- a/plugins/extractors/gcs/gcs_test.go +++ b/plugins/extractors/gcs/gcs_test.go @@ -4,17 +4,17 @@ package gcs_test import ( "context" + "github.com/odpf/meteor/test/utils" "testing" "github.com/odpf/meteor/plugins" "github.com/odpf/meteor/plugins/extractors/gcs" - "github.com/odpf/meteor/test" "github.com/stretchr/testify/assert" ) func TestInit(t *testing.T) { t.Run("should return error if no project_id in config", func(t *testing.T) { - err := gcs.New(test.Logger).Init(context.TODO(), map[string]interface{}{ + err := gcs.New(utils.Logger).Init(context.TODO(), map[string]interface{}{ "wrong-config": "sample-project", }) diff --git a/plugins/extractors/grafana/grafana_test.go b/plugins/extractors/grafana/grafana_test.go index 4c0d52aa1..db4e0e194 100644 --- a/plugins/extractors/grafana/grafana_test.go +++ b/plugins/extractors/grafana/grafana_test.go @@ -5,6 +5,7 @@ package grafana_test import ( "context" "fmt" + "github.com/odpf/meteor/test/utils" "net/http" "net/http/httptest" "os" @@ -15,7 +16,6 @@ import ( "github.com/odpf/meteor/models/odpf/assets/common" "github.com/odpf/meteor/plugins" "github.com/odpf/meteor/plugins/extractors/grafana" - "github.com/odpf/meteor/test" "github.com/odpf/meteor/test/mocks" "github.com/stretchr/testify/assert" ) @@ -34,7 +34,7 @@ func TestMain(m *testing.M) { func TestInit(t *testing.T) { t.Run("should return error if for empty base_url in config", func(t *testing.T) { - err := grafana.New(test.Logger).Init(context.TODO(), map[string]interface{}{ + err := grafana.New(utils.Logger).Init(context.TODO(), map[string]interface{}{ "base_url": "", "api_key": "qwerty123", }) @@ -43,7 +43,7 @@ func TestInit(t *testing.T) { }) t.Run("should return error if for empty api_key in config", func(t *testing.T) { - err := grafana.New(test.Logger).Init(context.TODO(), map[string]interface{}{ + err := grafana.New(utils.Logger).Init(context.TODO(), map[string]interface{}{ "base_url": testServer.URL, "api_key": "", }) @@ -117,7 +117,7 @@ func TestExtract(t *testing.T) { } ctx := context.TODO() - extractor := grafana.New(test.Logger) + extractor := grafana.New(utils.Logger) err := extractor.Init(ctx, map[string]interface{}{ "base_url": testServer.URL, "api_key": "qwerty123", diff --git a/plugins/extractors/kafka/kafka_test.go b/plugins/extractors/kafka/kafka_test.go index d28d0f882..839c91719 100644 --- a/plugins/extractors/kafka/kafka_test.go +++ b/plugins/extractors/kafka/kafka_test.go @@ -4,6 +4,7 @@ package kafka_test import ( "context" + "github.com/odpf/meteor/test/utils" "log" "net" @@ -16,7 +17,6 @@ import ( "github.com/odpf/meteor/models/odpf/assets/common" "github.com/odpf/meteor/plugins" "github.com/odpf/meteor/plugins/extractors/kafka" - "github.com/odpf/meteor/test" "github.com/odpf/meteor/test/mocks" "github.com/ory/dockertest/v3" "github.com/ory/dockertest/v3/docker" @@ -60,7 +60,7 @@ func TestMain(m *testing.M) { return } - purgeContainer, err := test.CreateContainer(opts, retryFn) + purgeContainer, err := utils.CreateContainer(opts, retryFn) if err != nil { log.Fatal(err) } @@ -161,7 +161,7 @@ func setup(broker kafkaLib.Broker) (err error) { } func newExtractor() *kafka.Extractor { - return kafka.New(test.Logger) + return kafka.New(utils.Logger) } // This function compares two slices without concerning about the order diff --git a/plugins/extractors/metabase/metabase_test.go b/plugins/extractors/metabase/metabase_test.go index e01f3dc5b..2b1addf3f 100644 --- a/plugins/extractors/metabase/metabase_test.go +++ b/plugins/extractors/metabase/metabase_test.go @@ -7,6 +7,7 @@ import ( "context" "encoding/json" "fmt" + "github.com/odpf/meteor/test/utils" "io/ioutil" "log" "net/http" @@ -18,7 +19,6 @@ import ( "github.com/odpf/meteor/models/odpf/assets" "github.com/odpf/meteor/plugins" "github.com/odpf/meteor/plugins/extractors/metabase" - "github.com/odpf/meteor/test" "github.com/odpf/meteor/test/mocks" "github.com/ory/dockertest/v3" "github.com/ory/dockertest/v3/docker" @@ -82,7 +82,7 @@ func TestMain(m *testing.M) { } // Exponential backoff-retry for container to be resy to accept connections - purgeFn, err := test.CreateContainer(opts, retryFn) + purgeFn, err := utils.CreateContainer(opts, retryFn) if err != nil { log.Fatal(err) } @@ -102,7 +102,7 @@ func TestMain(m *testing.M) { func TestInit(t *testing.T) { t.Run("should return error for invalid config", func(t *testing.T) { - err := metabase.New(test.Logger).Init(context.TODO(), map[string]interface{}{ + err := metabase.New(utils.Logger).Init(context.TODO(), map[string]interface{}{ "user_id": "user", "host": host, }) @@ -114,7 +114,7 @@ func TestInit(t *testing.T) { func TestExtract(t *testing.T) { t.Run("should return dashboard model", func(t *testing.T) { ctx := context.TODO() - extr := metabase.New(test.Logger) + extr := metabase.New(utils.Logger) err := extr.Init(ctx, map[string]interface{}{ "user_id": email, "password": pass, diff --git a/plugins/extractors/mongodb/mongodb_test.go b/plugins/extractors/mongodb/mongodb_test.go index 4a89b8ece..329ddd296 100644 --- a/plugins/extractors/mongodb/mongodb_test.go +++ b/plugins/extractors/mongodb/mongodb_test.go @@ -5,6 +5,7 @@ package mongodb_test import ( "context" "fmt" + "github.com/odpf/meteor/test/utils" "log" "os" "testing" @@ -14,7 +15,6 @@ import ( "github.com/odpf/meteor/models/odpf/assets/common" "github.com/odpf/meteor/plugins" "github.com/odpf/meteor/plugins/extractors/mongodb" - "github.com/odpf/meteor/test" "github.com/odpf/meteor/test/mocks" "github.com/ory/dockertest/v3" "github.com/ory/dockertest/v3/docker" @@ -68,7 +68,7 @@ func TestMain(m *testing.M) { return client.Ping(ctx, nil) } - purgeFn, err := test.CreateContainer(opts, retryFn) + purgeFn, err := utils.CreateContainer(opts, retryFn) if err != nil { log.Fatal(err) } @@ -92,7 +92,7 @@ func TestMain(m *testing.M) { func TestInit(t *testing.T) { t.Run("should return error for invalid", func(t *testing.T) { - err := mongodb.New(test.Logger).Init(context.TODO(), map[string]interface{}{ + err := mongodb.New(utils.Logger).Init(context.TODO(), map[string]interface{}{ "password": pass, "host": host, }) @@ -104,7 +104,7 @@ func TestInit(t *testing.T) { func TestExtract(t *testing.T) { t.Run("should extract and output tables metadata along with its columns", func(t *testing.T) { ctx := context.TODO() - extr := mongodb.New(test.Logger) + extr := mongodb.New(utils.Logger) err := extr.Init(ctx, map[string]interface{}{ "user_id": user, diff --git a/plugins/extractors/mssql/mssql_test.go b/plugins/extractors/mssql/mssql_test.go index 26cf8535f..38b1fd2c5 100644 --- a/plugins/extractors/mssql/mssql_test.go +++ b/plugins/extractors/mssql/mssql_test.go @@ -6,6 +6,7 @@ import ( "context" "database/sql" "fmt" + "github.com/odpf/meteor/test/utils" "log" "os" "testing" @@ -17,7 +18,6 @@ import ( "github.com/odpf/meteor/models/odpf/assets/facets" "github.com/odpf/meteor/plugins" "github.com/odpf/meteor/plugins/extractors/mssql" - "github.com/odpf/meteor/test" "github.com/odpf/meteor/test/mocks" "github.com/ory/dockertest/v3" "github.com/ory/dockertest/v3/docker" @@ -58,7 +58,7 @@ func TestMain(m *testing.M) { } return db.Ping() } - purgeFn, err := test.CreateContainer(opts, retryFn) + purgeFn, err := utils.CreateContainer(opts, retryFn) if err != nil { log.Fatal(err) } @@ -79,7 +79,7 @@ func TestMain(m *testing.M) { func TestInit(t *testing.T) { t.Run("should error for invalid configurations", func(t *testing.T) { - err := mssql.New(test.Logger).Init(context.TODO(), map[string]interface{}{ + err := mssql.New(utils.Logger).Init(context.TODO(), map[string]interface{}{ "password": "pass", "host": host, }) @@ -91,7 +91,7 @@ func TestInit(t *testing.T) { func TestExtract(t *testing.T) { t.Run("should extract and output tables metadata along with its columns", func(t *testing.T) { ctx := context.TODO() - extr := mssql.New(test.Logger) + extr := mssql.New(utils.Logger) err := extr.Init(ctx, map[string]interface{}{ "user_id": user, diff --git a/plugins/extractors/mysql/mysql_test.go b/plugins/extractors/mysql/mysql_test.go index 849e92087..b4637f208 100644 --- a/plugins/extractors/mysql/mysql_test.go +++ b/plugins/extractors/mysql/mysql_test.go @@ -5,6 +5,7 @@ package mysql_test import ( "context" "fmt" + "github.com/odpf/meteor/test/utils" "log" "os" "testing" @@ -18,7 +19,6 @@ import ( "github.com/odpf/meteor/models/odpf/assets/facets" "github.com/odpf/meteor/plugins" "github.com/odpf/meteor/plugins/extractors/mysql" - "github.com/odpf/meteor/test" "github.com/odpf/meteor/test/mocks" "github.com/ory/dockertest/v3" "github.com/ory/dockertest/v3/docker" @@ -58,7 +58,7 @@ func TestMain(m *testing.M) { } return db.Ping() } - purgeFn, err := test.CreateContainer(opts, retryFn) + purgeFn, err := utils.CreateContainer(opts, retryFn) if err != nil { log.Fatal(err) } @@ -79,7 +79,7 @@ func TestMain(m *testing.M) { func TestInit(t *testing.T) { t.Run("should return error for invalid configs", func(t *testing.T) { - err := mysql.New(test.Logger).Init(context.TODO(), map[string]interface{}{ + err := mysql.New(utils.Logger).Init(context.TODO(), map[string]interface{}{ "password": "pass", "host": host, }) @@ -91,7 +91,7 @@ func TestInit(t *testing.T) { func TestExtract(t *testing.T) { t.Run("should extract and output tables metadata along with its columns", func(t *testing.T) { ctx := context.TODO() - extr := mysql.New(test.Logger) + extr := mysql.New(utils.Logger) err := extr.Init(ctx, map[string]interface{}{ "user_id": user, diff --git a/plugins/extractors/postgres/postgres_test.go b/plugins/extractors/postgres/postgres_test.go index 758c4e745..f59f70196 100644 --- a/plugins/extractors/postgres/postgres_test.go +++ b/plugins/extractors/postgres/postgres_test.go @@ -5,6 +5,7 @@ package postgres_test import ( "context" "fmt" + "github.com/odpf/meteor/test/utils" "log" "os" "testing" @@ -15,7 +16,6 @@ import ( "github.com/odpf/meteor/models/odpf/assets" "github.com/odpf/meteor/plugins" "github.com/odpf/meteor/plugins/extractors/postgres" - "github.com/odpf/meteor/test" "github.com/odpf/meteor/test/mocks" "github.com/ory/dockertest/v3" "github.com/ory/dockertest/v3/docker" @@ -52,7 +52,7 @@ func TestMain(m *testing.M) { } return db.Ping() } - purgeFn, err := test.CreateContainer(opts, retryFn) + purgeFn, err := utils.CreateContainer(opts, retryFn) if err != nil { log.Fatal(err) } @@ -73,7 +73,7 @@ func TestMain(m *testing.M) { func TestInit(t *testing.T) { t.Run("should return error for invalid config", func(t *testing.T) { - err := postgres.New(test.Logger).Init(context.TODO(), map[string]interface{}{ + err := postgres.New(utils.Logger).Init(context.TODO(), map[string]interface{}{ "password": "pass", "host": host, }) @@ -85,7 +85,7 @@ func TestInit(t *testing.T) { func TestExtract(t *testing.T) { t.Run("should return mockdata we generated with postgres", func(t *testing.T) { ctx := context.TODO() - extr := postgres.New(test.Logger) + extr := postgres.New(utils.Logger) err := extr.Init(ctx, map[string]interface{}{ "user_id": user, diff --git a/plugins/extractors/superset/superset_test.go b/plugins/extractors/superset/superset_test.go index 724b4e108..3bb6945e8 100644 --- a/plugins/extractors/superset/superset_test.go +++ b/plugins/extractors/superset/superset_test.go @@ -7,6 +7,7 @@ import ( "context" "encoding/json" "fmt" + "github.com/odpf/meteor/test/utils" "io" "io/ioutil" "log" @@ -18,7 +19,6 @@ import ( "github.com/odpf/meteor/models/odpf/assets" "github.com/odpf/meteor/plugins" "github.com/odpf/meteor/plugins/extractors/superset" - "github.com/odpf/meteor/test" "github.com/odpf/meteor/test/mocks" "github.com/ory/dockertest/v3" "github.com/ory/dockertest/v3/docker" @@ -83,7 +83,7 @@ func TestMain(m *testing.M) { } // exponential backoff-retry, because the application in the container might not be ready to accept connections yet - purgeFn, err := test.CreateContainer(opts, retryFn) + purgeFn, err := utils.CreateContainer(opts, retryFn) if err != nil { log.Fatal(err) } @@ -104,7 +104,7 @@ func TestMain(m *testing.M) { // TestInit tests the configs func TestInit(t *testing.T) { t.Run("should return error for invalid config", func(t *testing.T) { - err := superset.New(test.Logger).Init(context.TODO(), map[string]interface{}{ + err := superset.New(utils.Logger).Init(context.TODO(), map[string]interface{}{ "user_id": "user", "host": host, }) @@ -116,7 +116,7 @@ func TestInit(t *testing.T) { func TestExtract(t *testing.T) { t.Run("should return dashboard model", func(t *testing.T) { ctx := context.TODO() - extr := superset.New(test.Logger) + extr := superset.New(utils.Logger) err := extr.Init(ctx, map[string]interface{}{ "username": user, "password": pass, diff --git a/plugins/sinks/columbus/sink_test.go b/plugins/sinks/columbus/sink_test.go index 13fde8616..8ee96b091 100644 --- a/plugins/sinks/columbus/sink_test.go +++ b/plugins/sinks/columbus/sink_test.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "github.com/odpf/meteor/test/utils" "io/ioutil" "net/http" "testing" @@ -15,7 +16,6 @@ import ( "github.com/odpf/meteor/models/odpf/assets/facets" "github.com/odpf/meteor/plugins" "github.com/odpf/meteor/plugins/sinks/columbus" - "github.com/odpf/meteor/test" "github.com/stretchr/testify/assert" ) @@ -55,7 +55,7 @@ func TestInit(t *testing.T) { } for i, config := range invalidConfigs { t.Run(fmt.Sprintf("test invalid config #%d", i+1), func(t *testing.T) { - columbusSink := columbus.New(newmockHTTPClient(http.MethodGet, url, requestPayload), test.Logger) + columbusSink := columbus.New(newmockHTTPClient(http.MethodGet, url, requestPayload), utils.Logger) err := columbusSink.Init(context.TODO(), config) assert.Equal(t, plugins.InvalidConfigError{Type: plugins.PluginTypeSink}, err) @@ -70,7 +70,7 @@ func TestSink(t *testing.T) { client.SetupResponse(200, "") ctx := context.TODO() - columbusSink := columbus.New(client, test.Logger) + columbusSink := columbus.New(client, utils.Logger) err := columbusSink.Init(ctx, map[string]interface{}{ "host": host, "type": columbusType, @@ -95,7 +95,7 @@ func TestSink(t *testing.T) { client.SetupResponse(404, columbusError) ctx := context.TODO() - columbusSink := columbus.New(client, test.Logger) + columbusSink := columbus.New(client, utils.Logger) err := columbusSink.Init(ctx, map[string]interface{}{ "host": host, "type": "my-type", @@ -118,7 +118,7 @@ func TestSink(t *testing.T) { client.SetupResponse(code, `{"reason":"internal server error"}`) ctx := context.TODO() - columbusSink := columbus.New(client, test.Logger) + columbusSink := columbus.New(client, utils.Logger) err := columbusSink.Init(ctx, map[string]interface{}{ "host": host, "type": "my-type", @@ -141,7 +141,7 @@ func TestSink(t *testing.T) { client.SetupResponse(200, `{"success": true}`) ctx := context.TODO() - columbusSink := columbus.New(client, test.Logger) + columbusSink := columbus.New(client, utils.Logger) err := columbusSink.Init(ctx, map[string]interface{}{ "host": host, "type": "my-type", @@ -174,7 +174,7 @@ func TestSink(t *testing.T) { client.SetupResponse(200, "") ctx := context.TODO() - columbusSink := columbus.New(client, test.Logger) + columbusSink := columbus.New(client, utils.Logger) err := columbusSink.Init(ctx, map[string]interface{}{ "host": host, "type": columbusType, diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index f9758bcf6..8ab28bc27 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -6,6 +6,7 @@ import ( "context" "database/sql" "fmt" + "github.com/odpf/meteor/test/utils" "log" "net" "os" @@ -22,7 +23,6 @@ import ( _ "github.com/odpf/meteor/plugins/extractors" _ "github.com/odpf/meteor/plugins/processors" _ "github.com/odpf/meteor/plugins/sinks" - "github.com/odpf/meteor/test" "github.com/ory/dockertest/v3" "github.com/ory/dockertest/v3/docker" "github.com/pkg/errors" @@ -115,7 +115,7 @@ func kafkaDockerSetup() (purge func() error, err error) { } return } - purgeContainer, err := test.CreateContainer(kafkaOpts, kafkaRetryFn) + purgeContainer, err := utils.CreateContainer(kafkaOpts, kafkaRetryFn) if err != nil { log.Fatal(err) } @@ -147,7 +147,7 @@ func mysqlDockerSetup() (purge func() error, err error) { } return db.Ping() } - purgeContainer, err := test.CreateContainer(mysqlOpts, mysqlRetryFn) + purgeContainer, err := utils.CreateContainer(mysqlOpts, mysqlRetryFn) if err != nil { log.Fatal(err) } @@ -176,7 +176,7 @@ func TestRecipe(t *testing.T) { if err != nil { t.Error(err) } - command := cmd.New(test.Logger, nil, cfg) + command := cmd.New(utils.Logger, nil, cfg) command.SetArgs([]string{"run", "mysql_kafka.yml"}) if err := command.Execute(); err != nil { if strings.HasPrefix(err.Error(), "unknown command ") { diff --git a/test/dockertest.go b/test/utils/dockertest.go similarity index 98% rename from test/dockertest.go rename to test/utils/dockertest.go index e7311a566..4f2ad1051 100644 --- a/test/dockertest.go +++ b/test/utils/dockertest.go @@ -1,4 +1,4 @@ -package test +package utils import ( "fmt" diff --git a/test/logger.go b/test/utils/logger.go similarity index 91% rename from test/logger.go rename to test/utils/logger.go index f3d7c7189..602b3896e 100644 --- a/test/logger.go +++ b/test/utils/logger.go @@ -1,4 +1,4 @@ -package test +package utils import ( "io/ioutil" From 36d2721b8719ec1f38fdea3351b60c18d3a62174 Mon Sep 17 00:00:00 2001 From: Aditya Singh Sisodiya Date: Thu, 7 Oct 2021 14:54:50 +0530 Subject: [PATCH 4/9] fix: close method to close any connection --- agent/agent.go | 6 ++++++ agent/agent_test.go | 8 ++++++++ agent/stream.go | 12 ++++++++++++ plugins/plugin.go | 3 +++ plugins/sinks/columbus/sink.go | 2 ++ plugins/sinks/console/sink.go | 2 ++ plugins/sinks/kafka/sink.go | 6 ++++-- test/mocks/plugin.go | 5 +++++ 8 files changed, 42 insertions(+), 2 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 873c7c3a8..03e64045c 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -254,6 +254,12 @@ func (r *Agent) setupSink(ctx context.Context, sr recipe.SinkRecipe, stream *str return err }, defaultBatchSize) + stream.onClose(func() { + if err = sink.Close(); err != nil { + r.logger.Warn("error closing sink", "sink", sr.Name, "error", err) + } + }) + return } diff --git a/agent/agent_test.go b/agent/agent_test.go index 684f920f4..7f1d6c64d 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -208,6 +208,7 @@ func TestRunnerRun(t *testing.T) { sink := mocks.NewSink() sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(errors.New("some error")).Once() + sink.On("Close").Return(nil) defer sink.AssertExpectations(t) sf := registry.NewSinkFactory() if err := sf.Register("test-sink", newSink(sink)); err != nil { @@ -243,6 +244,7 @@ func TestRunnerRun(t *testing.T) { sink := mocks.NewSink() sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(nil).Once() + sink.On("Close").Return(nil) defer sink.AssertExpectations(t) sf := registry.NewSinkFactory() if err := sf.Register("test-sink", newSink(sink)); err != nil { @@ -277,6 +279,7 @@ func TestRunnerRun(t *testing.T) { sink := mocks.NewSink() sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(nil).Once() + sink.On("Close").Return(nil) defer sink.AssertExpectations(t) sf := registry.NewSinkFactory() if err := sf.Register("test-sink", newSink(sink)); err != nil { @@ -318,6 +321,7 @@ func TestRunnerRun(t *testing.T) { sink := mocks.NewSink() sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(nil).Once() + sink.On("Close").Return(nil) defer sink.AssertExpectations(t) sf := registry.NewSinkFactory() if err := sf.Register("test-sink", newSink(sink)); err != nil { @@ -358,6 +362,7 @@ func TestRunnerRun(t *testing.T) { sink := mocks.NewSink() sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(nil).Once() + sink.On("Close").Return(nil) defer sink.AssertExpectations(t) sf := registry.NewSinkFactory() if err := sf.Register("test-sink", newSink(sink)); err != nil { @@ -400,6 +405,7 @@ func TestRunnerRun(t *testing.T) { sink := mocks.NewSink() sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(nil).Once() sink.On("Sink", mockCtx, data).Return(errors.New("some error")) + sink.On("Close").Return(nil) defer sink.AssertExpectations(t) sf := registry.NewSinkFactory() if err := sf.Register("test-sink", newSink(sink)); err != nil { @@ -442,6 +448,7 @@ func TestRunnerRun(t *testing.T) { sink := mocks.NewSink() sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(nil).Once() sink.On("Sink", mockCtx, data).Return(errors.New("some error")) + sink.On("Close").Return(nil) defer sink.AssertExpectations(t) sf := registry.NewSinkFactory() if err := sf.Register("test-sink", newSink(sink)); err != nil { @@ -485,6 +492,7 @@ func TestRunnerRun(t *testing.T) { sink := mocks.NewSink() sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(nil).Once() sink.On("Sink", mockCtx, data).Return(nil) + sink.On("Close").Return(nil) defer sink.AssertExpectations(t) sf := registry.NewSinkFactory() if err := sf.Register("test-sink", newSink(sink)); err != nil { diff --git a/agent/stream.go b/agent/stream.go index ac3f4e9e5..a6fc1f4f8 100644 --- a/agent/stream.go +++ b/agent/stream.go @@ -18,6 +18,7 @@ type subscriber struct { type stream struct { middlewares []streamMiddleware subscribers []*subscriber + onCloses []func() closed bool err error } @@ -38,6 +39,13 @@ func (s *stream) subscribe(callback func(batchedData []models.Record) error, bat return s } +// onClose() is used to register callback for after stream is closed. +func (s *stream) onClose(callback func()) *stream { + s.onCloses = append(s.onCloses, callback) + + return s +} + // broadcast() will start listening to emitter for any pushed data. // This process is blocking, so most times you would want to call this inside a goroutine. func (s *stream) broadcast() error { @@ -116,6 +124,10 @@ func (s *stream) Close() { close(l.channel) } s.closed = true + + for _, onClose := range s.onCloses { + onClose() + } } func (s *stream) runMiddlewares(d models.Record) (res models.Record, err error) { diff --git a/plugins/plugin.go b/plugins/plugin.go index 481e27e57..4f60bca04 100644 --- a/plugins/plugin.go +++ b/plugins/plugin.go @@ -55,6 +55,9 @@ type Processor interface { type Syncer interface { Plugin Sink(ctx context.Context, batch []models.Record) (err error) + + // Close will be called once after everything is done + Close() error } // ParseInfo parses the plugin's meta.yaml file and returns an plugin Info struct. diff --git a/plugins/sinks/columbus/sink.go b/plugins/sinks/columbus/sink.go index 08a170799..46bb1d5c9 100644 --- a/plugins/sinks/columbus/sink.go +++ b/plugins/sinks/columbus/sink.go @@ -92,6 +92,8 @@ func (s *Sink) Sink(ctx context.Context, batch []models.Record) (err error) { return } +func (s *Sink) Close() (err error) { return } + func (s *Sink) buildColumbusPayload(metadata models.Metadata) (interface{}, error) { // skip if mapping is not defined if s.config.Mapping == nil { diff --git a/plugins/sinks/console/sink.go b/plugins/sinks/console/sink.go index 5f5ed64fc..a757d6527 100644 --- a/plugins/sinks/console/sink.go +++ b/plugins/sinks/console/sink.go @@ -49,6 +49,8 @@ func (s *Sink) Sink(ctx context.Context, batch []models.Record) (err error) { return nil } +func (s *Sink) Close() (err error) { return } + func (s *Sink) process(value interface{}) error { jsonBytes, err := json.Marshal(value) if err != nil { diff --git a/plugins/sinks/kafka/sink.go b/plugins/sinks/kafka/sink.go index 9a227e118..6e455c670 100644 --- a/plugins/sinks/kafka/sink.go +++ b/plugins/sinks/kafka/sink.go @@ -72,8 +72,6 @@ func (s *Sink) Init(ctx context.Context, configMap map[string]interface{}) (err } func (s *Sink) Sink(ctx context.Context, batch []models.Record) (err error) { - defer s.writer.Close() - for _, record := range batch { if err := s.push(ctx, record.Data()); err != nil { return err @@ -83,6 +81,10 @@ func (s *Sink) Sink(ctx context.Context, batch []models.Record) (err error) { return } +func (s *Sink) Close() (err error) { + return s.writer.Close() +} + func (s *Sink) push(ctx context.Context, payload interface{}) error { // struct needs to be cast to pointer to implement proto methods payload = castModelToPointer(payload) diff --git a/test/mocks/plugin.go b/test/mocks/plugin.go index b93d3cba4..6e12f7f9e 100644 --- a/test/mocks/plugin.go +++ b/test/mocks/plugin.go @@ -76,6 +76,11 @@ func (m *Sink) Sink(ctx context.Context, batch []models.Record) error { return args.Error(0) } +func (m *Sink) Close() error { + args := m.Called() + return args.Error(0) +} + type Emitter struct { data []models.Record } From f0a7f88960284cc7bf18f2816e59d2b9de1750c4 Mon Sep 17 00:00:00 2001 From: Aditya Singh Sisodiya Date: Thu, 7 Oct 2021 18:27:31 +0530 Subject: [PATCH 5/9] refactor: e2e test --- agent/agent_test.go | 11 +-- plugins/sinks/kafka/sink.go | 10 --- test/e2e/e2e_test.go | 147 ++++++++++++++++++------------------ 3 files changed, 80 insertions(+), 88 deletions(-) diff --git a/agent/agent_test.go b/agent/agent_test.go index 7f1d6c64d..c6bf99fb2 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -3,10 +3,6 @@ package agent_test import ( "context" "errors" - "github.com/odpf/meteor/test/utils" - "testing" - "time" - "github.com/odpf/meteor/agent" "github.com/odpf/meteor/models" "github.com/odpf/meteor/models/odpf/assets" @@ -14,8 +10,11 @@ import ( "github.com/odpf/meteor/recipe" "github.com/odpf/meteor/registry" "github.com/odpf/meteor/test/mocks" + "github.com/odpf/meteor/test/utils" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "testing" + "time" ) var mockCtx = mock.AnythingOfType("*context.emptyCtx") @@ -208,7 +207,6 @@ func TestRunnerRun(t *testing.T) { sink := mocks.NewSink() sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(errors.New("some error")).Once() - sink.On("Close").Return(nil) defer sink.AssertExpectations(t) sf := registry.NewSinkFactory() if err := sf.Register("test-sink", newSink(sink)); err != nil { @@ -536,6 +534,7 @@ func TestRunnerRun(t *testing.T) { sink := mocks.NewSink() sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(nil).Once() sink.On("Sink", mockCtx, data).Return(nil) + sink.On("Close").Return(nil) defer sink.AssertExpectations(t) sf := registry.NewSinkFactory() if err := sf.Register("test-sink", newSink(sink)); err != nil { @@ -587,6 +586,7 @@ func TestRunnerRun(t *testing.T) { sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(nil).Once() sink.On("Sink", mockCtx, data).Return(plugins.NewRetryError(err)).Once() sink.On("Sink", mockCtx, data).Return(nil) + sink.On("Close").Return(nil) defer sink.AssertExpectations(t) sf := registry.NewSinkFactory() if err := sf.Register("test-sink", newSink(sink)); err != nil { @@ -636,6 +636,7 @@ func TestRunnerRunMultiple(t *testing.T) { sink := mocks.NewSink() sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(nil) sink.On("Sink", mockCtx, data).Return(nil) + sink.On("Close").Return(nil) defer sink.AssertExpectations(t) sf := registry.NewSinkFactory() if err := sf.Register("test-sink", newSink(sink)); err != nil { diff --git a/plugins/sinks/kafka/sink.go b/plugins/sinks/kafka/sink.go index 6e455c670..b2bba9edd 100644 --- a/plugins/sinks/kafka/sink.go +++ b/plugins/sinks/kafka/sink.go @@ -86,9 +86,6 @@ func (s *Sink) Close() (err error) { } func (s *Sink) push(ctx context.Context, payload interface{}) error { - // struct needs to be cast to pointer to implement proto methods - payload = castModelToPointer(payload) - kafkaValue, err := s.buildValue(payload) if err != nil { return err @@ -190,13 +187,6 @@ func (s *Sink) getTopLevelKeyFromPath(keyPath string) (string, error) { return keyPaths[1], nil } -func castModelToPointer(value interface{}) interface{} { - vp := reflect.New(reflect.TypeOf(value)) - vp.Elem().Set(reflect.ValueOf(value)) - - return vp.Interface() -} - func createWriter(config Config) *kafka.Writer { brokers := strings.Split(config.Brokers, ",") return &kafka.Writer{ diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 8ab28bc27..409ca1913 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -1,4 +1,4 @@ -// +build integration +//+build integration package e2e_test @@ -7,6 +7,8 @@ import ( "database/sql" "fmt" "github.com/odpf/meteor/test/utils" + "github.com/ory/dockertest/v3" + "github.com/ory/dockertest/v3/docker" "log" "net" "os" @@ -23,8 +25,6 @@ import ( _ "github.com/odpf/meteor/plugins/extractors" _ "github.com/odpf/meteor/plugins/processors" _ "github.com/odpf/meteor/plugins/sinks" - "github.com/ory/dockertest/v3" - "github.com/ory/dockertest/v3/docker" "github.com/pkg/errors" "github.com/segmentio/kafka-go" "github.com/stretchr/testify/assert" @@ -88,75 +88,8 @@ func TestMain(m *testing.M) { os.Exit(code) } -// kafkaDockerSetup sets up a kafka docker container -func kafkaDockerSetup() (purge func() error, err error) { - // kafka setup test - kafkaOpts := dockertest.RunOptions{ - Repository: "moeenz/docker-kafka-kraft", - Tag: "latest", - Env: []string{ - "KRAFT_CONTAINER_HOST_NAME=kafka", - }, - ExposedPorts: []string{"9093"}, - PortBindings: map[docker.Port][]docker.PortBinding{ - "9093": { - {HostIP: "localhost", HostPort: "9093"}, - }, - }, - } - // exponential backoff-retry, because the application in the container might not be ready to accept connections yet - kafkaRetryFn := func(resource *dockertest.Resource) (err error) { - // create client - if conn, err = kafka.Dial("tcp", brokerHost); err != nil { - return errors.Wrap(err, "failed to kafka create client") - } - if broker, err = conn.Controller(); err != nil { - return errors.Wrap(err, "failed to generate broker request") - } - return - } - purgeContainer, err := utils.CreateContainer(kafkaOpts, kafkaRetryFn) - if err != nil { - log.Fatal(err) - } - - return purgeContainer, nil -} - -// mysqlDockerSetup sets up a mysql docker container -func mysqlDockerSetup() (purge func() error, err error) { - // mysql setup test - mysqlOpts := dockertest.RunOptions{ - Repository: "mysql", - Tag: "latest", - Env: []string{ - "MYSQL_ROOT_PASSWORD=" + pass, - }, - ExposedPorts: []string{"3306"}, - PortBindings: map[docker.Port][]docker.PortBinding{ - "3306": { - {HostIP: "0.0.0.0", HostPort: "3306"}, - }, - }, - } - // exponential backoff-retry, because the application in the container might not be ready to accept connections yet - mysqlRetryFn := func(resource *dockertest.Resource) (err error) { - db, err = sql.Open("mysql", fmt.Sprintf("root:%s@tcp(%s)/", pass, mysqlHost)) - if err != nil { - return errors.Wrap(err, "failed to create mysql client") - } - return db.Ping() - } - purgeContainer, err := utils.CreateContainer(mysqlOpts, mysqlRetryFn) - if err != nil { - log.Fatal(err) - } - - return purgeContainer, nil -} - -// TestRecipe tests the recipe from source to sink completely -func TestRecipe(t *testing.T) { +// TestMySqlToKafka tests the recipe from source to sink completely +func TestMySqlToKafka(t *testing.T) { err := setupKafka() if err != nil { t.Fatal(err) @@ -218,6 +151,7 @@ func listenToTopic(ctx context.Context, topic string, data *[]*assets.Table) err msg, err := reader.ReadMessage(ctx) if err != nil { break + } var convertMsg assets.Table if err := proto.Unmarshal(msg.Value, &convertMsg); err != nil { @@ -229,7 +163,7 @@ func listenToTopic(ctx context.Context, topic string, data *[]*assets.Table) err return nil } -// setupKafka intializes kafka broker with topic and partition +// setupKafka initializes kafka broker with topic and partition func setupKafka() error { conn, err := kafka.DialLeader(context.TODO(), "tcp", net.JoinHostPort(broker.Host, strconv.Itoa(broker.Port)), testTopic, partition) if err != nil { @@ -293,6 +227,73 @@ func execute(db *sql.DB, queries []string) (err error) { return } +// kafkaDockerSetup sets up a kafka docker container +func kafkaDockerSetup() (purge func() error, err error) { + // kafka setup test + kafkaOpts := dockertest.RunOptions{ + Repository: "moeenz/docker-kafka-kraft", + Tag: "latest", + Env: []string{ + "KRAFT_CONTAINER_HOST_NAME=kafka", + }, + ExposedPorts: []string{"9093"}, + PortBindings: map[docker.Port][]docker.PortBinding{ + "9093": { + {HostIP: "localhost", HostPort: "9093"}, + }, + }, + } + // exponential backoff-retry, because the application in the container might not be ready to accept connections yet + kafkaRetryFn := func(resource *dockertest.Resource) (err error) { + // create client + if conn, err = kafka.Dial("tcp", brokerHost); err != nil { + return errors.Wrap(err, "failed to kafka create client") + } + if broker, err = conn.Controller(); err != nil { + return errors.Wrap(err, "failed to generate broker request") + } + return + } + purgeContainer, err := utils.CreateContainer(kafkaOpts, kafkaRetryFn) + if err != nil { + log.Fatal(err) + } + + return purgeContainer, nil +} + +// mysqlDockerSetup sets up a mysql docker container +func mysqlDockerSetup() (purge func() error, err error) { + // mysql setup test + mysqlOpts := dockertest.RunOptions{ + Repository: "mysql", + Tag: "latest", + Env: []string{ + "MYSQL_ROOT_PASSWORD=" + pass, + }, + ExposedPorts: []string{"3306"}, + PortBindings: map[docker.Port][]docker.PortBinding{ + "3306": { + {HostIP: "0.0.0.0", HostPort: "3306"}, + }, + }, + } + // exponential backoff-retry, because the application in the container might not be ready to accept connections yet + mysqlRetryFn := func(resource *dockertest.Resource) (err error) { + db, err = sql.Open("mysql", fmt.Sprintf("root:%s@tcp(%s)/", pass, mysqlHost)) + if err != nil { + return errors.Wrap(err, "failed to create mysql client") + } + return db.Ping() + } + purgeContainer, err := utils.CreateContainer(mysqlOpts, mysqlRetryFn) + if err != nil { + log.Fatal(err) + } + + return purgeContainer, nil +} + // getExpectedTables returns the expected tables func getExpectedTables() []*assets.Table { return []*assets.Table{ From 45df5c8a2001d66f2cdfc9924ba6ffe06ab4dd4d Mon Sep 17 00:00:00 2001 From: Aditya Singh Sisodiya Date: Fri, 8 Oct 2021 11:52:01 +0530 Subject: [PATCH 6/9] feat: add make test-e2e command --- Makefile | 3 +++ test/e2e/e2e_test.go | 4 ++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index 1e42402a7..c8e2b793e 100644 --- a/Makefile +++ b/Makefile @@ -18,6 +18,9 @@ test: test-coverage: test go tool cover -html=coverage.out +test-e2e: + go test ./... -tags=e2e_test + generate-proto: ## regenerate protos @echo " > cloning protobuf from odpf/proton" @echo " > generating protobuf" diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 409ca1913..eda36cb40 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -1,4 +1,4 @@ -//+build integration +//+build e2e_test package e2e_test @@ -40,7 +40,7 @@ var ( const ( testDB = "test_db" user = "test_user" - pass = "pass" + pass = "admin" mysqlHost = "localhost:3306" brokerHost = "localhost:9093" testTopic = "topic-a" From 757e89691830d4c83c3de5c634af0f8a2a463243 Mon Sep 17 00:00:00 2001 From: Aditya Singh Sisodiya Date: Fri, 8 Oct 2021 12:23:15 +0530 Subject: [PATCH 7/9] chore: update test yaml --- .github/workflows/test.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 7ca098045..4b781a8c5 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -14,4 +14,6 @@ jobs: - name: Install packages run: go mod tidy - name: Run Test - run: make test \ No newline at end of file + run: make test + - name: Run e2e-Test + run: make test-e2e From 47366392bee78a5b30c657e0ead416c5fa9f43f0 Mon Sep 17 00:00:00 2001 From: Aditya Singh Sisodiya Date: Fri, 8 Oct 2021 12:46:16 +0530 Subject: [PATCH 8/9] refactor: change build tag --- Makefile | 2 +- test/e2e/e2e_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index c8e2b793e..f4f4e6814 100644 --- a/Makefile +++ b/Makefile @@ -19,7 +19,7 @@ test-coverage: test go tool cover -html=coverage.out test-e2e: - go test ./... -tags=e2e_test + go test ./test/e2e -tags=integration generate-proto: ## regenerate protos @echo " > cloning protobuf from odpf/proton" diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index eda36cb40..166ece476 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -1,4 +1,4 @@ -//+build e2e_test +//+build integration package e2e_test From ba3c7f837968e59d9e2e8301f608bedcf21a76cf Mon Sep 17 00:00:00 2001 From: Aditya Singh Sisodiya Date: Fri, 8 Oct 2021 13:28:37 +0530 Subject: [PATCH 9/9] chore: add count --- .github/workflows/test.yml | 2 -- Makefile | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 4b781a8c5..6b2200699 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -15,5 +15,3 @@ jobs: run: go mod tidy - name: Run Test run: make test - - name: Run e2e-Test - run: make test-e2e diff --git a/Makefile b/Makefile index f4f4e6814..87931d1c3 100644 --- a/Makefile +++ b/Makefile @@ -19,7 +19,7 @@ test-coverage: test go tool cover -html=coverage.out test-e2e: - go test ./test/e2e -tags=integration + go test ./test/e2e -tags=integration -count=1 generate-proto: ## regenerate protos @echo " > cloning protobuf from odpf/proton"