Skip to content

Commit cf869a3

Browse files
feat: add metric collection to catch sink errors (#372)
* feat: add metric collection and catch errors for sink * chore: add tests for RecordSink * refactor: assign const for sink metric name * feat: move record sink metric to agent while broadcast * refactor: generalise metrics from sink to plugins * refactor: rename sink metrics * feat(metrics): change runPlugin labels Co-authored-by: Stewart Jingga <stewart_jingga@yahoo.com>
1 parent 63b3ded commit cf869a3

6 files changed

Lines changed: 110 additions & 13 deletions

File tree

agent/agent.go

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -142,12 +142,11 @@ func (r *Agent) Run(ctx context.Context, recipe recipe.Recipe) (run Run) {
142142
}
143143

144144
for _, sr := range recipe.Sinks {
145-
sink, err := r.setupSink(ctx, sr, stream, recipe)
145+
err := r.setupSink(ctx, sr, stream, recipe)
146146
if err != nil {
147147
run.Error = errors.Wrap(err, "failed to setup sink")
148148
return
149149
}
150-
defer sink.Close()
151150
}
152151

153152
// to gather total number of records extracted
@@ -235,14 +234,15 @@ func (r *Agent) setupProcessor(ctx context.Context, pr recipe.PluginRecipe, str
235234
return
236235
}
237236

238-
func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *stream, recipe recipe.Recipe) (sink plugins.Syncer, err error) {
237+
func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *stream, recipe recipe.Recipe) (err error) {
238+
var sink plugins.Syncer
239+
239240
if sink, err = r.sinkFactory.Get(sr.Name); err != nil {
240-
return nil, errors.Wrapf(err, "could not find sink \"%s\"", sr.Name)
241+
return errors.Wrapf(err, "could not find sink \"%s\"", sr.Name)
241242
}
242243
if err = sink.Init(ctx, sr.Config); err != nil {
243-
return nil, errors.Wrapf(err, "could not initiate sink \"%s\"", sr.Name)
244+
return errors.Wrapf(err, "could not initiate sink \"%s\"", sr.Name)
244245
}
245-
246246
retryNotification := func(e error, d time.Duration) {
247247
r.logger.Info(
248248
fmt.Sprintf("retrying sink in %d", d),
@@ -255,21 +255,28 @@ func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *s
255255
return err
256256
}, retryNotification)
257257

258-
// error (after exhausted retries) will just be skipped and logged
258+
var success bool
259259
if err != nil {
260+
// once it reaches here, it means that the retry has been exhausted and still got error
261+
success = false
260262
r.logger.Error("error running sink", "sink", sr.Name, "error", err.Error())
261-
if !r.stopOnSinkError {
262-
err = nil
263-
}
264-
return err
263+
} else {
264+
success = true
265+
r.logger.Info("Successfully published record", "sink", sr.Name, "recipe", recipe.Name)
265266
}
266-
r.logger.Info("Successfully published record", "sink", sr.Name, "recipe", recipe.Name)
267267

268+
r.monitor.RecordPlugin(recipe.Name, sr.Name, "sink", success)
269+
270+
if !r.stopOnSinkError {
271+
err = nil
272+
}
268273
// TODO: create a new error to signal stopping stream.
269274
// returning nil so stream wont stop.
270275
return err
271276
}, defaultBatchSize)
272277

278+
//TODO: the sink closes even though some records remain unpublished
279+
//TODO: once fixed, file sink's Close needs to close *File
273280
stream.onClose(func() {
274281
if err = sink.Close(); err != nil {
275282
r.logger.Warn("error closing sink", "sink", sr.Name, "error", err)

agent/agent_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -484,6 +484,7 @@ func TestAgentRun(t *testing.T) {
484484

485485
monitor := newMockMonitor()
486486
monitor.On("RecordRun", mock.AnythingOfType("agent.Run")).Once()
487+
monitor.On("RecordPlugin", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("bool"))
487488
defer monitor.AssertExpectations(t)
488489

489490
r := agent.NewAgent(agent.Config{
@@ -588,6 +589,7 @@ func TestAgentRun(t *testing.T) {
588589

589590
monitor := newMockMonitor()
590591
monitor.On("RecordRun", mock.AnythingOfType("agent.Run")).Once()
592+
monitor.On("RecordPlugin", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("bool"))
591593
defer monitor.AssertExpectations(t)
592594

593595
r := agent.NewAgent(agent.Config{
@@ -645,6 +647,7 @@ func TestAgentRun(t *testing.T) {
645647

646648
monitor := newMockMonitor()
647649
monitor.On("RecordRun", mock.AnythingOfType("agent.Run")).Once()
650+
monitor.On("RecordPlugin", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("bool"))
648651
defer monitor.AssertExpectations(t)
649652

650653
r := agent.NewAgent(agent.Config{
@@ -700,6 +703,7 @@ func TestAgentRun(t *testing.T) {
700703

701704
monitor := newMockMonitor()
702705
monitor.On("RecordRun", mock.AnythingOfType("agent.Run")).Once()
706+
monitor.On("RecordPlugin", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("bool"))
703707
defer monitor.AssertExpectations(t)
704708

705709
r := agent.NewAgent(agent.Config{
@@ -757,6 +761,7 @@ func TestAgentRunMultiple(t *testing.T) {
757761

758762
monitor := newMockMonitor()
759763
monitor.On("RecordRun", mock.AnythingOfType("agent.Run"))
764+
monitor.On("RecordPlugin", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("bool"))
760765
defer monitor.AssertExpectations(t)
761766

762767
r := agent.NewAgent(agent.Config{
@@ -891,6 +896,10 @@ func (m *mockMonitor) RecordRun(run agent.Run) {
891896
m.Called(run)
892897
}
893898

899+
func (m *mockMonitor) RecordPlugin(recipeName, pluginName, pluginType string, success bool) {
900+
m.Called(recipeName, pluginName, pluginType, success)
901+
}
902+
894903
type panicExtractor struct {
895904
mocks.Extractor
896905
}

agent/monitor.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
// Monitor is the interface for monitoring the agent.
88
type Monitor interface {
99
RecordRun(run Run)
10+
RecordPlugin(recipeName, pluginName, pluginType string, success bool)
1011
}
1112

1213
// defaultMonitor is the default implementation of Monitor.
@@ -15,6 +16,9 @@ type defaultMonitor struct{}
1516
func (m *defaultMonitor) RecordRun(run Run) {
1617
}
1718

19+
func (m *defaultMonitor) RecordPlugin(recipeName, pluginName, pluginType string, success bool) {
20+
}
21+
1822
func isNilMonitor(monitor Monitor) bool {
1923
v := reflect.ValueOf(monitor)
2024
return !v.IsValid() || reflect.DeepEqual(v.Interface(), reflect.Zero(v.Type()).Interface())

metrics/statsd.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,11 @@ import (
1212
"github.com/odpf/meteor/recipe"
1313
)
1414

15-
var (
15+
const (
1616
runDurationMetricName = "runDuration"
1717
runRecordCountMetricName = "runRecordCount"
1818
runMetricName = "run"
19+
pluginRunMetricName = "runPlugin"
1920
)
2021

2122
// StatsdMonitor represents the statsd monitor.
@@ -47,6 +48,21 @@ func (m *StatsdMonitor) RecordRun(run agent.Run) {
4748
)
4849
}
4950

51+
// RecordPlugin records a individual plugin behavior in a run
52+
func (m *StatsdMonitor) RecordPlugin(recipeName, pluginName, pluginType string, success bool) {
53+
m.client.Increment(
54+
fmt.Sprintf(
55+
"%s.%s,recipe_name=%s,name=%s,type=%s,success=%t",
56+
m.prefix,
57+
pluginRunMetricName,
58+
recipeName,
59+
pluginName,
60+
pluginType,
61+
success,
62+
),
63+
)
64+
}
65+
5066
// createMetricName creates a metric name for a given recipe and success
5167
func (m *StatsdMonitor) createMetricName(metricName string, recipe recipe.Recipe, success bool, recordCount int) string {
5268
var successText = "false"

metrics/statsd_test.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,66 @@ func TestStatsdMonitorRecordRun(t *testing.T) {
166166
})
167167
}
168168

169+
func TestStatsdMonitorRecordPlugin(t *testing.T) {
170+
statsdPrefix := "testprefix"
171+
172+
t.Run("should create metrics with the correct name and value", func(t *testing.T) {
173+
recipe := recipe.Recipe{
174+
Name: "test-recipe",
175+
Source: recipe.PluginRecipe{
176+
Name: "mysql",
177+
},
178+
Sinks: []recipe.PluginRecipe{
179+
{Name: "test-sink"},
180+
},
181+
}
182+
incrementMetric := fmt.Sprintf(
183+
"%s.%s,recipe_name=%s,name=%s,type=%s,success=%t",
184+
statsdPrefix,
185+
"runPlugin",
186+
recipe.Name,
187+
recipe.Sinks[0].Name,
188+
"sink",
189+
false,
190+
)
191+
192+
client := new(mockStatsdClient)
193+
client.On("Increment", incrementMetric)
194+
defer client.AssertExpectations(t)
195+
196+
monitor := metrics.NewStatsdMonitor(client, statsdPrefix)
197+
monitor.RecordPlugin(recipe.Name, recipe.Sinks[0].Name, "sink", false)
198+
})
199+
200+
t.Run("should set success field to true on success", func(t *testing.T) {
201+
recipe := recipe.Recipe{
202+
Name: "test-recipe",
203+
Source: recipe.PluginRecipe{
204+
Name: "bigquery",
205+
},
206+
Sinks: []recipe.PluginRecipe{
207+
{Name: "test-sink"},
208+
},
209+
}
210+
incrementMetric := fmt.Sprintf(
211+
"%s.%s,recipe_name=%s,name=%s,type=%s,success=%t",
212+
statsdPrefix,
213+
"runPlugin",
214+
recipe.Name,
215+
recipe.Sinks[0].Name,
216+
"sink",
217+
true,
218+
)
219+
220+
client := new(mockStatsdClient)
221+
client.On("Increment", incrementMetric)
222+
defer client.AssertExpectations(t)
223+
224+
monitor := metrics.NewStatsdMonitor(client, statsdPrefix)
225+
monitor.RecordPlugin(recipe.Name, recipe.Sinks[0].Name, "sink", true)
226+
})
227+
}
228+
169229
func TestNewStatsClient(t *testing.T) {
170230
t.Run("should return error for invalid address", func(t *testing.T) {
171231
_, err := metrics.NewStatsdClient("127.0.0.1")

plugins/sinks/file/file.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ func (s *Sink) Sink(ctx context.Context, batch []models.Record) (err error) {
9595
}
9696

9797
func (s *Sink) Close() (err error) {
98+
// return s.File.Close()
9899
return nil
99100
}
100101

0 commit comments

Comments
 (0)