Skip to content
Merged
31 changes: 19 additions & 12 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,11 @@ func (r *Agent) Run(ctx context.Context, recipe recipe.Recipe) (run Run) {
}

for _, sr := range recipe.Sinks {
sink, err := r.setupSink(ctx, sr, stream, recipe)
err := r.setupSink(ctx, sr, stream, recipe)
if err != nil {
run.Error = errors.Wrap(err, "failed to setup sink")
return
}
defer sink.Close()
}

// to gather total number of records extracted
Expand Down Expand Up @@ -235,14 +234,15 @@ func (r *Agent) setupProcessor(ctx context.Context, pr recipe.PluginRecipe, str
return
}

func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *stream, recipe recipe.Recipe) (sink plugins.Syncer, err error) {
func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *stream, recipe recipe.Recipe) (err error) {
var sink plugins.Syncer

if sink, err = r.sinkFactory.Get(sr.Name); err != nil {
return nil, errors.Wrapf(err, "could not find sink \"%s\"", sr.Name)
return errors.Wrapf(err, "could not find sink \"%s\"", sr.Name)
}
if err = sink.Init(ctx, sr.Config); err != nil {
return nil, errors.Wrapf(err, "could not initiate sink \"%s\"", sr.Name)
return errors.Wrapf(err, "could not initiate sink \"%s\"", sr.Name)
}

retryNotification := func(e error, d time.Duration) {
r.logger.Info(
fmt.Sprintf("retrying sink in %d", d),
Expand All @@ -255,21 +255,28 @@ func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *s
return err
}, retryNotification)

// error (after exhausted retries) will just be skipped and logged
var success bool
if err != nil {
// once it reaches here, it means that the retry has been exhausted and still got error
success = false
r.logger.Error("error running sink", "sink", sr.Name, "error", err.Error())
if !r.stopOnSinkError {
err = nil
}
return err
} else {
success = true
r.logger.Info("Successfully published record", "sink", sr.Name, "recipe", recipe.Name)
}
r.logger.Info("Successfully published record", "sink", sr.Name, "recipe", recipe.Name)

r.monitor.RecordPlugin(recipe.Name, sr.Name, "sink", success)

if !r.stopOnSinkError {
err = nil
}
// TODO: create a new error to signal stopping stream.
// returning nil so stream wont stop.
return err
}, defaultBatchSize)

//TODO: the sink closes even though some records remain unpublished
//TODO: once fixed, file sink's Close needs to close *File
stream.onClose(func() {
if err = sink.Close(); err != nil {
r.logger.Warn("error closing sink", "sink", sr.Name, "error", err)
Expand Down
9 changes: 9 additions & 0 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,7 @@ func TestAgentRun(t *testing.T) {

monitor := newMockMonitor()
monitor.On("RecordRun", mock.AnythingOfType("agent.Run")).Once()
monitor.On("RecordPlugin", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("bool"))
defer monitor.AssertExpectations(t)

r := agent.NewAgent(agent.Config{
Expand Down Expand Up @@ -588,6 +589,7 @@ func TestAgentRun(t *testing.T) {

monitor := newMockMonitor()
monitor.On("RecordRun", mock.AnythingOfType("agent.Run")).Once()
monitor.On("RecordPlugin", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("bool"))
defer monitor.AssertExpectations(t)

r := agent.NewAgent(agent.Config{
Expand Down Expand Up @@ -645,6 +647,7 @@ func TestAgentRun(t *testing.T) {

monitor := newMockMonitor()
monitor.On("RecordRun", mock.AnythingOfType("agent.Run")).Once()
monitor.On("RecordPlugin", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("bool"))
defer monitor.AssertExpectations(t)

r := agent.NewAgent(agent.Config{
Expand Down Expand Up @@ -700,6 +703,7 @@ func TestAgentRun(t *testing.T) {

monitor := newMockMonitor()
monitor.On("RecordRun", mock.AnythingOfType("agent.Run")).Once()
monitor.On("RecordPlugin", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("bool"))
defer monitor.AssertExpectations(t)

r := agent.NewAgent(agent.Config{
Expand Down Expand Up @@ -757,6 +761,7 @@ func TestAgentRunMultiple(t *testing.T) {

monitor := newMockMonitor()
monitor.On("RecordRun", mock.AnythingOfType("agent.Run"))
monitor.On("RecordPlugin", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("bool"))
defer monitor.AssertExpectations(t)

r := agent.NewAgent(agent.Config{
Expand Down Expand Up @@ -891,6 +896,10 @@ func (m *mockMonitor) RecordRun(run agent.Run) {
m.Called(run)
}

func (m *mockMonitor) RecordPlugin(recipeName, pluginName, pluginType string, success bool) {
m.Called(recipeName, pluginName, pluginType, success)
}

type panicExtractor struct {
mocks.Extractor
}
Expand Down
4 changes: 4 additions & 0 deletions agent/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
// Monitor is the interface for monitoring the agent.
type Monitor interface {
RecordRun(run Run)
RecordPlugin(recipeName, pluginName, pluginType string, success bool)
}

// defaultMonitor is the default implementation of Monitor.
Expand All @@ -15,6 +16,9 @@ type defaultMonitor struct{}
func (m *defaultMonitor) RecordRun(run Run) {
}

func (m *defaultMonitor) RecordPlugin(recipeName, pluginName, pluginType string, success bool) {
}

func isNilMonitor(monitor Monitor) bool {
v := reflect.ValueOf(monitor)
return !v.IsValid() || reflect.DeepEqual(v.Interface(), reflect.Zero(v.Type()).Interface())
Expand Down
18 changes: 17 additions & 1 deletion metrics/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ import (
"github.com/odpf/meteor/recipe"
)

var (
const (
runDurationMetricName = "runDuration"
runRecordCountMetricName = "runRecordCount"
runMetricName = "run"
pluginRunMetricName = "runPlugin"
)

// StatsdMonitor represents the statsd monitor.
Expand Down Expand Up @@ -47,6 +48,21 @@ func (m *StatsdMonitor) RecordRun(run agent.Run) {
)
}

// RecordPlugin records a individual plugin behavior in a run
func (m *StatsdMonitor) RecordPlugin(recipeName, pluginName, pluginType string, success bool) {
m.client.Increment(
fmt.Sprintf(
"%s.%s,recipe_name=%s,name=%s,type=%s,success=%t",
m.prefix,
pluginRunMetricName,
recipeName,
pluginName,
pluginType,
success,
),
)
}

// createMetricName creates a metric name for a given recipe and success
func (m *StatsdMonitor) createMetricName(metricName string, recipe recipe.Recipe, success bool, recordCount int) string {
var successText = "false"
Expand Down
60 changes: 60 additions & 0 deletions metrics/statsd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,66 @@ func TestStatsdMonitorRecordRun(t *testing.T) {
})
}

func TestStatsdMonitorRecordPlugin(t *testing.T) {
statsdPrefix := "testprefix"

t.Run("should create metrics with the correct name and value", func(t *testing.T) {
recipe := recipe.Recipe{
Name: "test-recipe",
Source: recipe.PluginRecipe{
Name: "mysql",
},
Sinks: []recipe.PluginRecipe{
{Name: "test-sink"},
},
}
incrementMetric := fmt.Sprintf(
"%s.%s,recipe_name=%s,name=%s,type=%s,success=%t",
statsdPrefix,
"runPlugin",
recipe.Name,
recipe.Sinks[0].Name,
"sink",
false,
)

client := new(mockStatsdClient)
client.On("Increment", incrementMetric)
defer client.AssertExpectations(t)

monitor := metrics.NewStatsdMonitor(client, statsdPrefix)
monitor.RecordPlugin(recipe.Name, recipe.Sinks[0].Name, "sink", false)
})

t.Run("should set success field to true on success", func(t *testing.T) {
recipe := recipe.Recipe{
Name: "test-recipe",
Source: recipe.PluginRecipe{
Name: "bigquery",
},
Sinks: []recipe.PluginRecipe{
{Name: "test-sink"},
},
}
incrementMetric := fmt.Sprintf(
"%s.%s,recipe_name=%s,name=%s,type=%s,success=%t",
statsdPrefix,
"runPlugin",
recipe.Name,
recipe.Sinks[0].Name,
"sink",
true,
)

client := new(mockStatsdClient)
client.On("Increment", incrementMetric)
defer client.AssertExpectations(t)

monitor := metrics.NewStatsdMonitor(client, statsdPrefix)
monitor.RecordPlugin(recipe.Name, recipe.Sinks[0].Name, "sink", true)
})
}

func TestNewStatsClient(t *testing.T) {
t.Run("should return error for invalid address", func(t *testing.T) {
_, err := metrics.NewStatsdClient("127.0.0.1")
Expand Down
1 change: 1 addition & 0 deletions plugins/sinks/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func (s *Sink) Sink(ctx context.Context, batch []models.Record) (err error) {
}

func (s *Sink) Close() (err error) {
// return s.File.Close()
return nil
}

Expand Down