diff --git a/agent/agent.go b/agent/agent.go index e17e3b09a..ee565a957 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -142,10 +142,12 @@ func (r *Agent) Run(ctx context.Context, recipe recipe.Recipe) (run Run) { } for _, sr := range recipe.Sinks { - if err := r.setupSink(ctx, sr, stream, recipe); err != nil { + sink, err := r.setupSink(ctx, sr, stream, recipe) + if err != nil { run.Error = errors.Wrap(err, "failed to setup sink") return } + defer sink.Close() } // to gather total number of records extracted @@ -233,13 +235,12 @@ func (r *Agent) setupProcessor(ctx context.Context, pr recipe.PluginRecipe, str return } -func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *stream, recipe recipe.Recipe) (err error) { - var sink plugins.Syncer +func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *stream, recipe recipe.Recipe) (sink plugins.Syncer, err error) { if sink, err = r.sinkFactory.Get(sr.Name); err != nil { - return errors.Wrapf(err, "could not find sink \"%s\"", sr.Name) + return nil, errors.Wrapf(err, "could not find sink \"%s\"", sr.Name) } if err = sink.Init(ctx, sr.Config); err != nil { - return errors.Wrapf(err, "could not initiate sink \"%s\"", sr.Name) + return nil, errors.Wrapf(err, "could not initiate sink \"%s\"", sr.Name) } retryNotification := func(e error, d time.Duration) { diff --git a/go.mod b/go.mod index 1c2d316bd..71092c2c6 100644 --- a/go.mod +++ b/go.mod @@ -47,6 +47,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/prestodb/presto-go-client v0.0.0-20211201125635-ad28cec17d6c github.com/schollz/progressbar/v3 v3.8.5 + github.com/scizorman/go-ndjson v0.0.0-20200902005011-1d92486df71e github.com/segmentio/kafka-go v0.4.17 github.com/sijms/go-ora/v2 v2.2.22 github.com/snowflakedb/gosnowflake v1.6.7 diff --git a/go.sum b/go.sum index 75b87b804..8e5a1aabb 100644 --- a/go.sum +++ b/go.sum @@ -1372,6 +1372,8 @@ github.com/sagikazarmark/crypt v0.1.0/go.mod h1:B/mN0msZuINBtQ1zZLEQcegFJJf9vnYI github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/schollz/progressbar/v3 v3.8.5 h1:VcmmNRO+eFN3B0m5dta6FXYXY+MEJmXdWoIS+jjssQM= github.com/schollz/progressbar/v3 v3.8.5/go.mod h1:ewO25kD7ZlaJFTvMeOItkOZa8kXu1UvFs379htE8HMQ= +github.com/scizorman/go-ndjson v0.0.0-20200902005011-1d92486df71e h1:irVMZAXAG3l2RXnj0AUEQO5hTYmIEHu0usaYQrOdlco= +github.com/scizorman/go-ndjson v0.0.0-20200902005011-1d92486df71e/go.mod h1:LoK3vOi95qRhyyu7Y7SnZBTTw2x7YQyvmOQC3HNbc+8= github.com/scylladb/go-set v1.0.2/go.mod h1:DkpGd78rljTxKAnTDPFqXSGxvETQnJyuSOQwsHycqfs= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/seccomp/libseccomp-golang v0.9.1/go.mod h1:GbW5+tmTXfcxTToHLXlScSlAvWlF4P2Ca7zGrPiEpWo= diff --git a/plugins/sinks/file/README.md b/plugins/sinks/file/README.md index 9d8b351bf..7f2783c89 100644 --- a/plugins/sinks/file/README.md +++ b/plugins/sinks/file/README.md @@ -1,6 +1,6 @@ # file -Sinks metadata to a file in `json/yaml` format as per the config defined. +Sinks metadata to a file in `ndjson/yaml` format as per the config defined. ## Usage @@ -10,6 +10,7 @@ sinks: config: path: "./dir/sample.yaml" format: "yaml" + overwrite: false ``` ## Config Defination @@ -18,6 +19,7 @@ sinks: | :-- | :---- | :------ | :---------- | :-- | |`path` | `string` | `./dir/sample.yaml` | absolute or relative path from binary to output file, directory should exist| *required*| | `format` | `string` | `yaml` | data format for the output file | *required* | +| `overwrite` | `bool` | `false` | to choose whether data should be overwritten or appended in case file exists, default is `true` | *optional* | ## Contributing diff --git a/plugins/sinks/file/file.go b/plugins/sinks/file/file.go index a116fe24b..275baad24 100644 --- a/plugins/sinks/file/file.go +++ b/plugins/sinks/file/file.go @@ -3,9 +3,8 @@ package file import ( "context" _ "embed" - "encoding/json" "fmt" - "io/ioutil" + "os" "strings" "github.com/odpf/meteor/models" @@ -13,6 +12,7 @@ import ( "github.com/odpf/meteor/registry" "github.com/odpf/meteor/utils" "github.com/odpf/salt/log" + ndjson "github.com/scizorman/go-ndjson" "gopkg.in/yaml.v3" ) @@ -20,19 +20,21 @@ import ( var summary string type Config struct { - Path string `mapstructure:"path" validate:"required"` - Format string `mapstructure:"format" validate:"required"` + Overwrite bool `mapstructure:"overwrite" default:"true"` + Path string `mapstructure:"path" validate:"required"` + Format string `mapstructure:"format" validate:"required"` } var sampleConfig = ` -path: ./dir/some-dir/postgres_food_app_data.json -format: json +path: ./output-filename.txt +format: ndjson ` type Sink struct { logger log.Logger config Config format string + File *os.File } func New() plugins.Syncer { @@ -60,7 +62,16 @@ func (s *Sink) Init(ctx context.Context, config map[string]interface{}) (err err if err := s.validateFilePath(s.config.Path); err != nil { return err } + s.format = s.config.Format + if s.config.Overwrite { + s.File, err = os.Create(s.config.Path) + return err + } + s.File, err = os.OpenFile(s.config.Path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0777) + if err != nil { + return err + } return } @@ -69,8 +80,8 @@ func (s *Sink) Sink(ctx context.Context, batch []models.Record) (err error) { for _, record := range batch { data = append(data, record.Data()) } - if s.format == "json" { - err := s.jsonOut(data) + if s.format == "ndjson" { + err := s.ndjsonOut(data) if err != nil { return err } @@ -83,18 +94,17 @@ func (s *Sink) Sink(ctx context.Context, batch []models.Record) (err error) { return nil } -func (s *Sink) Close() (err error) { return } +func (s *Sink) Close() (err error) { + return nil +} -func (s *Sink) jsonOut(data []models.Metadata) error { - jsnBy, err := json.MarshalIndent(data, "", " ") - if err != nil { - return err - } - err = ioutil.WriteFile(s.config.Path, jsnBy, 0644) +func (s *Sink) ndjsonOut(data []models.Metadata) error { + jsnBy, err := ndjson.Marshal(data) if err != nil { return err } - return nil + err = s.writeBytes(jsnBy) + return err } func (s *Sink) yamlOut(data []models.Metadata) error { @@ -102,7 +112,12 @@ func (s *Sink) yamlOut(data []models.Metadata) error { if err != nil { return err } - err = ioutil.WriteFile(s.config.Path, ymlByte, 0644) + err = s.writeBytes(ymlByte) + return err +} + +func (s *Sink) writeBytes(b []byte) error { + _, err := s.File.Write(b) if err != nil { return err } diff --git a/plugins/sinks/file/file_test.go b/plugins/sinks/file/file_test.go index 05ca29fb8..4873c320f 100644 --- a/plugins/sinks/file/file_test.go +++ b/plugins/sinks/file/file_test.go @@ -18,8 +18,8 @@ import ( var summary string var validConfig = map[string]interface{}{ - "path": "./test-dir/sample.json", - "format": "json", + "path": "./test-dir/sample.ndjson", + "format": "ndjson", } func TestValidate(t *testing.T) { @@ -41,21 +41,12 @@ func TestInit(t *testing.T) { t.Run("should return error on filename missing", func(t *testing.T) { invalidConfig := map[string]interface{}{ "path": "./some-dir", - "format": "json", + "format": "ndjson", } fileSink := f.New() err := fileSink.Init(context.TODO(), invalidConfig) assert.Error(t, err) }) - // t.Run("should return error on invalid file format extension", func(t *testing.T) { - // invalidConfig := map[string]interface{}{ - // "path": "./sample.txt", - // "format": "json", - // } - // fileSink := f.New() - // err := fileSink.Init(context.TODO(), invalidConfig) - // assert.Error(t, err) - // }) t.Run("should return no error on valid config", func(t *testing.T) { fileSink := f.New() err := fileSink.Init(context.TODO(), validConfig) @@ -64,7 +55,7 @@ func TestInit(t *testing.T) { } func TestMain(t *testing.T) { - t.Run("should return no error with for valid json config", func(t *testing.T) { + t.Run("should return no error with for valid ndjson config", func(t *testing.T) { assert.NoError(t, sinkValidSetup(t, validConfig)) }) t.Run("should return no error with for valid yaml config", func(t *testing.T) { @@ -80,14 +71,14 @@ func TestMain(t *testing.T) { "path": "./test-dir/some-dir/sample.yaml", "format": "yaml", } - assert.Error(t, sinkValidSetup(t, config)) + assert.Error(t, sinkInvalidPath(t, config)) }) - t.Run("should return error for invalid directory in json", func(t *testing.T) { + t.Run("should return error for invalid directory in ndjson", func(t *testing.T) { config := map[string]interface{}{ - "path": "./test-dir/some-dir/sample.json", - "format": "json", + "path": "./test-dir/some-dir/sample.ndjson", + "format": "ndjson", } - assert.Error(t, sinkValidSetup(t, config)) + assert.Error(t, sinkInvalidPath(t, config)) }) } @@ -96,11 +87,18 @@ func TestInfo(t *testing.T) { assert.Equal(t, summary, info.Summary) } +func sinkInvalidPath(t *testing.T, config map[string]interface{}) error { + fileSink := f.New() + return fileSink.Init(context.TODO(), config) +} + func sinkValidSetup(t *testing.T, config map[string]interface{}) error { fileSink := f.New() err := fileSink.Init(context.TODO(), config) assert.NoError(t, err) - return fileSink.Sink(context.TODO(), getExpectedVal()) + err = fileSink.Sink(context.TODO(), getExpectedVal()) + assert.NoError(t, err) + return fileSink.Close() } func getExpectedVal() []models.Record { diff --git a/plugins/sinks/file/test-dir/sample.json b/plugins/sinks/file/test-dir/sample.json deleted file mode 100644 index bb9223ea9..000000000 --- a/plugins/sinks/file/test-dir/sample.json +++ /dev/null @@ -1,46 +0,0 @@ -[ - { - "resource": { - "urn": "elasticsearch.index1", - "name": "index1", - "type": "table" - }, - "profile": { - "total_rows": 1 - }, - "schema": { - "columns": [ - { - "name": "SomeInt", - "data_type": "long" - }, - { - "name": "SomeStr", - "data_type": "text" - } - ] - } - }, - { - "resource": { - "urn": "elasticsearch.index2", - "name": "index2", - "type": "table" - }, - "profile": { - "total_rows": 1 - }, - "schema": { - "columns": [ - { - "name": "SomeInt", - "data_type": "long" - }, - { - "name": "SomeStr", - "data_type": "text" - } - ] - } - } -] \ No newline at end of file diff --git a/plugins/sinks/file/test-dir/sample.ndjson b/plugins/sinks/file/test-dir/sample.ndjson new file mode 100755 index 000000000..4ad7ace4e --- /dev/null +++ b/plugins/sinks/file/test-dir/sample.ndjson @@ -0,0 +1,2 @@ +{"resource":{"urn":"elasticsearch.index1","name":"index1","type":"table"},"profile":{"total_rows":1},"schema":{"columns":[{"name":"SomeInt","data_type":"long"},{"name":"SomeStr","data_type":"text"}]}} +{"resource":{"urn":"elasticsearch.index2","name":"index2","type":"table"},"profile":{"total_rows":1},"schema":{"columns":[{"name":"SomeInt","data_type":"long"},{"name":"SomeStr","data_type":"text"}]}}