Skip to content

Commit 62452d2

Browse files
committed
Unify tracking of metrics and events in CloudHandler
Instead of having a per source for metrics queue and a per source for events queue, this unifies it in to a single per host for anything queue. This simplifies the logic, and removes what appears to be a logic bug: #341 (comment)
1 parent d02913e commit 62452d2

File tree

4 files changed

+53
-58
lines changed

4 files changed

+53
-58
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
Next
22
----
3-
- Refactor the cloud handler to use the modern pipeline. This removes the `cloudprovider.items_queued` metric with `type:metric` tag.
3+
- Refactor the cloud handler to use the modern pipeline. This removes the `cloudprovider.items_queued` metric, and now
4+
tracks the absolute number of hosts to look up, regardless of type.
45

56
29.0.0
67
------

METRICS.md

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,7 @@ Metrics:
5353
| cloudprovider.cache_refresh_negative | gauge (cumulative) | | The cumulative number of refreshes which had an error refreshing and used old data
5454
| cloudprovider.cache_hit | gauge (cumulative) | | The cumulative number of cache hits (host was in the cache)
5555
| cloudprovider.cache_miss | gauge (cumulative) | | The cumulative number of cache misses
56-
| cloudprovider.hosts_queued | gauge (flush) | type | The absolute number of hosts waiting to be looked up
57-
| cloudprovider.items_queued | gauge (flush) | type | The absolute number of metrics or events waiting for a host lookup to complete
56+
| cloudprovider.hosts_queued | gauge (flush) | | The absolute number of hosts waiting to be looked up
5857
| http.forwarder.invalid | counter | | The number of failures to prepare a batch of metrics to forward
5958
| http.forwarder.created | counter | | The number of batches prepared for forwarding
6059
| http.forwarder.sent | counter | | The number of batches successfully forwarded
@@ -70,7 +69,6 @@ Metrics:
7069
| version | The git tag of the build
7170
| commit | The short git commit of the build
7271
| backend | The backend sending a particular metric
73-
| type | Either metric or event for cloudprovider.hosts_queued, or event for cloudprovider.items_queued
7472
| result | Success to indicate a batch of metrics was successfully processed, failure to indicate a batch of metrics was not processed, with additional failure tag for why)
7573
| failure | The reason a batch of metrics was not processed
7674
| server-name | The name of an http-server as specified in the config file

pkg/statsd/handler_backend.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ func (bh *BackendHandler) DispatchEvent(ctx context.Context, e *gostatsd.Event)
167167
for _, backend := range bh.backends {
168168
select {
169169
case <-ctx.Done():
170-
// Not all backends got the event, should decrement the wg counter
170+
// Not all backends got the event, should decrement the wg counter to account for it
171171
bh.eventWg.Add(eventsDispatched - len(bh.backends))
172172
return
173173
case bh.concurrentEvents <- struct{}{}:

pkg/statsd/handler_cloud.go

Lines changed: 49 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -11,28 +11,28 @@ import (
1111
"github.com/atlassian/gostatsd/pkg/stats"
1212
)
1313

14+
type pendingMetricsAndEvents struct {
15+
metrics *gostatsd.MetricMap
16+
events []*gostatsd.Event
17+
}
18+
1419
// CloudHandler enriches metrics and events with additional information fetched from cloud provider.
1520
type CloudHandler struct {
1621
// These fields are accessed by any go routine, must use atomic ops
1722
statsCacheHit uint64 // Cumulative number of cache hits
1823
statsCacheMiss uint64 // Cumulative number of cache misses
1924

20-
// All other stats fields may only be read or written by the main CloudHandler.Run goroutine
21-
statsMetricHostsQueued uint64 // Absolute number of IPs waiting for a CP to respond for metrics
22-
statsEventItemsQueued uint64 // Absolute number of events queued, waiting for a CP to respond
23-
statsEventHostsQueued uint64 // Absolute number of IPs waiting for a CP to respond for events
24-
2525
cachedInstances gostatsd.CachedInstances
2626
handler gostatsd.PipelineHandler
2727
incomingMetrics chan *gostatsd.MetricMap
2828
incomingEvents chan *gostatsd.Event
2929

3030
// emitChan triggers a write of all the current stats when it is given a Statser
31-
emitChan chan stats.Statser
32-
awaitingEvents map[gostatsd.Source][]*gostatsd.Event
33-
awaitingMetrics map[gostatsd.Source]*gostatsd.MetricMap
31+
emitChan chan stats.Statser
32+
33+
perHostPending map[gostatsd.Source]*pendingMetricsAndEvents
3434
toLookupIPs []gostatsd.Source
35-
wg sync.WaitGroup
35+
wgPendingEvents sync.WaitGroup
3636

3737
estimatedTags int
3838
}
@@ -45,8 +45,7 @@ func NewCloudHandler(cachedInstances gostatsd.CachedInstances, handler gostatsd.
4545
incomingMetrics: make(chan *gostatsd.MetricMap),
4646
incomingEvents: make(chan *gostatsd.Event),
4747
emitChan: make(chan stats.Statser),
48-
awaitingEvents: make(map[gostatsd.Source][]*gostatsd.Event),
49-
awaitingMetrics: make(map[gostatsd.Source]*gostatsd.MetricMap),
48+
perHostPending: make(map[gostatsd.Source]*pendingMetricsAndEvents),
5049
estimatedTags: handler.EstimatedTags() + cachedInstances.EstimatedTags(),
5150
}
5251
}
@@ -105,17 +104,17 @@ func (ch *CloudHandler) DispatchEvent(ctx context.Context, e *gostatsd.Event) {
105104
ch.handler.DispatchEvent(ctx, e)
106105
return
107106
}
108-
ch.wg.Add(1) // Increment before sending to the channel
107+
ch.wgPendingEvents.Add(1) // Increment before sending to the channel
109108
select {
110109
case <-ctx.Done():
111-
ch.wg.Done()
110+
ch.wgPendingEvents.Done()
112111
case ch.incomingEvents <- e:
113112
}
114113
}
115114

116115
// WaitForEvents waits for all event-dispatching goroutines to finish.
117116
func (ch *CloudHandler) WaitForEvents() {
118-
ch.wg.Wait()
117+
ch.wgPendingEvents.Wait()
119118
ch.handler.WaitForEvents()
120119
}
121120

@@ -160,11 +159,8 @@ func (ch *CloudHandler) emit(statser stats.Statser) {
160159
// atomic
161160
statser.Gauge("cloudprovider.cache_hit", float64(atomic.LoadUint64(&ch.statsCacheHit)), nil)
162161
statser.Gauge("cloudprovider.cache_miss", float64(atomic.LoadUint64(&ch.statsCacheMiss)), nil)
163-
t := gostatsd.Tags{"type:metric"}
164-
statser.Gauge("cloudprovider.hosts_queued", float64(ch.statsMetricHostsQueued), t)
165-
t = gostatsd.Tags{"type:event"}
166-
statser.Gauge("cloudprovider.hosts_queued", float64(ch.statsEventHostsQueued), t)
167-
statser.Gauge("cloudprovider.items_queued", float64(ch.statsEventItemsQueued), t)
162+
// non-atomic
163+
statser.Gauge("cloudprovider.hosts_queued", float64(len(ch.perHostPending)), nil)
168164
}
169165

170166
func (ch *CloudHandler) Run(ctx context.Context) {
@@ -184,10 +180,8 @@ func (ch *CloudHandler) Run(ctx context.Context) {
184180
case info := <-infoSource:
185181
ch.handleInstanceInfo(ctx, info)
186182
case metrics := <-ch.incomingMetrics:
187-
// Add metrics to awaitingMetrics, accumulate IPs to lookup
188183
ch.handleIncomingMetrics(metrics)
189184
case e := <-ch.incomingEvents:
190-
// Add event to awaitingEvents, accumulate IPs to lookup
191185
ch.handleIncomingEvent(e)
192186
case statser := <-ch.emitChan:
193187
ch.emit(statser)
@@ -203,38 +197,46 @@ func (ch *CloudHandler) Run(ctx context.Context) {
203197
}
204198

205199
func (ch *CloudHandler) handleInstanceInfo(ctx context.Context, info gostatsd.InstanceInfo) {
206-
mm := ch.awaitingMetrics[info.IP]
207-
if mm != nil {
208-
delete(ch.awaitingMetrics, info.IP)
209-
ch.statsMetricHostsQueued--
210-
go ch.updateAndDispatchMetrics(ctx, info.Instance, mm)
200+
pending, ok := ch.perHostPending[info.IP]
201+
if !ok {
202+
return // got an instance for something we didn't request, ignore it.
211203
}
212-
events := ch.awaitingEvents[info.IP]
213-
if len(events) > 0 {
214-
delete(ch.awaitingEvents, info.IP)
215-
ch.statsEventItemsQueued -= uint64(len(events))
216-
ch.statsEventHostsQueued--
217-
go ch.updateAndDispatchEvents(ctx, info.Instance, events)
204+
205+
delete(ch.perHostPending, info.IP)
206+
if pending.metrics != nil {
207+
go ch.updateAndDispatchMetrics(ctx, info.Instance, pending.metrics)
208+
}
209+
if len(pending.events) > 0 {
210+
go ch.updateAndDispatchEvents(ctx, info.Instance, pending.events)
218211
}
219212
}
220213

221-
// prepareMetricQueue will ensure that ch.awaitingMetrics has a matching MetricMap for
222-
// source, and return it. If it did not have one initially, it will also enqueue source
223-
// for lookup. The functionality is overloaded to minimize code duplication.
224-
func (ch *CloudHandler) prepareMetricQueue(source gostatsd.Source) *gostatsd.MetricMap {
225-
if queue, ok := ch.awaitingMetrics[source]; ok {
226-
return queue
227-
}
228-
if len(ch.awaitingEvents[source]) == 0 {
214+
// preparePending will return a place to queue things that are waiting to be processed,
215+
// and ensure that source will be looked up if it wasn't already.
216+
func (ch *CloudHandler) preparePending(source gostatsd.Source) *pendingMetricsAndEvents {
217+
if _, ok := ch.perHostPending[source]; !ok {
218+
ch.perHostPending[source] = &pendingMetricsAndEvents{}
229219
ch.toLookupIPs = append(ch.toLookupIPs, source)
230-
ch.statsMetricHostsQueued++
231220
}
232-
queue := gostatsd.NewMetricMap()
233-
ch.awaitingMetrics[source] = queue
234-
return queue
221+
return ch.perHostPending[source]
222+
}
223+
224+
// prepareMetricQueue will ensure that ch.perHostPending has a matching MetricMap for
225+
// the provided source and return it.
226+
func (ch *CloudHandler) prepareMetricQueue(source gostatsd.Source) *gostatsd.MetricMap {
227+
queue := ch.preparePending(source)
228+
if queue.metrics == nil {
229+
// There might be value in pushing this to preparePending, since the split is
230+
// really only beneficial if a host is only sending events and not metrics, and
231+
// this adds an extra comparison to every lookup.
232+
queue.metrics = gostatsd.NewMetricMap()
233+
}
234+
return queue.metrics
235235
}
236236

237237
func (ch *CloudHandler) handleIncomingMetrics(mm *gostatsd.MetricMap) {
238+
// The <metric>.Source values could be from different hosts if they were
239+
// forwarded, therefore we need to do a lookup each time.
238240
mm.Counters.Each(func(metricName string, tagsKey string, c gostatsd.Counter) {
239241
ch.prepareMetricQueue(c.Source).MergeCounter(metricName, tagsKey, c)
240242
})
@@ -250,14 +252,8 @@ func (ch *CloudHandler) handleIncomingMetrics(mm *gostatsd.MetricMap) {
250252
}
251253

252254
func (ch *CloudHandler) handleIncomingEvent(e *gostatsd.Event) {
253-
queue := ch.awaitingEvents[e.Source]
254-
ch.awaitingEvents[e.Source] = append(queue, e)
255-
if len(queue) == 0 && ch.awaitingMetrics[e.Source] == nil {
256-
// This is the first event for that IP in the queue. Need to fetch an Instance for this IP.
257-
ch.toLookupIPs = append(ch.toLookupIPs, e.Source)
258-
ch.statsEventHostsQueued++
259-
}
260-
ch.statsEventItemsQueued++
255+
queue := ch.preparePending(e.Source)
256+
queue.events = append(queue.events, e)
261257
}
262258

263259
func (ch *CloudHandler) updateAndDispatchMetrics(ctx context.Context, instance *gostatsd.Instance, mmIn *gostatsd.MetricMap) {
@@ -284,7 +280,7 @@ func (ch *CloudHandler) updateAndDispatchMetrics(ctx context.Context, instance *
284280
func (ch *CloudHandler) updateAndDispatchEvents(ctx context.Context, instance *gostatsd.Instance, events []*gostatsd.Event) {
285281
var dispatched int
286282
defer func() {
287-
ch.wg.Add(-dispatched)
283+
ch.wgPendingEvents.Add(-dispatched)
288284
}()
289285
for _, e := range events {
290286
updateInplace(e, instance)

0 commit comments

Comments
 (0)