diff --git a/agent/agent.go b/agent/agent.go index ee565a957..ff038ff13 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -142,12 +142,11 @@ func (r *Agent) Run(ctx context.Context, recipe recipe.Recipe) (run Run) { } for _, sr := range recipe.Sinks { - sink, err := r.setupSink(ctx, sr, stream, recipe) + err := r.setupSink(ctx, sr, stream, recipe) if err != nil { run.Error = errors.Wrap(err, "failed to setup sink") return } - defer sink.Close() } // to gather total number of records extracted @@ -235,14 +234,15 @@ func (r *Agent) setupProcessor(ctx context.Context, pr recipe.PluginRecipe, str return } -func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *stream, recipe recipe.Recipe) (sink plugins.Syncer, err error) { +func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *stream, recipe recipe.Recipe) (err error) { + var sink plugins.Syncer + if sink, err = r.sinkFactory.Get(sr.Name); err != nil { - return nil, errors.Wrapf(err, "could not find sink \"%s\"", sr.Name) + return errors.Wrapf(err, "could not find sink \"%s\"", sr.Name) } if err = sink.Init(ctx, sr.Config); err != nil { - return nil, errors.Wrapf(err, "could not initiate sink \"%s\"", sr.Name) + return errors.Wrapf(err, "could not initiate sink \"%s\"", sr.Name) } - retryNotification := func(e error, d time.Duration) { r.logger.Info( fmt.Sprintf("retrying sink in %d", d), @@ -255,21 +255,28 @@ func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *s return err }, retryNotification) - // error (after exhausted retries) will just be skipped and logged + var success bool if err != nil { + // once it reaches here, it means that the retry has been exhausted and still got error + success = false r.logger.Error("error running sink", "sink", sr.Name, "error", err.Error()) - if !r.stopOnSinkError { - err = nil - } - return err + } else { + success = true + r.logger.Info("Successfully published record", "sink", sr.Name, "recipe", recipe.Name) } - r.logger.Info("Successfully published record", "sink", sr.Name, "recipe", recipe.Name) + r.monitor.RecordPlugin(recipe.Name, sr.Name, "sink", success) + + if !r.stopOnSinkError { + err = nil + } // TODO: create a new error to signal stopping stream. // returning nil so stream wont stop. return err }, defaultBatchSize) + //TODO: the sink closes even though some records remain unpublished + //TODO: once fixed, file sink's Close needs to close *File stream.onClose(func() { if err = sink.Close(); err != nil { r.logger.Warn("error closing sink", "sink", sr.Name, "error", err) diff --git a/agent/agent_test.go b/agent/agent_test.go index 9370ccc6e..fd27c4f8c 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -484,6 +484,7 @@ func TestAgentRun(t *testing.T) { monitor := newMockMonitor() monitor.On("RecordRun", mock.AnythingOfType("agent.Run")).Once() + monitor.On("RecordPlugin", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("bool")) defer monitor.AssertExpectations(t) r := agent.NewAgent(agent.Config{ @@ -588,6 +589,7 @@ func TestAgentRun(t *testing.T) { monitor := newMockMonitor() monitor.On("RecordRun", mock.AnythingOfType("agent.Run")).Once() + monitor.On("RecordPlugin", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("bool")) defer monitor.AssertExpectations(t) r := agent.NewAgent(agent.Config{ @@ -645,6 +647,7 @@ func TestAgentRun(t *testing.T) { monitor := newMockMonitor() monitor.On("RecordRun", mock.AnythingOfType("agent.Run")).Once() + monitor.On("RecordPlugin", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("bool")) defer monitor.AssertExpectations(t) r := agent.NewAgent(agent.Config{ @@ -700,6 +703,7 @@ func TestAgentRun(t *testing.T) { monitor := newMockMonitor() monitor.On("RecordRun", mock.AnythingOfType("agent.Run")).Once() + monitor.On("RecordPlugin", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("bool")) defer monitor.AssertExpectations(t) r := agent.NewAgent(agent.Config{ @@ -757,6 +761,7 @@ func TestAgentRunMultiple(t *testing.T) { monitor := newMockMonitor() monitor.On("RecordRun", mock.AnythingOfType("agent.Run")) + monitor.On("RecordPlugin", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("bool")) defer monitor.AssertExpectations(t) r := agent.NewAgent(agent.Config{ @@ -891,6 +896,10 @@ func (m *mockMonitor) RecordRun(run agent.Run) { m.Called(run) } +func (m *mockMonitor) RecordPlugin(recipeName, pluginName, pluginType string, success bool) { + m.Called(recipeName, pluginName, pluginType, success) +} + type panicExtractor struct { mocks.Extractor } diff --git a/agent/monitor.go b/agent/monitor.go index bfb5253d2..2623197f9 100644 --- a/agent/monitor.go +++ b/agent/monitor.go @@ -7,6 +7,7 @@ import ( // Monitor is the interface for monitoring the agent. type Monitor interface { RecordRun(run Run) + RecordPlugin(recipeName, pluginName, pluginType string, success bool) } // defaultMonitor is the default implementation of Monitor. @@ -15,6 +16,9 @@ type defaultMonitor struct{} func (m *defaultMonitor) RecordRun(run Run) { } +func (m *defaultMonitor) RecordPlugin(recipeName, pluginName, pluginType string, success bool) { +} + func isNilMonitor(monitor Monitor) bool { v := reflect.ValueOf(monitor) return !v.IsValid() || reflect.DeepEqual(v.Interface(), reflect.Zero(v.Type()).Interface()) diff --git a/metrics/statsd.go b/metrics/statsd.go index 9cd914fe2..cb69852bf 100644 --- a/metrics/statsd.go +++ b/metrics/statsd.go @@ -12,10 +12,11 @@ import ( "github.com/odpf/meteor/recipe" ) -var ( +const ( runDurationMetricName = "runDuration" runRecordCountMetricName = "runRecordCount" runMetricName = "run" + pluginRunMetricName = "runPlugin" ) // StatsdMonitor represents the statsd monitor. @@ -47,6 +48,21 @@ func (m *StatsdMonitor) RecordRun(run agent.Run) { ) } +// RecordPlugin records a individual plugin behavior in a run +func (m *StatsdMonitor) RecordPlugin(recipeName, pluginName, pluginType string, success bool) { + m.client.Increment( + fmt.Sprintf( + "%s.%s,recipe_name=%s,name=%s,type=%s,success=%t", + m.prefix, + pluginRunMetricName, + recipeName, + pluginName, + pluginType, + success, + ), + ) +} + // createMetricName creates a metric name for a given recipe and success func (m *StatsdMonitor) createMetricName(metricName string, recipe recipe.Recipe, success bool, recordCount int) string { var successText = "false" diff --git a/metrics/statsd_test.go b/metrics/statsd_test.go index 409688674..b808a38ee 100644 --- a/metrics/statsd_test.go +++ b/metrics/statsd_test.go @@ -166,6 +166,66 @@ func TestStatsdMonitorRecordRun(t *testing.T) { }) } +func TestStatsdMonitorRecordPlugin(t *testing.T) { + statsdPrefix := "testprefix" + + t.Run("should create metrics with the correct name and value", func(t *testing.T) { + recipe := recipe.Recipe{ + Name: "test-recipe", + Source: recipe.PluginRecipe{ + Name: "mysql", + }, + Sinks: []recipe.PluginRecipe{ + {Name: "test-sink"}, + }, + } + incrementMetric := fmt.Sprintf( + "%s.%s,recipe_name=%s,name=%s,type=%s,success=%t", + statsdPrefix, + "runPlugin", + recipe.Name, + recipe.Sinks[0].Name, + "sink", + false, + ) + + client := new(mockStatsdClient) + client.On("Increment", incrementMetric) + defer client.AssertExpectations(t) + + monitor := metrics.NewStatsdMonitor(client, statsdPrefix) + monitor.RecordPlugin(recipe.Name, recipe.Sinks[0].Name, "sink", false) + }) + + t.Run("should set success field to true on success", func(t *testing.T) { + recipe := recipe.Recipe{ + Name: "test-recipe", + Source: recipe.PluginRecipe{ + Name: "bigquery", + }, + Sinks: []recipe.PluginRecipe{ + {Name: "test-sink"}, + }, + } + incrementMetric := fmt.Sprintf( + "%s.%s,recipe_name=%s,name=%s,type=%s,success=%t", + statsdPrefix, + "runPlugin", + recipe.Name, + recipe.Sinks[0].Name, + "sink", + true, + ) + + client := new(mockStatsdClient) + client.On("Increment", incrementMetric) + defer client.AssertExpectations(t) + + monitor := metrics.NewStatsdMonitor(client, statsdPrefix) + monitor.RecordPlugin(recipe.Name, recipe.Sinks[0].Name, "sink", true) + }) +} + func TestNewStatsClient(t *testing.T) { t.Run("should return error for invalid address", func(t *testing.T) { _, err := metrics.NewStatsdClient("127.0.0.1") diff --git a/plugins/sinks/file/file.go b/plugins/sinks/file/file.go index 275baad24..2b02f8e37 100644 --- a/plugins/sinks/file/file.go +++ b/plugins/sinks/file/file.go @@ -95,6 +95,7 @@ func (s *Sink) Sink(ctx context.Context, batch []models.Record) (err error) { } func (s *Sink) Close() (err error) { + // return s.File.Close() return nil }