diff --git a/docs/docs/concepts/sink.md b/docs/docs/concepts/sink.md index 38464c0a3..3595982cc 100644 --- a/docs/docs/concepts/sink.md +++ b/docs/docs/concepts/sink.md @@ -37,6 +37,18 @@ sinks: Print metadata to stdout. +* **File** + +```yaml +sinks: + name: file + config: + path: "./dir/sample.yaml" + format: "yaml" +``` + +Sinks metadata to a file in `json/yaml` format as per the config defined. + ## Upcoming sinks * HTTP @@ -66,4 +78,3 @@ sink: ``` More info about available sinks can be found [here](../reference/sinks.md). - diff --git a/docs/docs/example/README.md b/docs/docs/example/README.md index cff583336..734ef1585 100644 --- a/docs/docs/example/README.md +++ b/docs/docs/example/README.md @@ -1,12 +1,13 @@ # Example ## Running recipe with dynamic variable -``` + +```cli export METEOR_KAFKA_BROKER=localhost:9092 meteor run ./kafka-console.yaml ``` -This recipe tells meteor to fetch kafka metadata from broker defined by `METEOR_KAFKA_BROKER` envvar which will be translated to `kafka_broker` in recipe. ([learn more about dynamic recipe](../docs/concepts/recipe.md#dynamic-recipe-value)) +This recipe tells meteor to fetch kafka metadata from broker defined by `METEOR_KAFKA_BROKER` envvar which will be translated to `kafka_broker` in recipe. ([learn more about dynamic recipe](../docs/concepts/recipe.md#dynamic-recipe-value)) [enrich](../docs/reference/processors.md#enrich) processor is also added in the `processors` list in the recipe which will enrich metadata fetched with keyvalues defined in the config. diff --git a/docs/docs/reference/sinks.md b/docs/docs/reference/sinks.md index 9b0a7e52a..dd06dd35b 100644 --- a/docs/docs/reference/sinks.md +++ b/docs/docs/reference/sinks.md @@ -33,6 +33,20 @@ sinks: displayName: "resource.name" ``` +## File + +`file` + +Sinks metadata to a file in `json/yaml` format as per the config defined. 
+ +```yaml +sinks: + name: file + config: + path: "./dir/sample.yaml" + format: "yaml" +``` + _**Notes**_ Compass' Type requires certain fields to be sent, hence why `mapping` config is needed to map value from any of our metadata models to any field name when sending to Compass. Supports getting value from nested fields. diff --git a/plugins/sinks/file/README.md b/plugins/sinks/file/README.md new file mode 100644 index 000000000..9d8b351bf --- /dev/null +++ b/plugins/sinks/file/README.md @@ -0,0 +1,24 @@ +# file + +Sinks metadata to a file in `json/yaml` format as per the config defined. + +## Usage + +```yaml +sinks: + name: file + config: + path: "./dir/sample.yaml" + format: "yaml" +``` + +## Config Definition + +| Key | Value | Example | Description | | +| :-- | :---- | :------ | :---------- | :-- | +|`path` | `string` | `./dir/sample.yaml` | absolute or relative path from binary to output file, directory should exist| *required*| +| `format` | `string` | `yaml` | data format for the output file | *required* | + +## Contributing + +Refer to the contribution guidelines for information on contributing to this module. 
diff --git a/plugins/sinks/file/file.go b/plugins/sinks/file/file.go new file mode 100644 index 000000000..a116fe24b --- /dev/null +++ b/plugins/sinks/file/file.go @@ -0,0 +1,130 @@ +package file + +import ( + "context" + _ "embed" + "encoding/json" + "fmt" + "io/ioutil" + "strings" + + "github.com/odpf/meteor/models" + "github.com/odpf/meteor/plugins" + "github.com/odpf/meteor/registry" + "github.com/odpf/meteor/utils" + "github.com/odpf/salt/log" + "gopkg.in/yaml.v3" +) + +//go:embed README.md +var summary string + +type Config struct { + Path string `mapstructure:"path" validate:"required"` + Format string `mapstructure:"format" validate:"required"` +} + +var sampleConfig = ` +path: ./dir/some-dir/postgres_food_app_data.json +format: json +` + +type Sink struct { + logger log.Logger + config Config + format string +} + +func New() plugins.Syncer { + return new(Sink) +} + +func (s *Sink) Info() plugins.Info { + return plugins.Info{ + Description: "save output to a file", + SampleConfig: sampleConfig, + Summary: summary, + Tags: []string{"file", "json", "yaml", "sink"}, + } +} + +func (s *Sink) Validate(configMap map[string]interface{}) (err error) { + return utils.BuildConfig(configMap, &Config{}) +} + +func (s *Sink) Init(ctx context.Context, config map[string]interface{}) (err error) { + if err := utils.BuildConfig(config, &s.config); err != nil { + return plugins.InvalidConfigError{Type: "sink", PluginName: "file"} + } + + if err := s.validateFilePath(s.config.Path); err != nil { + return err + } + s.format = s.config.Format + return +} + +func (s *Sink) Sink(ctx context.Context, batch []models.Record) (err error) { + var data []models.Metadata + for _, record := range batch { + data = append(data, record.Data()) + } + if s.format == "json" { + err := s.jsonOut(data) + if err != nil { + return err + } + return nil + } + err = s.yamlOut(data) + if err != nil { + return err + } + return nil +} + +func (s *Sink) Close() (err error) { return } + +func (s *Sink) 
jsonOut(data []models.Metadata) error { + jsnBy, err := json.MarshalIndent(data, "", " ") + if err != nil { + return err + } + err = ioutil.WriteFile(s.config.Path, jsnBy, 0644) + if err != nil { + return err + } + return nil +} + +func (s *Sink) yamlOut(data []models.Metadata) error { + ymlByte, err := yaml.Marshal(data) + if err != nil { + return err + } + err = ioutil.WriteFile(s.config.Path, ymlByte, 0644) + if err != nil { + return err + } + return nil +} + +func (s *Sink) validateFilePath(path string) error { + dirs := strings.Split(path, "/") + filename := dirs[len(dirs)-1] + format := strings.Split(filename, ".") + if len(format) != 2 { + return fmt.Errorf("invalid filename") + } + return nil +} + +func init() { + if err := registry.Sinks.Register("file", func() plugins.Syncer { + return &Sink{ + logger: plugins.GetLog(), + } + }); err != nil { + panic(err) + } +} diff --git a/plugins/sinks/file/file_test.go b/plugins/sinks/file/file_test.go new file mode 100644 index 000000000..05ca29fb8 --- /dev/null +++ b/plugins/sinks/file/file_test.go @@ -0,0 +1,153 @@ +package file_test + +import ( + "context" + _ "embed" + "testing" + + "github.com/odpf/meteor/models" + commonv1beta1 "github.com/odpf/meteor/models/odpf/assets/common/v1beta1" + facetsv1beta1 "github.com/odpf/meteor/models/odpf/assets/facets/v1beta1" + assetsv1beta1 "github.com/odpf/meteor/models/odpf/assets/v1beta1" + "github.com/odpf/meteor/plugins" + f "github.com/odpf/meteor/plugins/sinks/file" + "github.com/stretchr/testify/assert" +) + +//go:embed README.md +var summary string + +var validConfig = map[string]interface{}{ + "path": "./test-dir/sample.json", + "format": "json", +} + +func TestValidate(t *testing.T) { + t.Run("should return error on invalid config", func(t *testing.T) { + invalidConfig := map[string]interface{}{} + fileSink := f.New() + err := fileSink.Validate(invalidConfig) + assert.Error(t, err) + }) +} + +func TestInit(t *testing.T) { + t.Run("should return InvalidConfigError on 
invalid config", func(t *testing.T) { + invalidConfig := map[string]interface{}{} + fileSink := f.New() + err := fileSink.Init(context.TODO(), invalidConfig) + assert.Equal(t, plugins.InvalidConfigError{Type: "sink", PluginName: "file"}, err) + }) + t.Run("should return error on filename missing", func(t *testing.T) { + invalidConfig := map[string]interface{}{ + "path": "./some-dir", + "format": "json", + } + fileSink := f.New() + err := fileSink.Init(context.TODO(), invalidConfig) + assert.Error(t, err) + }) + // t.Run("should return error on invalid file format extension", func(t *testing.T) { + // invalidConfig := map[string]interface{}{ + // "path": "./sample.txt", + // "format": "json", + // } + // fileSink := f.New() + // err := fileSink.Init(context.TODO(), invalidConfig) + // assert.Error(t, err) + // }) + t.Run("should return no error on valid config", func(t *testing.T) { + fileSink := f.New() + err := fileSink.Init(context.TODO(), validConfig) + assert.NoError(t, err) + }) +} + +func TestMain(t *testing.T) { + t.Run("should return no error with for valid json config", func(t *testing.T) { + assert.NoError(t, sinkValidSetup(t, validConfig)) + }) + t.Run("should return no error with for valid yaml config", func(t *testing.T) { + config := map[string]interface{}{ + "path": "./test-dir/sample.yaml", + "format": "yaml", + } + assert.NoError(t, sinkValidSetup(t, config)) + + }) + t.Run("should return error for invalid directory in yaml", func(t *testing.T) { + config := map[string]interface{}{ + "path": "./test-dir/some-dir/sample.yaml", + "format": "yaml", + } + assert.Error(t, sinkValidSetup(t, config)) + }) + t.Run("should return error for invalid directory in json", func(t *testing.T) { + config := map[string]interface{}{ + "path": "./test-dir/some-dir/sample.json", + "format": "json", + } + assert.Error(t, sinkValidSetup(t, config)) + }) +} + +func TestInfo(t *testing.T) { + info := f.New().Info() + assert.Equal(t, summary, info.Summary) +} + +func 
sinkValidSetup(t *testing.T, config map[string]interface{}) error { + fileSink := f.New() + err := fileSink.Init(context.TODO(), config) + assert.NoError(t, err) + return fileSink.Sink(context.TODO(), getExpectedVal()) +} + +func getExpectedVal() []models.Record { + return []models.Record{ + models.NewRecord(&assetsv1beta1.Table{ + Resource: &commonv1beta1.Resource{ + Urn: "elasticsearch.index1", + Name: "index1", + Type: "table", + }, + Schema: &facetsv1beta1.Columns{ + Columns: []*facetsv1beta1.Column{ + { + Name: "SomeInt", + DataType: "long", + }, + { + Name: "SomeStr", + DataType: "text", + }, + }, + }, + Profile: &assetsv1beta1.TableProfile{ + TotalRows: 1, + }, + }), + models.NewRecord(&assetsv1beta1.Table{ + Resource: &commonv1beta1.Resource{ + Urn: "elasticsearch.index2", + Name: "index2", + Type: "table", + }, + Schema: &facetsv1beta1.Columns{ + Columns: []*facetsv1beta1.Column{ + { + Name: "SomeInt", + DataType: "long", + }, + { + Name: "SomeStr", + DataType: "text", + }, + }, + }, + Profile: &assetsv1beta1.TableProfile{ + TotalRows: 1, + }, + }), + } +} diff --git a/plugins/sinks/file/test-dir/sample.json b/plugins/sinks/file/test-dir/sample.json new file mode 100644 index 000000000..bb9223ea9 --- /dev/null +++ b/plugins/sinks/file/test-dir/sample.json @@ -0,0 +1,46 @@ +[ + { + "resource": { + "urn": "elasticsearch.index1", + "name": "index1", + "type": "table" + }, + "profile": { + "total_rows": 1 + }, + "schema": { + "columns": [ + { + "name": "SomeInt", + "data_type": "long" + }, + { + "name": "SomeStr", + "data_type": "text" + } + ] + } + }, + { + "resource": { + "urn": "elasticsearch.index2", + "name": "index2", + "type": "table" + }, + "profile": { + "total_rows": 1 + }, + "schema": { + "columns": [ + { + "name": "SomeInt", + "data_type": "long" + }, + { + "name": "SomeStr", + "data_type": "text" + } + ] + } + } +] \ No newline at end of file diff --git a/plugins/sinks/file/test-dir/sample.yaml b/plugins/sinks/file/test-dir/sample.yaml new file 
mode 100644 index 000000000..7383129a1 --- /dev/null +++ b/plugins/sinks/file/test-dir/sample.yaml @@ -0,0 +1,72 @@ +- resource: + urn: elasticsearch.index1 + name: index1 + service: "" + type: table + url: "" + description: "" + profile: + totalrows: 1 + partitionkey: "" + partitionvalue: "" + usagecount: 0 + joins: [] + filters: [] + schema: + columns: + - name: SomeInt + description: "" + datatype: long + isnullable: false + length: 0 + profile: null + properties: null + - name: SomeStr + description: "" + datatype: text + isnullable: false + length: 0 + profile: null + properties: null + preview: null + ownership: null + lineage: null + properties: null + timestamps: null + event: null +- resource: + urn: elasticsearch.index2 + name: index2 + service: "" + type: table + url: "" + description: "" + profile: + totalrows: 1 + partitionkey: "" + partitionvalue: "" + usagecount: 0 + joins: [] + filters: [] + schema: + columns: + - name: SomeInt + description: "" + datatype: long + isnullable: false + length: 0 + profile: null + properties: null + - name: SomeStr + description: "" + datatype: text + isnullable: false + length: 0 + profile: null + properties: null + preview: null + ownership: null + lineage: null + properties: null + timestamps: null + event: null diff --git a/plugins/sinks/populate.go b/plugins/sinks/populate.go index dfcab7c22..a1dfa9dab 100644 --- a/plugins/sinks/populate.go +++ b/plugins/sinks/populate.go @@ -3,5 +3,6 @@ package sinks import ( _ "github.com/odpf/meteor/plugins/sinks/compass" _ "github.com/odpf/meteor/plugins/sinks/console" + _ "github.com/odpf/meteor/plugins/sinks/file" _ "github.com/odpf/meteor/plugins/sinks/kafka" )