Skip to content

Commit 8f43cc8

Browse files
author
Tigran Najaryan
committed
Add factory and new-style config for Queued processor
This is part of remaining migration to new configuration format. Github issue: #46 Testing done: make
1 parent 9e8f857 commit 8f43cc8

File tree

6 files changed

+234
-1
lines changed

6 files changed

+234
-1
lines changed

cmd/occollector/app/collector/testdata/otelsvc-config.yaml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@ exporters:
1010
processors:
1111
attributes:
1212
enabled: true
13+
queued-retry:
14+
enabled: true
1315

1416
pipelines:
1517
traces:
1618
receivers: [jaeger]
17-
processors: [attributes]
19+
processors: [attributes, queued-retry]
1820
exporters: [opencensus]
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// Copyright 2019, OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package queued
16+
17+
import (
18+
"time"
19+
20+
"github.com/open-telemetry/opentelemetry-service/internal/configmodels"
21+
)
22+
23+
// ConfigV2 defines configuration for Attributes processor.
24+
type ConfigV2 struct {
25+
configmodels.ProcessorSettings `mapstructure:",squash"`
26+
27+
// NumWorkers is the number of queue workers that dequeue batches and send them out
28+
NumWorkers int `mapstructure:"num-workers"`
29+
// QueueSize is the maximum number of batches allowed in queue at a given time
30+
QueueSize int `mapstructure:"queue-size"`
31+
// Retry indicates whether queue processor should retry span batches in case of processing failure
32+
RetryOnFailure bool `mapstructure:"retry-on-failure"`
33+
// BackoffDelay is the amount of time a worker waits after a failed send before retrying
34+
BackoffDelay time.Duration `mapstructure:"backoff-delay"`
35+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
// Copyright 2019, OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package queued
16+
17+
import (
18+
"path"
19+
"testing"
20+
"time"
21+
22+
"github.com/stretchr/testify/assert"
23+
"github.com/stretchr/testify/require"
24+
25+
"github.com/open-telemetry/opentelemetry-service/internal/configmodels"
26+
"github.com/open-telemetry/opentelemetry-service/internal/configv2"
27+
"github.com/open-telemetry/opentelemetry-service/internal/factories"
28+
)
29+
30+
var _ = configv2.RegisterTestFactories()
31+
32+
func TestLoadConfig(t *testing.T) {
33+
34+
factory := factories.GetProcessorFactory(typeStr)
35+
36+
config, err := configv2.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"))
37+
38+
require.Nil(t, err)
39+
require.NotNil(t, config)
40+
41+
p0 := config.Processors["queued-retry"]
42+
assert.Equal(t, p0, factory.CreateDefaultConfig())
43+
44+
p1 := config.Processors["queued-retry/2"]
45+
assert.Equal(t, p1,
46+
&ConfigV2{
47+
ProcessorSettings: configmodels.ProcessorSettings{
48+
TypeVal: "queued-retry",
49+
},
50+
NumWorkers: 2,
51+
QueueSize: 10,
52+
RetryOnFailure: true,
53+
BackoffDelay: time.Second * 5,
54+
})
55+
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
// Copyright 2019, OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package queued
16+
17+
import (
18+
"time"
19+
20+
"github.com/open-telemetry/opentelemetry-service/consumer"
21+
"github.com/open-telemetry/opentelemetry-service/internal/configmodels"
22+
"github.com/open-telemetry/opentelemetry-service/internal/factories"
23+
"github.com/open-telemetry/opentelemetry-service/processor"
24+
)
25+
26+
var _ = factories.RegisterProcessorFactory(&processorFactory{})
27+
28+
const (
29+
// The value of "type" key in configuration.
30+
typeStr = "queued-retry"
31+
)
32+
33+
// processorFactory is the factory for OpenCensus exporter.
34+
type processorFactory struct {
35+
}
36+
37+
// Type gets the type of the Option config created by this factory.
38+
func (f *processorFactory) Type() string {
39+
return typeStr
40+
}
41+
42+
// CreateDefaultConfig creates the default configuration for exporter.
43+
func (f *processorFactory) CreateDefaultConfig() configmodels.Processor {
44+
return &ConfigV2{
45+
ProcessorSettings: configmodels.ProcessorSettings{
46+
TypeVal: typeStr,
47+
},
48+
NumWorkers: 10,
49+
QueueSize: 5000,
50+
RetryOnFailure: true,
51+
BackoffDelay: time.Second * 5,
52+
}
53+
}
54+
55+
// CreateTraceProcessor creates a trace processor based on this config.
56+
func (f *processorFactory) CreateTraceProcessor(
57+
nextConsumer consumer.TraceConsumer,
58+
cfg configmodels.Processor,
59+
) (processor.TraceProcessor, error) {
60+
oCfg := cfg.(*ConfigV2)
61+
return NewQueuedSpanProcessor(nextConsumer,
62+
Options.WithNumWorkers(oCfg.NumWorkers),
63+
Options.WithQueueSize(oCfg.QueueSize),
64+
Options.WithRetryOnProcessingFailures(oCfg.RetryOnFailure),
65+
Options.WithBackoffDelay(oCfg.BackoffDelay),
66+
), nil
67+
}
68+
69+
// CreateMetricsProcessor creates a metrics processor based on this config.
70+
func (f *processorFactory) CreateMetricsProcessor(
71+
nextConsumer consumer.MetricsConsumer,
72+
cfg configmodels.Processor,
73+
) (processor.MetricsProcessor, error) {
74+
return nil, factories.ErrDataTypeIsNotSupported
75+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
// Copyright 2019, OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package queued
16+
17+
import (
18+
"testing"
19+
20+
"github.com/stretchr/testify/assert"
21+
"github.com/stretchr/testify/require"
22+
23+
"github.com/open-telemetry/opentelemetry-service/internal/factories"
24+
)
25+
26+
func TestCreateDefaultConfig(t *testing.T) {
27+
factory := factories.GetProcessorFactory(typeStr)
28+
require.NotNil(t, factory)
29+
30+
cfg := factory.CreateDefaultConfig()
31+
assert.NotNil(t, cfg, "failed to create default config")
32+
}
33+
34+
func TestCreateProcessor(t *testing.T) {
35+
factory := factories.GetProcessorFactory(typeStr)
36+
require.NotNil(t, factory)
37+
38+
cfg := factory.CreateDefaultConfig()
39+
40+
tp, err := factory.CreateTraceProcessor(nil, cfg)
41+
assert.NotNil(t, tp)
42+
assert.NoError(t, err, "cannot create trace processor")
43+
44+
mp, err := factory.CreateMetricsProcessor(nil, cfg)
45+
assert.Nil(t, mp)
46+
assert.Error(t, err, "should not be able to create metric processor")
47+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
receivers:
2+
examplereceiver:
3+
4+
processors:
5+
queued-retry:
6+
queued-retry/2:
7+
num-workers: 2
8+
queue-size: 10
9+
retry-on-failure: true
10+
backoff-delay: 5s
11+
12+
exporters:
13+
exampleexporter:
14+
15+
pipelines:
16+
traces:
17+
receivers: [examplereceiver]
18+
processors: [queued-retry/2]
19+
exporters: [exampleexporter]

0 commit comments

Comments
 (0)