Skip to content

Commit d9c27cc

Browse files
committed
Add Persistent Queue metrics with number of dispatched batches
1 parent 6edb503 commit d9c27cc

File tree

5 files changed

+148
-0
lines changed

5 files changed

+148
-0
lines changed

exporter/exporterhelper/factory.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"go.opentelemetry.io/collector/component"
2121
"go.opentelemetry.io/collector/component/componenterror"
2222
"go.opentelemetry.io/collector/config"
23+
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
2324
)
2425

2526
// FactoryOption apply changes to ExporterOptions.
@@ -78,6 +79,7 @@ func NewFactory(
7879
for _, opt := range options {
7980
opt(f)
8081
}
82+
internal.RegisterMetrics()
8183
return f
8284
}
8385

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
//go:build enable_unstable
16+
// +build enable_unstable
17+
18+
package internal
19+
20+
import (
21+
"context"
22+
"sync"
23+
24+
"go.opencensus.io/stats"
25+
"go.opencensus.io/stats/view"
26+
"go.opencensus.io/tag"
27+
)
28+
29+
var (
30+
currentlyDispatchedBatches = stats.Int64(
31+
"/currently_dispatched_batches",
32+
"Number of batches that are currently being sent",
33+
stats.UnitDimensionless)
34+
35+
totalDispatchedBatches = stats.Int64(
36+
"/total_dispatched_batches",
37+
"Total number of batches which were processed",
38+
stats.UnitDimensionless)
39+
40+
queueNameKey = tag.MustNewKey("queue_name")
41+
)
42+
43+
func recordCurrentlyDispatchedBatches(ctx context.Context, dispatchedBatches int, queueName string) {
44+
ctx, err := tag.New(ctx, tag.Insert(queueNameKey, queueName))
45+
if err != nil {
46+
return
47+
}
48+
49+
stats.Record(ctx, currentlyDispatchedBatches.M(int64(dispatchedBatches)))
50+
}
51+
52+
func recordBatchDispatched(ctx context.Context, queueName string) {
53+
ctx, err := tag.New(ctx, tag.Insert(queueNameKey, queueName))
54+
if err != nil {
55+
return
56+
}
57+
58+
stats.Record(ctx, totalDispatchedBatches.M(int64(1)))
59+
}
60+
61+
// ExporterHelperInternalViews return the metrics views according to given telemetry level.
62+
func ExporterHelperInternalViews() []*view.View {
63+
64+
return []*view.View{
65+
{
66+
Name: currentlyDispatchedBatches.Name(),
67+
Description: currentlyDispatchedBatches.Description(),
68+
Measure: currentlyDispatchedBatches,
69+
Aggregation: view.Count(),
70+
TagKeys: []tag.Key{queueNameKey},
71+
},
72+
{
73+
Name: totalDispatchedBatches.Name(),
74+
Description: totalDispatchedBatches.Description(),
75+
Measure: totalDispatchedBatches,
76+
Aggregation: view.Sum(),
77+
TagKeys: []tag.Key{queueNameKey},
78+
},
79+
}
80+
}
81+
82+
var onceMetrics sync.Once
83+
84+
// RegisterMetrics registers a set of metric views used by the internal package
85+
func RegisterMetrics() {
86+
onceMetrics.Do(func() {
87+
_ = view.Register(ExporterHelperInternalViews()...)
88+
})
89+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
//go:build !enable_unstable
16+
// +build !enable_unstable
17+
18+
package internal
19+
20+
func RegisterMetrics() {}

exporter/exporterhelper/internal/persistent_storage.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,8 @@ func (pcs *persistentContiguousStorage) itemDispatchingStart(ctx context.Context
360360
pcs.logger.Debug("Failed updating currently dispatched items",
361361
zap.String(zapQueueNameKey, pcs.queueName), zap.Error(err))
362362
}
363+
364+
recordCurrentlyDispatchedBatches(ctx, len(pcs.currentlyDispatchedItems), pcs.queueName)
363365
}
364366

365367
// itemDispatchingFinish removes the item from the list of currently dispatched items and deletes it from the persistent queue
@@ -380,6 +382,8 @@ func (pcs *persistentContiguousStorage) itemDispatchingFinish(ctx context.Contex
380382
pcs.logger.Debug("Failed updating currently dispatched items",
381383
zap.String(zapQueueNameKey, pcs.queueName), zap.Error(err))
382384
}
385+
386+
recordBatchDispatched(ctx, pcs.queueName)
383387
}
384388

385389
func (pcs *persistentContiguousStorage) updateReadIndex(ctx context.Context) {

exporter/exporterhelper/internal/persistent_storage_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ import (
2929
"time"
3030

3131
"github.com/stretchr/testify/require"
32+
"go.opencensus.io/stats/view"
33+
"go.opencensus.io/tag"
3234
"go.uber.org/zap"
3335

3436
"go.opentelemetry.io/collector/component"
@@ -169,6 +171,37 @@ func TestPersistentStorage_CurrentlyProcessedItems(t *testing.T) {
169171
}
170172
}
171173

174+
func TestPersistentStorage_MetricsReported(t *testing.T) {
175+
path := createTemporaryDirectory()
176+
defer os.RemoveAll(path)
177+
178+
traces := newTraces(5, 10)
179+
req := newFakeTracesRequest(traces)
180+
181+
ext := createStorageExtension(path)
182+
client := createTestClient(ext)
183+
ps := createTestPersistentStorage(client)
184+
185+
RegisterMetrics()
186+
187+
for i := 0; i < 5; i++ {
188+
err := ps.put(req)
189+
require.NoError(t, err)
190+
}
191+
192+
_ = getItemFromChannel(t, ps)
193+
requireCurrentlyDispatchedItemsEqual(t, ps, []itemIndex{0, 1})
194+
195+
dd, err := view.RetrieveData("/currently_dispatched_batches")
196+
require.NoError(t, err)
197+
require.Equal(t, 1, len(dd))
198+
require.Equal(t, 1, len(dd[0].Tags))
199+
require.Equal(t, tag.Tag{Key: queueNameKey, Value: "foo"}, dd[0].Tags[0])
200+
require.Equal(t, int64(2), dd[0].Data.(*view.CountData).Value)
201+
202+
ps.stop()
203+
}
204+
172205
func TestPersistentStorage_RepeatPutCloseReadClose(t *testing.T) {
173206
path := createTemporaryDirectory()
174207
defer os.RemoveAll(path)

0 commit comments

Comments
 (0)