Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions cmd/gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@ import (

"github.com/MakeNowJust/heredoc"
"github.com/odpf/meteor/recipe"
"github.com/odpf/salt/log"
"github.com/spf13/cobra"
"gopkg.in/yaml.v3"
)

// GenCmd creates a command object for the "gen" action
func GenCmd(lg log.Logger) *cobra.Command {
func GenCmd() *cobra.Command {
var (
outputDirPath string
dataFilePath string
Expand Down
3 changes: 1 addition & 2 deletions cmd/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@ import (
"github.com/AlecAivazis/survey/v2"
"github.com/MakeNowJust/heredoc"
"github.com/odpf/meteor/registry"
"github.com/odpf/salt/log"
"github.com/odpf/salt/printer"
"github.com/odpf/salt/term"
"github.com/spf13/cobra"
)

// InfoCmd creates a command object for get info about a plugin
func InfoCmd(lg log.Logger) *cobra.Command {
func InfoCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "info <command>",
Short: "Display plugin information",
Expand Down
17 changes: 11 additions & 6 deletions cmd/lint.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,20 @@ import (
"fmt"
"os"

"github.com/odpf/meteor/plugins"

"github.com/MakeNowJust/heredoc"
"github.com/odpf/meteor/agent"
"github.com/odpf/meteor/metrics"
"github.com/odpf/meteor/config"
"github.com/odpf/meteor/plugins"
"github.com/odpf/meteor/recipe"
"github.com/odpf/meteor/registry"
"github.com/odpf/salt/log"
"github.com/odpf/salt/printer"
"github.com/odpf/salt/term"

"github.com/spf13/cobra"
)

// LintCmd creates a command object for linting recipes
func LintCmd(lg log.Logger, mt *metrics.StatsdMonitor) *cobra.Command {
func LintCmd() *cobra.Command {
var (
report [][]string
success = 0
Expand Down Expand Up @@ -50,12 +48,19 @@ func LintCmd(lg log.Logger, mt *metrics.StatsdMonitor) *cobra.Command {
"group:core": "true",
},
RunE: func(cmd *cobra.Command, args []string) error {
cfg, err := config.Load("./meteor.yaml")
if err != nil {
return err
}

lg := log.NewLogrus(log.LogrusWithLevel(cfg.LogLevel))
plugins.SetLog(lg)

cs := term.NewColorScheme()
runner := agent.NewAgent(agent.Config{
ExtractorFactory: registry.Extractors,
ProcessorFactory: registry.Processors,
SinkFactory: registry.Sinks,
Monitor: mt,
Logger: lg,
})

Expand Down
3 changes: 1 addition & 2 deletions cmd/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,14 @@ import (

"github.com/MakeNowJust/heredoc"
"github.com/odpf/meteor/registry"
"github.com/odpf/salt/log"
"github.com/odpf/salt/printer"
"github.com/odpf/salt/term"

"github.com/spf13/cobra"
)

// ListCmd creates a command object for linting recipes
func ListCmd(lg log.Logger) *cobra.Command {
func ListCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "list <command>",
Short: "List available plugins",
Expand Down
3 changes: 1 addition & 2 deletions cmd/new.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@ import (
"github.com/MakeNowJust/heredoc"
"github.com/odpf/meteor/generator"
"github.com/odpf/meteor/registry"
"github.com/odpf/salt/log"
"github.com/spf13/cobra"
)

// NewCmd creates a command object for the "new" action
func NewCmd(lg log.Logger) *cobra.Command {
func NewCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "new",
Short: "Bootstrap new recipes",
Expand Down
41 changes: 6 additions & 35 deletions cmd/root.go
Original file line number Diff line number Diff line change
@@ -1,42 +1,13 @@
package cmd

import (
"fmt"
"os"

"github.com/MakeNowJust/heredoc"
"github.com/odpf/meteor/config"
"github.com/odpf/meteor/metrics"
"github.com/odpf/meteor/plugins"
"github.com/odpf/salt/cmdx"
"github.com/odpf/salt/log"
"github.com/spf13/cobra"
)

const exitError = 1

// New adds all child commands to the root command and sets flags appropriately.
func New() *cobra.Command {
cfg, err := config.Load("./meteor.yaml")
if err != nil {
fmt.Printf("ERROR: %s\n", err.Error())
os.Exit(1)
}

lg := log.NewLogrus(log.LogrusWithLevel(cfg.LogLevel))
plugins.SetLog(lg)

// Setup statsd monitor to collect monitoring metrics
var mt *metrics.StatsdMonitor
if cfg.StatsdEnabled {
client, err := metrics.NewStatsdClient(cfg.StatsdHost)
if err != nil {
fmt.Printf("ERROR: %s\n", err.Error())
os.Exit(exitError)
}
mt = metrics.NewStatsdMonitor(client, cfg.StatsdPrefix)
}

var cmd = &cobra.Command{
Use: "meteor <command> <subcommand> [flags]",
Short: "Metadata CLI",
Expand Down Expand Up @@ -66,12 +37,12 @@ func New() *cobra.Command {
cmd.AddCommand(cmdx.SetRefCmd(cmd))

cmd.AddCommand(VersionCmd())
cmd.AddCommand(GenCmd(lg))
cmd.AddCommand(ListCmd(lg))
cmd.AddCommand(InfoCmd(lg))
cmd.AddCommand(RunCmd(lg, mt, cfg))
cmd.AddCommand(LintCmd(lg, mt))
cmd.AddCommand(NewCmd(lg))
cmd.AddCommand(GenCmd())
cmd.AddCommand(ListCmd())
cmd.AddCommand(InfoCmd())
cmd.AddCommand(RunCmd())
cmd.AddCommand(LintCmd())
cmd.AddCommand(NewCmd())

return cmd
}
32 changes: 25 additions & 7 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/odpf/meteor/agent"
"github.com/odpf/meteor/config"
"github.com/odpf/meteor/metrics"
"github.com/odpf/meteor/plugins"
"github.com/odpf/meteor/recipe"
"github.com/odpf/meteor/registry"
"github.com/odpf/salt/log"
Expand All @@ -23,7 +24,7 @@ import (
)

// RunCmd creates a command object for the "run" action.
func RunCmd(lg log.Logger, mt *metrics.StatsdMonitor, cfg config.Config) *cobra.Command {
func RunCmd() *cobra.Command {
var (
report [][]string
pathToConfig string
Expand Down Expand Up @@ -57,12 +58,17 @@ func RunCmd(lg log.Logger, mt *metrics.StatsdMonitor, cfg config.Config) *cobra.
"group:core": "true",
},
RunE: func(cmd *cobra.Command, args []string) error {
if configFile != "" {
var err error
cfg, err = config.Load(configFile)
if err != nil {
return err
}
cfg, err := config.Load(configFile)
if err != nil {
return err
}

lg := log.NewLogrus(log.LogrusWithLevel(cfg.LogLevel))
plugins.SetLog(lg)

mt, err := newStatsdMonitor(cfg)
if err != nil {
return err
}

cs := term.NewColorScheme()
Expand Down Expand Up @@ -135,3 +141,15 @@ func RunCmd(lg log.Logger, mt *metrics.StatsdMonitor, cfg config.Config) *cobra.

return cmd
}

func newStatsdMonitor(cfg config.Config) (*metrics.StatsdMonitor, error) {
if !cfg.StatsdEnabled {
return nil, nil
}

client, err := metrics.NewStatsdClient(cfg.StatsdHost)
if err != nil {
return nil, err
}
return metrics.NewStatsdMonitor(client, cfg.StatsdPrefix), nil
}
20 changes: 11 additions & 9 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package config

import (
"errors"
"log"
"fmt"

"github.com/odpf/salt/config"
)
Expand All @@ -18,15 +18,17 @@ type Config struct {
StopOnSinkError bool `mapstructure:"STOP_ON_SINK_ERROR" default:"false"`
}

func Load(configFile string) (cfg Config, err error) {
err = config.
NewLoader(config.WithFile(configFile)).
func Load(configFile string) (Config, error) {
var cfg Config
err := config.NewLoader(config.WithFile(configFile)).
Load(&cfg)

if errors.As(err, &config.ConfigFileNotFoundError{}) {
log.Println(err)
err = nil
if err != nil {
if errors.As(err, &config.ConfigFileNotFoundError{}) {
fmt.Println(err)
return cfg, nil
}
return Config{}, err
}

return
return cfg, nil
}
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (
func main() {
// Execute the root command
root := cmd.New()

cmd, err := root.ExecuteC()

if err == nil {
Expand Down
14 changes: 5 additions & 9 deletions test/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"context"
"database/sql"
"fmt"
v1beta2 "github.com/odpf/meteor/models/odpf/assets/v1beta2"
"google.golang.org/protobuf/types/known/anypb"
"log"
"net"
"os"
Expand All @@ -17,12 +15,14 @@ import (
"testing"
"time"

v1beta2 "github.com/odpf/meteor/models/odpf/assets/v1beta2"
"google.golang.org/protobuf/types/known/anypb"

"github.com/odpf/meteor/test/utils"
"github.com/ory/dockertest/v3"
"github.com/ory/dockertest/v3/docker"

"github.com/odpf/meteor/cmd"
"github.com/odpf/meteor/config"
_ "github.com/odpf/meteor/plugins/extractors"
_ "github.com/odpf/meteor/plugins/processors"
_ "github.com/odpf/meteor/plugins/sinks"
Expand Down Expand Up @@ -105,13 +105,9 @@ func TestMySqlToKafka(t *testing.T) {
}
}()

// run mysql_kafka.yml file
cfg, err := config.Load("mysql_kafka.yml")
if err != nil {
t.Error(err)
}
command := cmd.RunCmd(utils.Logger, nil, cfg)
command := cmd.RunCmd()

// run mysql_kafka.yml file
command.SetArgs([]string{"mysql_kafka.yml"})
if err := command.Execute(); err != nil {
if strings.HasPrefix(err.Error(), "unknown command ") {
Expand Down