Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 16 additions & 16 deletions cmd/occollector/app/builder/exporters_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import (
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-service/consumer"
"github.com/open-telemetry/opentelemetry-service/factories"
"github.com/open-telemetry/opentelemetry-service/internal"
"github.com/open-telemetry/opentelemetry-service/pkg/configmodels"
"github.com/open-telemetry/opentelemetry-service/pkg/factories"
"github.com/open-telemetry/opentelemetry-service/models"
)

// builtExporter is an exporter that is built based on a config. It can have
Expand All @@ -39,7 +39,7 @@ func (exp *builtExporter) Stop() error {
}

// Exporters is a map of exporters created from exporter configs.
type Exporters map[configmodels.Exporter]*builtExporter
type Exporters map[models.Exporter]*builtExporter

// StopAll stops all exporters.
func (exps Exporters) StopAll() {
Expand All @@ -50,23 +50,23 @@ func (exps Exporters) StopAll() {

type dataTypeRequirement struct {
// Pipeline that requires the data type.
requiredBy *configmodels.Pipeline
requiredBy *models.Pipeline
}

// Map of data type requirements.
type dataTypeRequirements map[configmodels.DataType]dataTypeRequirement
type dataTypeRequirements map[models.DataType]dataTypeRequirement

// Data type requirements for all exporters.
type exportersRequiredDataTypes map[configmodels.Exporter]dataTypeRequirements
type exportersRequiredDataTypes map[models.Exporter]dataTypeRequirements

// ExportersBuilder builds exporters from config.
type ExportersBuilder struct {
logger *zap.Logger
config *configmodels.ConfigV2
config *models.ConfigV2
}

// NewExportersBuilder creates a new ExportersBuilder. Call Build() on the returned value.
func NewExportersBuilder(logger *zap.Logger, config *configmodels.ConfigV2) *ExportersBuilder {
func NewExportersBuilder(logger *zap.Logger, config *models.ConfigV2) *ExportersBuilder {
return &ExportersBuilder{logger, config}
}

Expand Down Expand Up @@ -149,7 +149,7 @@ func combineStopFunc(f1, f2 factories.StopFunc) factories.StopFunc {
}

func (eb *ExportersBuilder) buildExporter(
config configmodels.Exporter,
config models.Exporter,
exportersInputDataTypes exportersRequiredDataTypes,
) (*builtExporter, error) {

Expand All @@ -166,13 +166,13 @@ func (eb *ExportersBuilder) buildExporter(
return exporter, nil
}

if requirement, ok := inputDataTypes[configmodels.TracesDataType]; ok {
if requirement, ok := inputDataTypes[models.TracesDataType]; ok {
// Traces data type is required. Create a trace exporter based on config.
tc, stopFunc, err := factory.CreateTraceExporter(config)
if err != nil {
if err == factories.ErrDataTypeIsNotSupported {
// Could not create because this exporter does not support this data type.
return nil, typeMismatchErr(config, requirement.requiredBy, configmodels.TracesDataType)
return nil, typeMismatchErr(config, requirement.requiredBy, models.TracesDataType)
}
return nil, fmt.Errorf("error creating %s exporter: %v", config.Name(), err)
}
Expand All @@ -181,13 +181,13 @@ func (eb *ExportersBuilder) buildExporter(
exporter.stop = stopFunc
}

if requirement, ok := inputDataTypes[configmodels.MetricsDataType]; ok {
if requirement, ok := inputDataTypes[models.MetricsDataType]; ok {
// Metrics data type is required. Create a trace exporter based on config.
mc, stopFunc, err := factory.CreateMetricsExporter(config)
if err != nil {
if err == factories.ErrDataTypeIsNotSupported {
// Could not create because this exporter does not support this data type.
return nil, typeMismatchErr(config, requirement.requiredBy, configmodels.MetricsDataType)
return nil, typeMismatchErr(config, requirement.requiredBy, models.MetricsDataType)
}
return nil, fmt.Errorf("error creating %s exporter: %v", config.Name(), err)
}
Expand All @@ -202,9 +202,9 @@ func (eb *ExportersBuilder) buildExporter(
}

func typeMismatchErr(
config configmodels.Exporter,
requiredByPipeline *configmodels.Pipeline,
dataType configmodels.DataType,
config models.Exporter,
requiredByPipeline *models.Pipeline,
dataType models.DataType,
) error {
return fmt.Errorf("%s is a %s pipeline but has a %s which does not support %s",
requiredByPipeline.Name, dataType.GetString(),
Expand Down
16 changes: 8 additions & 8 deletions cmd/occollector/app/builder/exporters_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ import (
"github.com/stretchr/testify/require"

"github.com/open-telemetry/opentelemetry-service/exporter/opencensusexporter"
"github.com/open-telemetry/opentelemetry-service/pkg/configmodels"
"github.com/open-telemetry/opentelemetry-service/models"
)

func TestExportersBuilder_Build(t *testing.T) {
config := &configmodels.ConfigV2{
Exporters: map[string]configmodels.Exporter{
config := &models.ConfigV2{
Exporters: map[string]models.Exporter{
"opencensus": &opencensusexporter.ConfigV2{
ExporterSettings: configmodels.ExporterSettings{
ExporterSettings: models.ExporterSettings{
NameVal: "opencensus",
TypeVal: "opencensus",
Enabled: true,
Expand All @@ -40,10 +40,10 @@ func TestExportersBuilder_Build(t *testing.T) {
},
},

Pipelines: map[string]*configmodels.Pipeline{
Pipelines: map[string]*models.Pipeline{
"trace": {
Name: "trace",
InputType: configmodels.TracesDataType,
InputType: models.TracesDataType,
Exporters: []string{"opencensus"},
},
},
Expand All @@ -67,7 +67,7 @@ func TestExportersBuilder_Build(t *testing.T) {

// Now change only pipeline data type to "metrics" and make sure exporter builder
// now fails (because opencensus exporter does not currently support metrics).
config.Pipelines["trace"].InputType = configmodels.MetricsDataType
config.Pipelines["trace"].InputType = models.MetricsDataType
_, err = NewExportersBuilder(zap.NewNop(), config).Build()
assert.NotNil(t, err)

Expand All @@ -92,7 +92,7 @@ func TestExportersBuilder_Build(t *testing.T) {

func TestExportersBuilder_StopAll(t *testing.T) {
exporters := make(Exporters)
expCfg := &configmodels.ExporterSettings{}
expCfg := &models.ExporterSettings{}
stopCalled := false
exporters[expCfg] = &builtExporter{
stop: func() error {
Expand Down
20 changes: 10 additions & 10 deletions cmd/occollector/app/builder/pipelines_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-service/consumer"
"github.com/open-telemetry/opentelemetry-service/pkg/configmodels"
"github.com/open-telemetry/opentelemetry-service/pkg/factories"
"github.com/open-telemetry/opentelemetry-service/factories"
"github.com/open-telemetry/opentelemetry-service/models"
"github.com/open-telemetry/opentelemetry-service/processor/multiconsumer"
)

Expand All @@ -34,20 +34,20 @@ type builtProcessor struct {

// PipelineProcessors is a map of entry-point processors created from pipeline configs.
// Each element of the map points to the first processor of the pipeline.
type PipelineProcessors map[*configmodels.Pipeline]*builtProcessor
type PipelineProcessors map[*models.Pipeline]*builtProcessor

// PipelinesBuilder builds pipelines from config.
type PipelinesBuilder struct {
logger *zap.Logger
config *configmodels.ConfigV2
config *models.ConfigV2
exporters Exporters
}

// NewPipelinesBuilder creates a new PipelinesBuilder. Requires exporters to be already
// built via ExportersBuilder. Call Build() on the returned value.
func NewPipelinesBuilder(
logger *zap.Logger,
config *configmodels.ConfigV2,
config *models.ConfigV2,
exporters Exporters,
) *PipelinesBuilder {
return &PipelinesBuilder{logger, config, exporters}
Expand All @@ -72,7 +72,7 @@ func (pb *PipelinesBuilder) Build() (PipelineProcessors, error) {
// The last processor in the pipeline will be plugged to fan out the data into exporters
// that are configured for this pipeline.
func (pb *PipelinesBuilder) buildPipeline(
pipelineCfg *configmodels.Pipeline,
pipelineCfg *models.Pipeline,
) (*builtProcessor, error) {

// Build the pipeline backwards.
Expand All @@ -82,9 +82,9 @@ func (pb *PipelinesBuilder) buildPipeline(
var mc consumer.MetricsConsumer

switch pipelineCfg.InputType {
case configmodels.TracesDataType:
case models.TracesDataType:
tc = pb.buildFanoutExportersTraceConsumer(pipelineCfg.Exporters)
case configmodels.MetricsDataType:
case models.MetricsDataType:
mc = pb.buildFanoutExportersMetricsConsumer(pipelineCfg.Exporters)
}

Expand All @@ -103,9 +103,9 @@ func (pb *PipelinesBuilder) buildPipeline(
// which we will build in the next loop iteration).
var err error
switch pipelineCfg.InputType {
case configmodels.TracesDataType:
case models.TracesDataType:
tc, err = factory.CreateTraceProcessor(pb.logger, tc, procCfg)
case configmodels.MetricsDataType:
case models.MetricsDataType:
mc, err = factory.CreateMetricsProcessor(pb.logger, mc, procCfg)
}

Expand Down
6 changes: 3 additions & 3 deletions cmd/occollector/app/builder/pipelines_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ import (

tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"

"github.com/open-telemetry/opentelemetry-service/configv2"
"github.com/open-telemetry/opentelemetry-service/data"
"github.com/open-telemetry/opentelemetry-service/pkg/configmodels"
"github.com/open-telemetry/opentelemetry-service/pkg/configv2"
"github.com/open-telemetry/opentelemetry-service/models"
"github.com/open-telemetry/opentelemetry-service/processor/addattributesprocessor"
)

Expand Down Expand Up @@ -131,7 +131,7 @@ func TestPipelinesBuilder_Error(t *testing.T) {
// since there is no way to have such config loaded by LoadConfigFile, it would not
// pass validation. We are doing this to test failure mode of PipelinesBuilder.
pipeline := config.Pipelines["traces"]
pipeline.InputType = configmodels.MetricsDataType
pipeline.InputType = models.MetricsDataType

exporters, err := NewExportersBuilder(zap.NewNop(), config).Build()

Expand Down
30 changes: 15 additions & 15 deletions cmd/occollector/app/builder/receivers_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ import (
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-service/consumer"
"github.com/open-telemetry/opentelemetry-service/factories"
"github.com/open-telemetry/opentelemetry-service/internal"
"github.com/open-telemetry/opentelemetry-service/pkg/configmodels"
"github.com/open-telemetry/opentelemetry-service/pkg/factories"
"github.com/open-telemetry/opentelemetry-service/models"
"github.com/open-telemetry/opentelemetry-service/processor/multiconsumer"
"github.com/open-telemetry/opentelemetry-service/receiver"
)
Expand Down Expand Up @@ -76,7 +76,7 @@ func (rcv *builtReceiver) Start(asyncErrorChan chan<- error) error {
}

// Receivers is a map of receivers created from receiver configs.
type Receivers map[configmodels.Receiver]*builtReceiver
type Receivers map[models.Receiver]*builtReceiver

// StopAll stops all receivers.
func (rcvs Receivers) StopAll() {
Expand All @@ -101,14 +101,14 @@ func (rcvs Receivers) StartAll(logger *zap.Logger, asyncErrorChan chan<- error)
// ReceiversBuilder builds receivers from config.
type ReceiversBuilder struct {
logger *zap.Logger
config *configmodels.ConfigV2
config *models.ConfigV2
pipelineProcessors PipelineProcessors
}

// NewReceiversBuilder creates a new ReceiversBuilder. Call Build() on the returned value.
func NewReceiversBuilder(
logger *zap.Logger,
config *configmodels.ConfigV2,
config *models.ConfigV2,
pipelineProcessors PipelineProcessors,
) *ReceiversBuilder {
return &ReceiversBuilder{logger, config, pipelineProcessors}
Expand All @@ -131,7 +131,7 @@ func (rb *ReceiversBuilder) Build() (Receivers, error) {
}

// hasReceiver returns true if the pipeline is attached to specified receiver.
func hasReceiver(pipeline *configmodels.Pipeline, receiverName string) bool {
func hasReceiver(pipeline *models.Pipeline, receiverName string) bool {
for _, name := range pipeline.Receivers {
if name == receiverName {
return true
Expand All @@ -140,16 +140,16 @@ func hasReceiver(pipeline *configmodels.Pipeline, receiverName string) bool {
return false
}

type attachedPipelines map[configmodels.DataType][]*builtProcessor
type attachedPipelines map[models.DataType][]*builtProcessor

func (rb *ReceiversBuilder) findPipelinesToAttach(config configmodels.Receiver) (attachedPipelines, error) {
func (rb *ReceiversBuilder) findPipelinesToAttach(config models.Receiver) (attachedPipelines, error) {
// A receiver may be attached to multiple pipelines. Pipelines may consume different
// data types. We need to compile the list of pipelines of each type that must be
// attached to this receiver according to configuration.

pipelinesToAttach := make(attachedPipelines)
pipelinesToAttach[configmodels.TracesDataType] = make([]*builtProcessor, 0)
pipelinesToAttach[configmodels.MetricsDataType] = make([]*builtProcessor, 0)
pipelinesToAttach[models.TracesDataType] = make([]*builtProcessor, 0)
pipelinesToAttach[models.MetricsDataType] = make([]*builtProcessor, 0)

// Iterate over all pipelines.
for _, pipelineCfg := range rb.config.Pipelines {
Expand All @@ -173,8 +173,8 @@ func (rb *ReceiversBuilder) findPipelinesToAttach(config configmodels.Receiver)

func (rb *ReceiversBuilder) attachReceiverToPipelines(
factory factories.ReceiverFactory,
dataType configmodels.DataType,
config configmodels.Receiver,
dataType models.DataType,
config models.Receiver,
receiver *builtReceiver,
pipelineProcessors []*builtProcessor,
) error {
Expand All @@ -183,14 +183,14 @@ func (rb *ReceiversBuilder) attachReceiverToPipelines(
// sure its output is fanned out to all attached pipelines.
var err error
switch dataType {
case configmodels.TracesDataType:
case models.TracesDataType:
// First, create the fan out junction point.
junction := buildFanoutTraceConsumer(pipelineProcessors)

// Now create the receiver and tell it to send to the junction point.
receiver.trace, err = factory.CreateTraceReceiver(context.Background(), config, junction)

case configmodels.MetricsDataType:
case models.MetricsDataType:
junction := buildFanoutMetricConsumer(pipelineProcessors)
receiver.metrics, err = factory.CreateMetricsReceiver(config, junction)
}
Expand All @@ -213,7 +213,7 @@ func (rb *ReceiversBuilder) attachReceiverToPipelines(
return nil
}

func (rb *ReceiversBuilder) buildReceiver(config configmodels.Receiver) (*builtReceiver, error) {
func (rb *ReceiversBuilder) buildReceiver(config models.Receiver) (*builtReceiver, error) {

// First find pipelines that must be attached to this receiver.
pipelinesToAttach, err := rb.findPipelinesToAttach(config)
Expand Down
8 changes: 4 additions & 4 deletions cmd/occollector/app/builder/receivers_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import (
metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"

"github.com/open-telemetry/opentelemetry-service/configv2"
"github.com/open-telemetry/opentelemetry-service/data"
"github.com/open-telemetry/opentelemetry-service/pkg/configmodels"
"github.com/open-telemetry/opentelemetry-service/pkg/configv2"
"github.com/open-telemetry/opentelemetry-service/models"
)

type testCase struct {
Expand Down Expand Up @@ -216,7 +216,7 @@ func TestReceiversBuilder_DataTypeError(t *testing.T) {

func TestReceiversBuilder_StartAll(t *testing.T) {
receivers := make(Receivers)
rcvCfg := &configmodels.ReceiverSettings{}
rcvCfg := &models.ReceiverSettings{}

receiver := &configv2.ExampleReceiverProducer{}

Expand All @@ -238,7 +238,7 @@ func TestReceiversBuilder_StartAll(t *testing.T) {

func TestReceiversBuilder_StopAll(t *testing.T) {
receivers := make(Receivers)
rcvCfg := &configmodels.ReceiverSettings{}
rcvCfg := &models.ReceiverSettings{}

receiver := &configv2.ExampleReceiverProducer{}

Expand Down
2 changes: 1 addition & 1 deletion cmd/occollector/app/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ import (
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-service/cmd/occollector/app/builder"
"github.com/open-telemetry/opentelemetry-service/configv2"
"github.com/open-telemetry/opentelemetry-service/consumer"
"github.com/open-telemetry/opentelemetry-service/internal/config/viperutils"
"github.com/open-telemetry/opentelemetry-service/internal/pprofserver"
"github.com/open-telemetry/opentelemetry-service/internal/zpagesserver"
"github.com/open-telemetry/opentelemetry-service/pkg/configv2"
"github.com/open-telemetry/opentelemetry-service/receiver"
)

Expand Down
Loading