Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,12 @@ func (r *Agent) Run(ctx context.Context, recipe recipe.Recipe) (run Run) {
}

for _, sr := range recipe.Sinks {
if err := r.setupSink(ctx, sr, stream, recipe); err != nil {
sink, err := r.setupSink(ctx, sr, stream, recipe)
if err != nil {
run.Error = errors.Wrap(err, "failed to setup sink")
return
}
defer sink.Close()
}

// to gather total number of records extracted
Expand Down Expand Up @@ -233,13 +235,12 @@ func (r *Agent) setupProcessor(ctx context.Context, pr recipe.PluginRecipe, str
return
}

func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *stream, recipe recipe.Recipe) (err error) {
var sink plugins.Syncer
func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *stream, recipe recipe.Recipe) (sink plugins.Syncer, err error) {
if sink, err = r.sinkFactory.Get(sr.Name); err != nil {
return errors.Wrapf(err, "could not find sink \"%s\"", sr.Name)
return nil, errors.Wrapf(err, "could not find sink \"%s\"", sr.Name)
}
if err = sink.Init(ctx, sr.Config); err != nil {
return errors.Wrapf(err, "could not initiate sink \"%s\"", sr.Name)
return nil, errors.Wrapf(err, "could not initiate sink \"%s\"", sr.Name)
}

retryNotification := func(e error, d time.Duration) {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/prestodb/presto-go-client v0.0.0-20211201125635-ad28cec17d6c
github.com/schollz/progressbar/v3 v3.8.5
github.com/scizorman/go-ndjson v0.0.0-20200902005011-1d92486df71e
github.com/segmentio/kafka-go v0.4.17
github.com/sijms/go-ora/v2 v2.2.22
github.com/snowflakedb/gosnowflake v1.6.7
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1372,6 +1372,8 @@ github.com/sagikazarmark/crypt v0.1.0/go.mod h1:B/mN0msZuINBtQ1zZLEQcegFJJf9vnYI
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/schollz/progressbar/v3 v3.8.5 h1:VcmmNRO+eFN3B0m5dta6FXYXY+MEJmXdWoIS+jjssQM=
github.com/schollz/progressbar/v3 v3.8.5/go.mod h1:ewO25kD7ZlaJFTvMeOItkOZa8kXu1UvFs379htE8HMQ=
github.com/scizorman/go-ndjson v0.0.0-20200902005011-1d92486df71e h1:irVMZAXAG3l2RXnj0AUEQO5hTYmIEHu0usaYQrOdlco=
github.com/scizorman/go-ndjson v0.0.0-20200902005011-1d92486df71e/go.mod h1:LoK3vOi95qRhyyu7Y7SnZBTTw2x7YQyvmOQC3HNbc+8=
github.com/scylladb/go-set v1.0.2/go.mod h1:DkpGd78rljTxKAnTDPFqXSGxvETQnJyuSOQwsHycqfs=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/seccomp/libseccomp-golang v0.9.1/go.mod h1:GbW5+tmTXfcxTToHLXlScSlAvWlF4P2Ca7zGrPiEpWo=
Expand Down
4 changes: 3 additions & 1 deletion plugins/sinks/file/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# file

Sinks metadata to a file in `json/yaml` format as per the config defined.
Sinks metadata to a file in `ndjson/yaml` format as per the config defined.

## Usage

Expand All @@ -10,6 +10,7 @@ sinks:
config:
path: "./dir/sample.yaml"
format: "yaml"
overwrite: false
```

## Config Defination
Expand All @@ -18,6 +19,7 @@ sinks:
| :-- | :---- | :------ | :---------- | :-- |
|`path` | `string` | `./dir/sample.yaml` | absolute or relative path from binary to output file, directory should exist| *required*|
| `format` | `string` | `yaml` | data format for the output file | *required* |
| `overwrite` | `bool` | `false` | to choose whether data should be overwritten or appended in case file exists, default is `true` | *optional* |

## Contributing

Expand Down
49 changes: 32 additions & 17 deletions plugins/sinks/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,36 +3,38 @@ package file
import (
"context"
_ "embed"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"strings"

"github.com/odpf/meteor/models"
"github.com/odpf/meteor/plugins"
"github.com/odpf/meteor/registry"
"github.com/odpf/meteor/utils"
"github.com/odpf/salt/log"
ndjson "github.com/scizorman/go-ndjson"
"gopkg.in/yaml.v3"
)

//go:embed README.md
var summary string

type Config struct {
Path string `mapstructure:"path" validate:"required"`
Format string `mapstructure:"format" validate:"required"`
Overwrite bool `mapstructure:"overwrite" default:"true"`
Path string `mapstructure:"path" validate:"required"`
Format string `mapstructure:"format" validate:"required"`
}

var sampleConfig = `
path: ./dir/some-dir/postgres_food_app_data.json
format: json
path: ./output-filename.txt
format: ndjson
`

type Sink struct {
logger log.Logger
config Config
format string
File *os.File
}

func New() plugins.Syncer {
Expand Down Expand Up @@ -60,7 +62,16 @@ func (s *Sink) Init(ctx context.Context, config map[string]interface{}) (err err
if err := s.validateFilePath(s.config.Path); err != nil {
return err
}

s.format = s.config.Format
if s.config.Overwrite {
s.File, err = os.Create(s.config.Path)
return err
}
s.File, err = os.OpenFile(s.config.Path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0777)
if err != nil {
return err
}
return
}

Expand All @@ -69,8 +80,8 @@ func (s *Sink) Sink(ctx context.Context, batch []models.Record) (err error) {
for _, record := range batch {
data = append(data, record.Data())
}
if s.format == "json" {
err := s.jsonOut(data)
if s.format == "ndjson" {
err := s.ndjsonOut(data)
if err != nil {
return err
}
Expand All @@ -83,26 +94,30 @@ func (s *Sink) Sink(ctx context.Context, batch []models.Record) (err error) {
return nil
}

func (s *Sink) Close() (err error) { return }
func (s *Sink) Close() (err error) {
return nil
}

func (s *Sink) jsonOut(data []models.Metadata) error {
jsnBy, err := json.MarshalIndent(data, "", " ")
if err != nil {
return err
}
err = ioutil.WriteFile(s.config.Path, jsnBy, 0644)
func (s *Sink) ndjsonOut(data []models.Metadata) error {
jsnBy, err := ndjson.Marshal(data)
if err != nil {
return err
}
return nil
err = s.writeBytes(jsnBy)
return err
}

func (s *Sink) yamlOut(data []models.Metadata) error {
ymlByte, err := yaml.Marshal(data)
if err != nil {
return err
}
err = ioutil.WriteFile(s.config.Path, ymlByte, 0644)
err = s.writeBytes(ymlByte)
return err
}

func (s *Sink) writeBytes(b []byte) error {
_, err := s.File.Write(b)
if err != nil {
return err
}
Expand Down
36 changes: 17 additions & 19 deletions plugins/sinks/file/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import (
var summary string

var validConfig = map[string]interface{}{
"path": "./test-dir/sample.json",
"format": "json",
"path": "./test-dir/sample.ndjson",
"format": "ndjson",
}

func TestValidate(t *testing.T) {
Expand All @@ -41,21 +41,12 @@ func TestInit(t *testing.T) {
t.Run("should return error on filename missing", func(t *testing.T) {
invalidConfig := map[string]interface{}{
"path": "./some-dir",
"format": "json",
"format": "ndjson",
}
fileSink := f.New()
err := fileSink.Init(context.TODO(), invalidConfig)
assert.Error(t, err)
})
// t.Run("should return error on invalid file format extension", func(t *testing.T) {
// invalidConfig := map[string]interface{}{
// "path": "./sample.txt",
// "format": "json",
// }
// fileSink := f.New()
// err := fileSink.Init(context.TODO(), invalidConfig)
// assert.Error(t, err)
// })
t.Run("should return no error on valid config", func(t *testing.T) {
fileSink := f.New()
err := fileSink.Init(context.TODO(), validConfig)
Expand All @@ -64,7 +55,7 @@ func TestInit(t *testing.T) {
}

func TestMain(t *testing.T) {
t.Run("should return no error with for valid json config", func(t *testing.T) {
t.Run("should return no error with for valid ndjson config", func(t *testing.T) {
assert.NoError(t, sinkValidSetup(t, validConfig))
})
t.Run("should return no error with for valid yaml config", func(t *testing.T) {
Expand All @@ -80,14 +71,14 @@ func TestMain(t *testing.T) {
"path": "./test-dir/some-dir/sample.yaml",
"format": "yaml",
}
assert.Error(t, sinkValidSetup(t, config))
assert.Error(t, sinkInvalidPath(t, config))
})
t.Run("should return error for invalid directory in json", func(t *testing.T) {
t.Run("should return error for invalid directory in ndjson", func(t *testing.T) {
config := map[string]interface{}{
"path": "./test-dir/some-dir/sample.json",
"format": "json",
"path": "./test-dir/some-dir/sample.ndjson",
"format": "ndjson",
}
assert.Error(t, sinkValidSetup(t, config))
assert.Error(t, sinkInvalidPath(t, config))
})
}

Expand All @@ -96,11 +87,18 @@ func TestInfo(t *testing.T) {
assert.Equal(t, summary, info.Summary)
}

func sinkInvalidPath(t *testing.T, config map[string]interface{}) error {
fileSink := f.New()
return fileSink.Init(context.TODO(), config)
}

func sinkValidSetup(t *testing.T, config map[string]interface{}) error {
fileSink := f.New()
err := fileSink.Init(context.TODO(), config)
assert.NoError(t, err)
return fileSink.Sink(context.TODO(), getExpectedVal())
err = fileSink.Sink(context.TODO(), getExpectedVal())
assert.NoError(t, err)
return fileSink.Close()
}

func getExpectedVal() []models.Record {
Expand Down
46 changes: 0 additions & 46 deletions plugins/sinks/file/test-dir/sample.json

This file was deleted.

2 changes: 2 additions & 0 deletions plugins/sinks/file/test-dir/sample.ndjson
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"resource":{"urn":"elasticsearch.index1","name":"index1","type":"table"},"profile":{"total_rows":1},"schema":{"columns":[{"name":"SomeInt","data_type":"long"},{"name":"SomeStr","data_type":"text"}]}}
{"resource":{"urn":"elasticsearch.index2","name":"index2","type":"table"},"profile":{"total_rows":1},"schema":{"columns":[{"name":"SomeInt","data_type":"long"},{"name":"SomeStr","data_type":"text"}]}}