Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion docs/docs/concepts/sink.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,18 @@ sinks:

Print metadata to stdout.

* **File**

```yaml
sinks:
name: file
config:
path: "./dir/sample.yaml"
format: "yaml"
```

Writes metadata to a file in JSON or YAML format, as selected by the `format` config.

## Upcoming sinks

* HTTP
Expand Down Expand Up @@ -66,4 +78,3 @@ sink:
```

More info about available sinks can be found [here](../reference/sinks.md).

5 changes: 3 additions & 2 deletions docs/docs/example/README.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
# Example

## Running recipe with dynamic variable
```

```cli
export METEOR_KAFKA_BROKER=localhost:9092
meteor run ./kafka-console.yaml
```

This recipe tells meteor to fetch kafka metadata from broker defined by `METEOR_KAFKA_BROKER` envvar which will be translated to `kafka_broker` in recipe. ([learn more about dynamic recipe](../docs/concepts/recipe.md#dynamic-recipe-value))
This recipe tells meteor to fetch kafka metadata from broker defined by `METEOR_KAFKA_BROKER` envvar which will be translated to `kafka_broker` in recipe. ([learn more about dynamic recipe](../docs/concepts/recipe.md#dynamic-recipe-value))

The [enrich](../docs/reference/processors.md#enrich) processor is also added to the `processors` list in the recipe; it enriches the fetched metadata with the key-value pairs defined in its config.

Expand Down
14 changes: 14 additions & 0 deletions docs/docs/reference/sinks.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,20 @@ sinks:
displayName: "resource.name"
```

## File

`file`

Writes metadata to a file in JSON or YAML format, as selected by the `format` config.

```yaml
sinks:
name: file
config:
path: "./dir/sample.yaml"
format: "yaml"
```

_**Notes (Compass sink)**_

Compass' Type requires certain fields to be sent, hence why `mapping` config is needed to map value from any of our metadata models to any field name when sending to Compass. Supports getting value from nested fields.
24 changes: 24 additions & 0 deletions plugins/sinks/file/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# file

Writes metadata to a file in JSON or YAML format, as selected by the `format` config.

## Usage

```yaml
sinks:
name: file
config:
path: "./dir/sample.yaml"
format: "yaml"
```

## Config Definition

| Key | Value | Example | Description | |
| :-- | :---- | :------ | :---------- | :-- |
| `path` | `string` | `./dir/sample.yaml` | Absolute path, or path relative to the working directory, of the output file. The parent directory must already exist. | *required* |
| `format` | `string` | `yaml` | Data format of the output file: `json` or `yaml`. | *required* |

## Contributing

Refer to the contribution guidelines for information on contributing to this module.
130 changes: 130 additions & 0 deletions plugins/sinks/file/file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package file

import (
"context"
_ "embed"
"encoding/json"
"fmt"
"io/ioutil"
"strings"

"github.com/odpf/meteor/models"
"github.com/odpf/meteor/plugins"
"github.com/odpf/meteor/registry"
"github.com/odpf/meteor/utils"
"github.com/odpf/salt/log"
"gopkg.in/yaml.v3"
)

// summary holds the contents of README.md, embedded at build time. Info
// returns it as the plugin's Summary.
//
//go:embed README.md
var summary string

// Config is the user-supplied configuration of the file sink. Both fields are
// mandatory.
type Config struct {
	// Path is the location of the output file. Init rejects a path whose
	// last segment has no extension.
	Path string `mapstructure:"path" validate:"required"`

	// Format is the output encoding, "json" or "yaml".
	Format string `mapstructure:"format" validate:"required"`
}

// sampleConfig is the example configuration that Info exposes as
// SampleConfig.
var sampleConfig = `
path: ./dir/some-dir/postgres_food_app_data.json
format: json
`

// Sink is the file sink plugin: it encodes batches of records and writes them
// to the file named in its configuration.
type Sink struct {
	// logger is set by the factory registered in init.
	logger log.Logger
	// config is filled in by Init from the recipe configuration.
	config Config
	// format is a copy of config.Format, stored by Init and read by Sink to
	// pick the encoder.
	format string
}

// New returns a zero-valued file sink as a plugins.Syncer. No logger is
// attached to the returned value.
func New() plugins.Syncer {
	return &Sink{}
}

// Info returns the plugin's static metadata: a short description, the sample
// configuration, the embedded README as summary, and its search tags.
func (s *Sink) Info() plugins.Info {
	tags := []string{"file", "json", "yaml", "sink"}
	info := plugins.Info{
		Description:  "save output to a file",
		SampleConfig: sampleConfig,
		Summary:      summary,
		Tags:         tags,
	}
	return info
}

// Validate decodes configMap into a throwaway Config via utils.BuildConfig and
// returns whatever error that produces. The receiver is left untouched.
func (s *Sink) Validate(configMap map[string]interface{}) (err error) {
	var cfg Config
	err = utils.BuildConfig(configMap, &cfg)
	return err
}

// Init decodes config into the sink's Config and verifies it before any data
// is written.
//
// It returns plugins.InvalidConfigError when the configuration cannot be
// built, the error from validateFilePath when the path does not name a file,
// and an error when the format is neither "json" nor "yaml". On success the
// chosen format is stored for use by Sink.
func (s *Sink) Init(ctx context.Context, config map[string]interface{}) (err error) {
	if err := utils.BuildConfig(config, &s.config); err != nil {
		return plugins.InvalidConfigError{Type: "sink", PluginName: "file"}
	}

	if err := s.validateFilePath(s.config.Path); err != nil {
		return err
	}

	// Sink treats every value other than "json" as YAML, so a typo such as
	// "JSON" or "xml" would silently produce a YAML file. Reject it here.
	switch s.config.Format {
	case "json", "yaml":
		s.format = s.config.Format
	default:
		return fmt.Errorf("unsupported format %q: must be one of json, yaml", s.config.Format)
	}
	return nil
}

// Sink encodes the data of every record in batch and writes it to the
// configured file: as JSON when the format is "json", as YAML otherwise.
//
// The output file is rewritten on every call, so after several calls it
// contains only the most recent batch. Any encoding or write error is
// returned unchanged.
func (s *Sink) Sink(ctx context.Context, batch []models.Record) (err error) {
	// Pre-size the slice; a non-nil empty slice also makes an empty batch
	// encode as the JSON array "[]" rather than "null".
	data := make([]models.Metadata, 0, len(batch))
	for _, record := range batch {
		data = append(data, record.Data())
	}

	if s.format == "json" {
		return s.jsonOut(data)
	}
	return s.yamlOut(data)
}

// Close does nothing and always returns nil.
func (s *Sink) Close() (err error) {
	return nil
}

// jsonOut marshals data as indented JSON and writes it to the configured
// path with mode 0644, replacing any previous contents of the file. The
// first marshalling or write error is returned.
func (s *Sink) jsonOut(data []models.Metadata) error {
	encoded, marshalErr := json.MarshalIndent(data, "", " ")
	if marshalErr != nil {
		return marshalErr
	}
	return ioutil.WriteFile(s.config.Path, encoded, 0644)
}

// yamlOut marshals data as YAML and writes it to the configured path with
// mode 0644, replacing any previous contents of the file. The first
// marshalling or write error is returned.
func (s *Sink) yamlOut(data []models.Metadata) error {
	encoded, marshalErr := yaml.Marshal(data)
	if marshalErr != nil {
		return marshalErr
	}
	return ioutil.WriteFile(s.config.Path, encoded, 0644)
}

// validateFilePath checks that the last "/"-separated segment of path looks
// like a file name, i.e. contains at least one "." and is not the directory
// reference "." or "..". It returns an "invalid filename" error otherwise.
//
// The file itself is not touched; a missing parent directory is only
// detected when the sink writes.
func (s *Sink) validateFilePath(path string) error {
	// Only the last segment is the file name, so dots in directory names
	// (e.g. "./v1.2/out") must not be mistaken for an extension.
	// LastIndex returns -1 when there is no separator, which makes the
	// slice start at 0 and keeps the whole path.
	filename := path[strings.LastIndex(path, "/")+1:]

	// "." and ".." contain dots but refer to directories.
	if filename == "." || filename == ".." {
		return fmt.Errorf("invalid filename")
	}
	// Any number of dots is fine: names such as "meteor.2021-10-01.json"
	// are legitimate and must not be rejected.
	if !strings.Contains(filename, ".") {
		return fmt.Errorf("invalid filename")
	}
	return nil
}

func init() {
if err := registry.Sinks.Register("file", func() plugins.Syncer {
return &Sink{
logger: plugins.GetLog(),
}
}); err != nil {
panic(err)
}
}
153 changes: 153 additions & 0 deletions plugins/sinks/file/file_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package file_test

import (
"context"
_ "embed"
"testing"

"github.com/odpf/meteor/models"
commonv1beta1 "github.com/odpf/meteor/models/odpf/assets/common/v1beta1"
facetsv1beta1 "github.com/odpf/meteor/models/odpf/assets/facets/v1beta1"
assetsv1beta1 "github.com/odpf/meteor/models/odpf/assets/v1beta1"
"github.com/odpf/meteor/plugins"
f "github.com/odpf/meteor/plugins/sinks/file"
"github.com/stretchr/testify/assert"
)

// summary holds the contents of README.md, embedded at build time. TestInfo
// compares it with the Summary reported by the plugin.
//
//go:embed README.md
var summary string

// validConfig is a well-formed JSON sink configuration shared by the tests.
var validConfig = map[string]interface{}{
	"format": "json",
	"path":   "./test-dir/sample.json",
}

func TestValidate(t *testing.T) {
t.Run("should return error on invalid config", func(t *testing.T) {
invalidConfig := map[string]interface{}{}
fileSink := f.New()
err := fileSink.Validate(invalidConfig)
assert.Error(t, err)
})
}

func TestInit(t *testing.T) {
t.Run("should return InvalidConfigError on invalid config", func(t *testing.T) {
invalidConfig := map[string]interface{}{}
fileSink := f.New()
err := fileSink.Init(context.TODO(), invalidConfig)
assert.Equal(t, plugins.InvalidConfigError{Type: "sink", PluginName: "file"}, err)
})
t.Run("should return error on filename missing", func(t *testing.T) {
invalidConfig := map[string]interface{}{
"path": "./some-dir",
"format": "json",
}
fileSink := f.New()
err := fileSink.Init(context.TODO(), invalidConfig)
assert.Error(t, err)
})
// t.Run("should return error on invalid file format extension", func(t *testing.T) {
// invalidConfig := map[string]interface{}{
// "path": "./sample.txt",
// "format": "json",
// }
// fileSink := f.New()
// err := fileSink.Init(context.TODO(), invalidConfig)
// assert.Error(t, err)
// })
t.Run("should return no error on valid config", func(t *testing.T) {
fileSink := f.New()
err := fileSink.Init(context.TODO(), validConfig)
assert.NoError(t, err)
})
}

// TestSink exercises Init followed by Sink for both output formats, and
// checks that writing into a directory that does not exist fails.
//
// It was previously named TestMain. That name is reserved by the testing
// package for the func TestMain(m *testing.M) entry point, so using it for an
// ordinary *testing.T test is misleading.
//
// NOTE(review): the success cases write to ./test-dir relative to the package
// directory, so that directory presumably has to exist in the repository.
func TestSink(t *testing.T) {
	t.Run("should return no error with for valid json config", func(t *testing.T) {
		assert.NoError(t, sinkValidSetup(t, validConfig))
	})

	t.Run("should return no error with for valid yaml config", func(t *testing.T) {
		config := map[string]interface{}{
			"path":   "./test-dir/sample.yaml",
			"format": "yaml",
		}
		assert.NoError(t, sinkValidSetup(t, config))
	})

	t.Run("should return error for invalid directory in yaml", func(t *testing.T) {
		config := map[string]interface{}{
			"path":   "./test-dir/some-dir/sample.yaml",
			"format": "yaml",
		}
		assert.Error(t, sinkValidSetup(t, config))
	})

	t.Run("should return error for invalid directory in json", func(t *testing.T) {
		config := map[string]interface{}{
			"path":   "./test-dir/some-dir/sample.json",
			"format": "json",
		}
		assert.Error(t, sinkValidSetup(t, config))
	})
}

// TestInfo checks that the plugin reports the embedded README as its summary.
func TestInfo(t *testing.T) {
	got := f.New().Info().Summary
	assert.Equal(t, summary, got)
}

// sinkValidSetup creates a file sink, initialises it with config and sinks the
// fixture records from getExpectedVal. It returns the error from Sink.
//
// A failing Init is reported on t and returned immediately, so Sink is never
// called on a sink that was not initialised.
func sinkValidSetup(t *testing.T, config map[string]interface{}) error {
	// Report assertion failures at the caller's line, not inside this helper.
	t.Helper()

	fileSink := f.New()
	if err := fileSink.Init(context.TODO(), config); !assert.NoError(t, err) {
		return err
	}
	return fileSink.Sink(context.TODO(), getExpectedVal())
}

// getExpectedVal returns the two table records used as sink input by the
// tests. They differ only in URN and name ("index1" and "index2").
func getExpectedVal() []models.Record {
	// newTable builds one fixture record; every call allocates fresh nested
	// values so the records share no state.
	newTable := func(urn, name string) models.Record {
		return models.NewRecord(&assetsv1beta1.Table{
			Resource: &commonv1beta1.Resource{
				Urn:  urn,
				Name: name,
				Type: "table",
			},
			Schema: &facetsv1beta1.Columns{
				Columns: []*facetsv1beta1.Column{
					{Name: "SomeInt", DataType: "long"},
					{Name: "SomeStr", DataType: "text"},
				},
			},
			Profile: &assetsv1beta1.TableProfile{
				TotalRows: 1,
			},
		})
	}

	return []models.Record{
		newTable("elasticsearch.index1", "index1"),
		newTable("elasticsearch.index2", "index2"),
	}
}
Loading