diff --git a/component/exporter.go b/component/exporter.go index 4570b25b52f..e8c83af76dd 100644 --- a/component/exporter.go +++ b/component/exporter.go @@ -62,6 +62,12 @@ type MetricsExporter interface { MetricsExporterBase } +// DataExporter is a DataConsumer that is also an Exporter. +type DataExporter interface { + Exporter + consumer.DataConsumer +} + // ExporterFactoryBase defines the common functions for all exporter factories. type ExporterFactoryBase interface { Factory @@ -111,3 +117,18 @@ type ExporterFactory interface { CreateMetricsExporter(ctx context.Context, params ExporterCreateParams, cfg configmodels.Exporter) (MetricsExporter, error) } + +// DataExporterFactory can create DataExporter. +type DataExporterFactory interface { + ExporterFactoryBase + + // CreateExporter creates an exporter based on this config. + // If the exporter type does not support the data type or if the config is not valid + // error will be returned instead. + CreateExporter( + ctx context.Context, + params ExporterCreateParams, + dataType configmodels.DataType, + cfg configmodels.Exporter, + ) (DataExporter, error) +} diff --git a/component/processor.go b/component/processor.go index c7bb9733611..0958d4f1054 100644 --- a/component/processor.go +++ b/component/processor.go @@ -66,6 +66,12 @@ type MetricsProcessor interface { MetricsProcessorBase } +// DataProcessor composes DataConsumer with some additional processor-specific functions. +type DataProcessor interface { + Processor + consumer.DataConsumer +} + // ProcessorCapabilities describes the capabilities of a Processor. type ProcessorCapabilities struct { // MutatesConsumedData is set to true if Consume* function of the @@ -131,3 +137,20 @@ type ProcessorFactory interface { CreateMetricsProcessor(ctx context.Context, params ProcessorCreateParams, nextConsumer consumer.MetricsConsumer, cfg configmodels.Processor) (MetricsProcessor, error) } + +// DataProcessorFactory is a factory interface for processors. 
This is the +// new factory type that can create new style processors. +type DataProcessorFactory interface { + ProcessorFactoryBase + + // CreateProcessor creates a processor based on this config. + // If the processor type does not support the dataType or if the config is not valid + // error will be returned instead. + CreateProcessor( + ctx context.Context, + params ProcessorCreateParams, + dataType configmodels.DataType, + cfg configmodels.Processor, + nextConsumer consumer.DataConsumer, + ) (DataProcessor, error) +} diff --git a/component/receiver.go b/component/receiver.go index 4ea05be7e14..b73fd0ff92e 100644 --- a/component/receiver.go +++ b/component/receiver.go @@ -49,6 +49,16 @@ type MetricsReceiver interface { Receiver } +// A DataReceiver is an "arbitrary data"-to-"internal format" converter. +// Its purpose is to translate data from the wild into internal data format. +// DataReceiver feeds a consumer.DataConsumer with data. +// +// For example it could be a Zipkin data source which translates +// Zipkin spans into data.Custom. +type DataReceiver interface { + Receiver +} + // ReceiverFactoryBase defines the common functions for all receiver factories. type ReceiverFactoryBase interface { Factory @@ -117,3 +127,19 @@ type ReceiverFactory interface { CreateMetricsReceiver(ctx context.Context, params ReceiverCreateParams, cfg configmodels.Receiver, nextConsumer consumer.MetricsConsumer) (MetricsReceiver, error) } + +// DataReceiverFactory can create DataReceiver. +type DataReceiverFactory interface { + ReceiverFactoryBase + + // CreateReceiver creates a receiver based on this config. + // If the receiver type does not support the dataType or if the config is not valid + // error will be returned instead. 
+ CreateReceiver( + ctx context.Context, + params ReceiverCreateParams, + dataType configmodels.DataType, + cfg configmodels.Receiver, + nextConsumer consumer.DataConsumer, + ) (DataReceiver, error) +} diff --git a/config/config.go b/config/config.go index 5e59df4e877..fca916452f5 100644 --- a/config/config.go +++ b/config/config.go @@ -539,17 +539,7 @@ func loadPipelines(v *viper.Viper) (configmodels.Pipelines, error) { var pipelineCfg configmodels.Pipeline // Set the type. - switch typeStr { - case configmodels.TracesDataTypeStr: - pipelineCfg.InputType = configmodels.TracesDataType - case configmodels.MetricsDataTypeStr: - pipelineCfg.InputType = configmodels.MetricsDataType - default: - return nil, &configError{ - code: errInvalidPipelineType, - msg: fmt.Sprintf("invalid pipeline type %q (must be metrics or traces)", typeStr), - } - } + pipelineCfg.InputType = configmodels.DataType(typeStr) pipelineConfig := ViperSub(pipelinesConfig, key) diff --git a/config/config_test.go b/config/config_test.go index b0b2a7a2b8d..1aaf0a5458f 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -100,7 +100,7 @@ func TestDecodeConfig(t *testing.T) { assert.Equal(t, 1, len(config.Processors), "Incorrect processors count") assert.Equal(t, - &ExampleProcessor{ + &ExampleProcessorCfg{ ProcessorSettings: configmodels.ProcessorSettings{ TypeVal: "exampleprocessor", NameVal: "exampleprocessor", @@ -258,7 +258,7 @@ func TestSimpleConfig(t *testing.T) { assert.Equalf(t, 1, len(config.Processors), "TEST[%s]", test.name) assert.Equalf(t, - &ExampleProcessor{ + &ExampleProcessorCfg{ ProcessorSettings: configmodels.ProcessorSettings{ TypeVal: "exampleprocessor", NameVal: "exampleprocessor", @@ -367,7 +367,6 @@ func TestDecodeConfig_Invalid(t *testing.T) { {name: "unknown-processor-type", expected: errUnknownProcessorType}, {name: "invalid-service-extensions-value", expected: errUnmarshalErrorOnService}, {name: "invalid-sequence-value", expected: errUnmarshalErrorOnPipeline}, - 
{name: "invalid-pipeline-type", expected: errInvalidPipelineType}, {name: "invalid-pipeline-type-and-name", expected: errInvalidTypeAndNameKey}, {name: "duplicate-extension", expected: errDuplicateExtensionName}, {name: "duplicate-receiver", expected: errDuplicateReceiverName}, diff --git a/config/configmodels/configmodels.go b/config/configmodels/configmodels.go index 21be2d643f3..1b9487f60e0 100644 --- a/config/configmodels/configmodels.go +++ b/config/configmodels/configmodels.go @@ -79,37 +79,18 @@ type Processors map[string]Processor // DataType is the data type that is supported for collection. We currently support // collecting metrics and traces, this can expand in the future (e.g. logs, events, etc). -type DataType int + +type DataType string // Currently supported data types. Add new data types here when new types are supported in the future. const ( - _ DataType = iota // skip 0, start types from 1. - // TracesDataType is the data type tag for traces. - TracesDataType + TracesDataType DataType = "traces" // MetricsDataType is the data type tag for metrics. - MetricsDataType + MetricsDataType DataType = "metrics" ) -// Data type strings. -const ( - TracesDataTypeStr = "traces" - MetricsDataTypeStr = "metrics" -) - -// GetString converts data type to string. -func (dataType DataType) GetString() string { - switch dataType { - case TracesDataType: - return TracesDataTypeStr - case MetricsDataType: - return MetricsDataTypeStr - default: - panic("unknown data type") - } -} - // Pipeline defines a single pipeline. 
type Pipeline struct { Name string `mapstructure:"-"` diff --git a/config/example_factories.go b/config/example_factories.go index 7d37c526bf6..e35772a0b3a 100644 --- a/config/example_factories.go +++ b/config/example_factories.go @@ -25,6 +25,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector/config/configmodels" "github.com/open-telemetry/opentelemetry-collector/consumer" "github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata" + "github.com/open-telemetry/opentelemetry-collector/internal/data" ) // ExampleReceiver is for testing purposes. We are defining an example config and factory @@ -60,7 +61,8 @@ func (f *ExampleReceiverFactory) CustomUnmarshaler() component.CustomUnmarshaler func (f *ExampleReceiverFactory) CreateDefaultConfig() configmodels.Receiver { return &ExampleReceiver{ ReceiverSettings: configmodels.ReceiverSettings{ - TypeVal: "examplereceiver", + TypeVal: f.Type(), + NameVal: string(f.Type()), Endpoint: "localhost:1000", }, ExtraSetting: "some string", @@ -120,12 +122,36 @@ func (f *ExampleReceiverFactory) CreateMetricsReceiver( return receiver, nil } +func (f *ExampleReceiverFactory) CreateReceiver( + ctx context.Context, + params component.ReceiverCreateParams, + dataType configmodels.DataType, + cfg configmodels.Receiver, + nextConsumer consumer.DataConsumer, +) (component.DataReceiver, error) { + // There must be one receiver for both metrics and traces. We maintain a map of + // receivers per config. + + // Check to see if there is already a receiver for this config. + receiver, ok := exampleReceivers[cfg] + if !ok { + receiver = &ExampleReceiverProducer{} + // Remember the receiver in the map + exampleReceivers[cfg] = receiver + } + receiver.DataConsumer = nextConsumer + + return receiver, nil + +} + // ExampleReceiverProducer allows producing traces and metrics for testing purposes. 
type ExampleReceiverProducer struct { - TraceConsumer consumer.TraceConsumerOld Started bool Stopped bool + TraceConsumer consumer.TraceConsumerOld MetricsConsumer consumer.MetricsConsumerOld + DataConsumer consumer.DataConsumer } // Start tells the receiver to start its processing. @@ -199,7 +225,8 @@ func (f *MultiProtoReceiverFactory) CustomUnmarshaler() component.CustomUnmarsha // CreateDefaultConfig creates the default configuration for the Receiver. func (f *MultiProtoReceiverFactory) CreateDefaultConfig() configmodels.Receiver { return &MultiProtoReceiver{ - TypeVal: "multireceiver", + TypeVal: f.Type(), + NameVal: string(f.Type()), Protocols: map[string]MultiProtoReceiverOneCfg{ "http": { Endpoint: "example.com:8888", @@ -256,7 +283,10 @@ func (f *ExampleExporterFactory) Type() configmodels.Type { // CreateDefaultConfig creates the default configuration for the Exporter. func (f *ExampleExporterFactory) CreateDefaultConfig() configmodels.Exporter { return &ExampleExporter{ - ExporterSettings: configmodels.ExporterSettings{TypeVal: f.Type()}, + ExporterSettings: configmodels.ExporterSettings{ + TypeVal: f.Type(), + NameVal: string(f.Type()), + }, ExtraSetting: "some export string", ExtraMapSetting: nil, ExtraListSetting: nil, @@ -273,10 +303,23 @@ func (f *ExampleExporterFactory) CreateMetricsExporter(logger *zap.Logger, cfg c return &ExampleExporterConsumer{}, nil } +func (f *ExampleExporterFactory) CreateExporter( + ctx context.Context, + params component.ExporterCreateParams, + dataType configmodels.DataType, + cfg configmodels.Exporter, +) (component.DataExporter, error) { + if dataType == "exampledata" { + return &ExampleExporterConsumer{}, nil + } + return nil, configerror.ErrDataTypeIsNotSupported +} + // ExampleExporterConsumer stores consumed traces and metrics for testing purposes. 
type ExampleExporterConsumer struct { Traces []consumerdata.TraceData Metrics []consumerdata.MetricsData + Data []data.Custom ExporterStarted bool ExporterShutdown bool } @@ -301,6 +344,19 @@ func (exp *ExampleExporterConsumer) ConsumeMetricsData(ctx context.Context, md c return nil } +func (exp *ExampleExporterConsumer) ConsumeData(ctx context.Context, td data.Custom) error { + exp.Data = append(exp.Data, td) + return nil +} + +type ExampleCustomData struct { + Data string +} + +func (ecd *ExampleCustomData) Clone() data.Custom { + return &ExampleCustomData{ecd.Data} +} + // Name returns the name of the exporter. func (exp *ExampleExporterConsumer) Name() string { return "exampleexporter" @@ -312,9 +368,9 @@ func (exp *ExampleExporterConsumer) Shutdown(context.Context) error { return nil } -// ExampleProcessor is for testing purposes. We are defining an example config and factory +// ExampleProcessorCfg is for testing purposes. We are defining an example config and factory // for "exampleprocessor" processor type. -type ExampleProcessor struct { +type ExampleProcessorCfg struct { configmodels.ProcessorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct ExtraSetting string `mapstructure:"extra"` ExtraMapSetting map[string]string `mapstructure:"extra_map"` @@ -332,11 +388,14 @@ func (f *ExampleProcessorFactory) Type() configmodels.Type { // CreateDefaultConfig creates the default configuration for the Processor. 
func (f *ExampleProcessorFactory) CreateDefaultConfig() configmodels.Processor { - return &ExampleProcessor{ - ProcessorSettings: configmodels.ProcessorSettings{}, - ExtraSetting: "some export string", - ExtraMapSetting: nil, - ExtraListSetting: nil, + return &ExampleProcessorCfg{ + ProcessorSettings: configmodels.ProcessorSettings{ + TypeVal: f.Type(), + NameVal: string(f.Type()), + }, + ExtraSetting: "some export string", + ExtraMapSetting: nil, + ExtraListSetting: nil, } } @@ -358,6 +417,39 @@ func (f *ExampleProcessorFactory) CreateMetricsProcessor( return nil, configerror.ErrDataTypeIsNotSupported } +func (f *ExampleProcessorFactory) CreateProcessor( + ctx context.Context, + params component.ProcessorCreateParams, + dataType configmodels.DataType, + cfg configmodels.Processor, + nextConsumer consumer.DataConsumer, +) (component.DataProcessor, error) { + if dataType == "exampledata" { + return &ExampleProcessor{nextConsumer}, nil + } + return nil, configerror.ErrDataTypeIsNotSupported +} + +type ExampleProcessor struct { + nextConsumer consumer.DataConsumer +} + +func (ep *ExampleProcessor) Start(ctx context.Context, host component.Host) error { + return nil +} + +func (ep *ExampleProcessor) Shutdown(ctx context.Context) error { + return nil +} + +func (ep *ExampleProcessor) GetCapabilities() component.ProcessorCapabilities { + return component.ProcessorCapabilities{MutatesConsumedData: false} +} + +func (ep *ExampleProcessor) ConsumeData(ctx context.Context, td data.Custom) error { + return ep.nextConsumer.ConsumeData(ctx, td) +} + // ExampleExtensionCfg is for testing purposes. We are defining an example config and factory // for "exampleextension" extension type. type ExampleExtensionCfg struct { @@ -387,10 +479,13 @@ func (f *ExampleExtensionFactory) Type() configmodels.Type { // CreateDefaultConfig creates the default configuration for the Extension. 
func (f *ExampleExtensionFactory) CreateDefaultConfig() configmodels.Extension { return &ExampleExtensionCfg{ - ExtensionSettings: configmodels.ExtensionSettings{TypeVal: f.Type()}, - ExtraSetting: "extra string setting", - ExtraMapSetting: nil, - ExtraListSetting: nil, + ExtensionSettings: configmodels.ExtensionSettings{ + TypeVal: f.Type(), + NameVal: string(f.Type()), + }, + ExtraSetting: "extra string setting", + ExtraMapSetting: nil, + ExtraListSetting: nil, } } diff --git a/consumer/consumer.go b/consumer/consumer.go index db9278cbd16..7c3cdc99d40 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -20,6 +20,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata" "github.com/open-telemetry/opentelemetry-collector/consumer/pdata" + "github.com/open-telemetry/opentelemetry-collector/internal/data" ) // MetricsConsumerBase defines a common interface for MetricsConsumerOld and MetricsConsumer. @@ -60,3 +61,10 @@ type TraceConsumer interface { // ConsumeTraces receives pdata.Traces for processing. ConsumeTraces(ctx context.Context, td pdata.Traces) error } + +// DataConsumer is an interface that receives data.Custom, processes it +// as needed, and sends it to the next processing node if any or to the destination. +type DataConsumer interface { + // ConsumeData receives data.Custom for processing. + ConsumeData(ctx context.Context, td data.Custom) error +} diff --git a/docs/design.md b/docs/design.md index 9e53b595b06..31a325298a9 100644 --- a/docs/design.md +++ b/docs/design.md @@ -139,6 +139,66 @@ When the Collector loads this config the result will look like this: Note that each “queued_retry” processor is an independent instance, although both are configured the same way, i.e. each have a size of 50. +## Custom Data Types + +Collector currently has 2 built-in data types that can flow through it: traces and metrics. 
+It is possible to define a custom data type, implement components that support this data +type and have pipelines of such components. + +In order to implement a custom data type you will need to do the following. + +1. Choose a name for custom data type that will be used in configuration file. Choose +simple and descriptive names. For example if we wanted to add support for logs in the +Collector we would name the data type "logs". + +2. Design in-memory representation for the custom data type. The in-memory representation +must implement `data.Custom` interface. For example in case of logs we could name it +`data.Logs`. + +3. Implement components that can receive, process or export this data type. You will need +to implement the corresponding interfaces: `DataReceiver`, `DataProcessor`, `DataExporter` +and their factories: `DataReceiverFactory`, `DataProcessorFactory`, `DataExporterFactory`. + +`Data*Factory.Create*()` function accept a `dataType` parameter which tells the factory +what is the data type that the component is supposed to handle. This allows one component +to handle multiple data types if needed (just like today it is possible to have a component +that handles both traces and metrics). + +The `DataReceiver` implementation must emit data of the type that you defined earlier in +step 2 (`data.Logs` in the example). + +`DataProcessor` and `DataExporter` implementation must define `ConsumeData` function +that accepts `data.Custom` parameter and type-casts it to the concrete implementation +of the data type (`data.Logs` in the example). + +For sample code please see `ExampleReceiver`, `ExampleProcessor`, `ExampleExporter` in +`example_factories.go`. + +4. Once all the above is done the end-user can use these components. The usage is exactly +the same as for builtin type. In the configuration file the user must define a pipeline +with the custom data type. 
For example (assuming "logs" data type example): + +```yaml +receivers: + syslog: + +processors: + log_filter: + # this is a hypothetical processor that can filter log custom data type + # based on user-specified filter condition. + include: severity == "error" + +exporters: + syslog: + +pipelines: + logs: + receivers: [syslog] + processors: [log_filter] + exporters: [syslog] +``` + + ## Running as an Agent On a typical VM/container, there are user applications running in some diff --git a/internal/data/custom.go b/internal/data/custom.go new file mode 100644 index 00000000000..0d42303eb73 --- /dev/null +++ b/internal/data/custom.go @@ -0,0 +1,18 @@ +// Copyright OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+package data + +type Custom interface { + Clone() Custom +} diff --git a/processor/cloningfanoutconnector.go b/processor/cloningfanoutconnector.go index 68f06057575..3e853b6582f 100644 --- a/processor/cloningfanoutconnector.go +++ b/processor/cloningfanoutconnector.go @@ -28,6 +28,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector/consumer/converter" "github.com/open-telemetry/opentelemetry-collector/consumer/pdata" "github.com/open-telemetry/opentelemetry-collector/consumer/pdatautil" + "github.com/open-telemetry/opentelemetry-collector/internal/data" ) // This file contains implementations of cloning Trace/Metrics connectors @@ -215,6 +216,39 @@ func (tfc traceCloningFanOutConnector) ConsumeTraces(ctx context.Context, td pda return componenterror.CombineErrors(errs) } +// NewCloningFanOutConnector wraps multiple data consumers in a single one. +func NewCloningFanOutConnector(tcs []consumer.DataConsumer) consumer.DataConsumer { + return CloningFanOutConnector(tcs) +} + +type CloningFanOutConnector []consumer.DataConsumer + +var _ consumer.DataConsumer = (*CloningFanOutConnector)(nil) + +// ConsumeData exports the data to all consumers wrapped by the current one. +func (tfc CloningFanOutConnector) ConsumeData(ctx context.Context, td data.Custom) error { + var errs []error + + // Fan out to first len-1 consumers. + for i := 0; i < len(tfc)-1; i++ { + // Create a clone of data. We need to clone because consumers may modify the data. + clone := td.Clone() + if err := tfc[i].ConsumeData(ctx, clone); err != nil { + errs = append(errs, err) + } + } + + if len(tfc) > 0 { + // Give the original data to the last consumer. 
+ lastTc := tfc[len(tfc)-1] + if err := lastTc.ConsumeData(ctx, td); err != nil { + errs = append(errs, err) + } + } + + return componenterror.CombineErrors(errs) +} + func cloneTraceDataOld(td consumerdata.TraceData) consumerdata.TraceData { clone := consumerdata.TraceData{ SourceFormat: td.SourceFormat, diff --git a/processor/fanoutconnector.go b/processor/fanoutconnector.go index 5aeebb3cdd4..78fe91506ba 100644 --- a/processor/fanoutconnector.go +++ b/processor/fanoutconnector.go @@ -22,6 +22,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata" "github.com/open-telemetry/opentelemetry-collector/consumer/converter" "github.com/open-telemetry/opentelemetry-collector/consumer/pdata" + "github.com/open-telemetry/opentelemetry-collector/internal/data" ) // This file contains implementations of Trace/Metrics connectors @@ -154,3 +155,23 @@ func (tfc traceFanOutConnector) ConsumeTraces(ctx context.Context, td pdata.Trac } return componenterror.CombineErrors(errs) } + +// NewFanOutConnector wraps multiple data consumers in a single one. +func NewFanOutConnector(tcs []consumer.DataConsumer) consumer.DataConsumer { + return FanOutConnector(tcs) +} + +type FanOutConnector []consumer.DataConsumer + +var _ consumer.DataConsumer = (*FanOutConnector)(nil) + +// ConsumeData exports the data to all consumers wrapped by the current one. 
+func (tfc FanOutConnector) ConsumeData(ctx context.Context, td data.Custom) error { + var errs []error + for _, tc := range tfc { + if err := tc.ConsumeData(ctx, td); err != nil { + errs = append(errs, err) + } + } + return componenterror.CombineErrors(errs) +} diff --git a/service/builder/exporters_builder.go b/service/builder/exporters_builder.go index 65e3a59cb9a..af226934a05 100644 --- a/service/builder/exporters_builder.go +++ b/service/builder/exporters_builder.go @@ -32,6 +32,7 @@ type builtExporter struct { logger *zap.Logger te component.TraceExporterBase me component.MetricsExporterBase + de map[configmodels.DataType]component.DataExporter } // Start the exporter. @@ -227,6 +228,7 @@ func (eb *ExportersBuilder) buildExporter( exporter := &builtExporter{ logger: logger, + de: make(map[configmodels.DataType]component.DataExporter), } inputDataTypes := exportersInputDataTypes[config] @@ -239,43 +241,60 @@ func (eb *ExportersBuilder) buildExporter( return exporter, nil } - if requirement, ok := inputDataTypes[configmodels.TracesDataType]; ok { - // Traces data type is required. Create a trace exporter based on config. - te, err := createTraceExporter(factory, logger, config) - if err != nil { - if err == configerror.ErrDataTypeIsNotSupported { - // Could not create because this exporter does not support this data type. - return nil, typeMismatchErr(config, requirement.requiredBy, configmodels.TracesDataType) + for dataType, requirement := range inputDataTypes { + + if dataType == configmodels.TracesDataType { + // Traces data type is required. Create a trace exporter based on config. + te, err := createTraceExporter(factory, logger, config) + if err != nil { + if err == configerror.ErrDataTypeIsNotSupported { + // Could not create because this exporter does not support this data type. 
+ return nil, exporterTypeMismatchErr(config, requirement.requiredBy, configmodels.TracesDataType) + } + return nil, fmt.Errorf("error creating %s exporter: %v", config.Name(), err) } - return nil, fmt.Errorf("error creating %s exporter: %v", config.Name(), err) - } - // Check if the factory really created the exporter. - if te == nil { - return nil, fmt.Errorf("factory for %q produced a nil exporter", config.Name()) - } + // Check if the factory really created the exporter. + if te == nil { + return nil, fmt.Errorf("factory for %q produced a nil exporter", config.Name()) + } - exporter.te = te - } + exporter.te = te + } else if dataType == configmodels.MetricsDataType { + // Metrics data type is required. Create a trace exporter based on config. + me, err := createMetricsExporter(factory, logger, config) + if err != nil { + if err == configerror.ErrDataTypeIsNotSupported { + // Could not create because this exporter does not support this data type. + return nil, exporterTypeMismatchErr(config, requirement.requiredBy, configmodels.MetricsDataType) + } + return nil, fmt.Errorf("error creating %s exporter: %v", config.Name(), err) + } - if requirement, ok := inputDataTypes[configmodels.MetricsDataType]; ok { - // Metrics data type is required. Create a trace exporter based on config. - me, err := createMetricsExporter(factory, logger, config) - if err != nil { - if err == configerror.ErrDataTypeIsNotSupported { - // Could not create because this exporter does not support this data type. - return nil, typeMismatchErr(config, requirement.requiredBy, configmodels.MetricsDataType) + // The factories can be implemented by third parties, check if they really + // created the exporter. + if me == nil { + return nil, fmt.Errorf("factory for %q produced a nil exporter", config.Name()) } - return nil, fmt.Errorf("error creating %s exporter: %v", config.Name(), err) - } - // The factories can be implemented by third parties, check if they really - // created the exporter. 
- if me == nil { - return nil, fmt.Errorf("factory for %q produced a nil exporter", config.Name()) - } + exporter.me = me + } else { + de, err := createDataExporter(factory, logger, dataType, config) + if err != nil { + if err == configerror.ErrDataTypeIsNotSupported { + // Could not create because this exporter does not support this data type. + return nil, exporterTypeMismatchErr(config, requirement.requiredBy, dataType) + } + return nil, fmt.Errorf("error creating %s exporter: %v", config.Name(), err) + } - exporter.me = me + // Check if the factory really created the exporter. + if de == nil { + return nil, fmt.Errorf("factory for %q produced a nil exporter", config.Name()) + } + + exporter.de[dataType] = de + } } eb.logger.Info("Exporter is enabled.", zap.String("exporter", config.Name())) @@ -283,14 +302,14 @@ func (eb *ExportersBuilder) buildExporter( return exporter, nil } -func typeMismatchErr( +func exporterTypeMismatchErr( config configmodels.Exporter, requiredByPipeline *configmodels.Pipeline, dataType configmodels.DataType, ) error { - return fmt.Errorf("%s is a %s pipeline but has a %s which does not support %s", - requiredByPipeline.Name, dataType.GetString(), - config.Name(), dataType.GetString(), + return fmt.Errorf("pipeline %q of data type %q has an exporter %q, which does not support that data type", + requiredByPipeline.Name, dataType, + config.Name(), ) } @@ -332,3 +351,22 @@ func createMetricsExporter(factoryBase component.ExporterFactoryBase, // use ExporterFactoryOld.CreateMetricsExporter. return factoryBase.(component.ExporterFactoryOld).CreateMetricsExporter(logger, cfg) } + +// createDataProcessor creates a data exporter based on provided factory type. 
+func createDataExporter( + factoryBase component.ExporterFactoryBase, + logger *zap.Logger, + dataType configmodels.DataType, + cfg configmodels.Exporter, +) (component.DataExporter, error) { + if factory, ok := factoryBase.(component.DataExporterFactory); ok { + creationParams := component.ExporterCreateParams{Logger: logger} + ctx := context.Background() + + // If exporter is of the new type (can manipulate on internal data structure), + // use ExporterFactory.CreateDataExporter. + return factory.CreateExporter(ctx, creationParams, dataType, cfg) + } + + return nil, fmt.Errorf("exporter %q does not support data type %q", factoryBase.Type(), dataType) +} diff --git a/service/builder/exporters_builder_test.go b/service/builder/exporters_builder_test.go index 3ac11df4390..83327982861 100644 --- a/service/builder/exporters_builder_test.go +++ b/service/builder/exporters_builder_test.go @@ -103,6 +103,70 @@ func TestExportersBuilder_Build(t *testing.T) { // TODO: once we have an exporter that supports metrics data type test it too. } +func TestExportersBuilder_BuildCustom(t *testing.T) { + factories, err := config.ExampleComponents() + assert.Nil(t, err) + + cfg := &configmodels.Config{ + Exporters: map[string]configmodels.Exporter{ + "exampleexporter": &config.ExampleExporter{ + ExporterSettings: configmodels.ExporterSettings{ + NameVal: "exampleexporter", + TypeVal: "exampleexporter", + }, + }, + }, + + Service: configmodels.Service{ + Pipelines: map[string]*configmodels.Pipeline{ + "exampledata": { + Name: "exampledata", + InputType: "exampledata", + Exporters: []string{"exampleexporter"}, + }, + }, + }, + } + + exporters, err := NewExportersBuilder(zap.NewNop(), cfg, factories.Exporters).Build() + + assert.NoError(t, err) + require.NotNil(t, exporters) + + e1 := exporters[cfg.Exporters["exampleexporter"]] + + // Ensure exporter has its fields correctly populated. 
+ require.NotNil(t, e1) + assert.Equal(t, 1, len(e1.de)) + assert.Nil(t, e1.te) + assert.Nil(t, e1.me) + + // Ensure it can be started. + err = exporters.StartAll(context.Background(), componenttest.NewNopHost()) + assert.NoError(t, err) + + // Ensure it can be stopped. + err = e1.Shutdown(context.Background()) + assert.NoError(t, err) + + // Remove the pipeline so that the exporter is not attached to any pipeline. + // This should result in creating an exporter that has none of consumption + // functions set. + delete(cfg.Service.Pipelines, "exampledata") + exporters, err = NewExportersBuilder(zap.NewNop(), cfg, factories.Exporters).Build() + assert.NotNil(t, exporters) + assert.Nil(t, err) + + e1 = exporters[cfg.Exporters["exampleexporter"]] + + // Ensure exporter has its fields correctly populated, ie Trace Exporter and + // Metrics Exporter are nil. + require.NotNil(t, e1) + assert.Nil(t, e1.te) + assert.Nil(t, e1.me) + assert.Equal(t, 0, len(e1.de)) +} + func TestExportersBuilder_StartAll(t *testing.T) { exporters := make(Exporters) expCfg := &configmodels.ExporterSettings{} diff --git a/service/builder/pipelines_builder.go b/service/builder/pipelines_builder.go index 38216f209be..0ddc4e5bcf9 100644 --- a/service/builder/pipelines_builder.go +++ b/service/builder/pipelines_builder.go @@ -35,6 +35,7 @@ type builtPipeline struct { logger *zap.Logger firstTC consumer.TraceConsumerBase firstMC consumer.MetricsConsumerBase + firstDC consumer.DataConsumer // MutatesConsumedData is set to true if any processors in the pipeline // can mutate the TraceData or MetricsData input argument. @@ -126,12 +127,15 @@ func (pb *PipelinesBuilder) buildPipeline(pipelineCfg *configmodels.Pipeline, // First create a consumer junction point that fans out the data to all exporters. 
var tc consumer.TraceConsumerBase var mc consumer.MetricsConsumerBase + var dc consumer.DataConsumer switch pipelineCfg.InputType { case configmodels.TracesDataType: tc = pb.buildFanoutExportersTraceConsumer(pipelineCfg.Exporters) case configmodels.MetricsDataType: mc = pb.buildFanoutExportersMetricsConsumer(pipelineCfg.Exporters) + default: + dc = pb.buildFanoutExportersConsumer(pipelineCfg.InputType, pipelineCfg.Exporters) } mutatesConsumedData := false @@ -170,6 +174,15 @@ func (pb *PipelinesBuilder) buildPipeline(pipelineCfg *configmodels.Pipeline, } processors[i] = proc mc = proc + + default: + var proc component.DataProcessor + proc, err = createProcessor(factory, componentLogger, pipelineCfg.InputType, procCfg, dc) + if proc != nil { + mutatesConsumedData = mutatesConsumedData || proc.GetCapabilities().MutatesConsumedData + } + processors[i] = proc + dc = proc } if err != nil { @@ -178,19 +191,20 @@ func (pb *PipelinesBuilder) buildPipeline(pipelineCfg *configmodels.Pipeline, } // Check if the factory really created the processor. 
- if tc == nil && mc == nil { + if tc == nil && mc == nil && dc == nil { return nil, fmt.Errorf("factory for %q produced a nil processor", procCfg.Name()) } } pipelineLogger := pb.logger.With(zap.String("pipeline_name", pipelineCfg.Name), - zap.String("pipeline_datatype", pipelineCfg.InputType.GetString())) + zap.String("pipeline_datatype", string(pipelineCfg.InputType))) pipelineLogger.Info("Pipeline is enabled.") bp := &builtPipeline{ pipelineLogger, tc, mc, + dc, mutatesConsumedData, processors, } @@ -243,6 +257,26 @@ func (pb *PipelinesBuilder) buildFanoutExportersMetricsConsumer(exporterNames [] return processor.CreateMetricsFanOutConnector(exporters) } +func (pb *PipelinesBuilder) buildFanoutExportersConsumer( + dataType configmodels.DataType, + exporterNames []string, +) consumer.DataConsumer { + builtExporters := pb.getBuiltExportersByNames(exporterNames) + + // Optimize for the case when there is only one exporter, no need to create junction point. + if len(builtExporters) == 1 { + return builtExporters[0].de[dataType] + } + + var exporters []consumer.DataConsumer + for _, builtExp := range builtExporters { + exporters = append(exporters, builtExp.de[dataType]) + } + + // Create a junction point that fans out to all exporters. + return processor.NewFanOutConnector(exporters) +} + // createTraceProcessor creates trace processor based on type of the current processor // and type of the downstream consumer. func createTraceProcessor( @@ -318,3 +352,30 @@ func createMetricsProcessor( metricsConverter := converter.NewOCToInternalMetricsConverter(nextConsumer.(consumer.MetricsConsumer)) return factoryOld.CreateMetricsProcessor(logger, metricsConverter, cfg) } + +// createProcessor creates processor based on type of the current processor +// and type of the downstream consumer. 
+func createProcessor(
+	factoryBase component.ProcessorFactoryBase,
+	logger *zap.Logger,
+	dataType configmodels.DataType,
+	cfg configmodels.Processor,
+	nextConsumer consumer.DataConsumer,
+) (component.DataProcessor, error) {
+	if factory, ok := factoryBase.(component.DataProcessorFactory); ok {
+		creationParams := component.ProcessorCreateParams{Logger: logger}
+		ctx := context.Background()
+
+		// If both processor and consumer are of the new type (can manipulate on internal data structure),
+		// use DataProcessorFactory.CreateProcessor.
+		if dataConsumer, ok := nextConsumer.(consumer.DataConsumer); ok {
+			return factory.CreateProcessor(ctx, creationParams, dataType, cfg, dataConsumer)
+		}
+
+		return nil, fmt.Errorf("processor %q is attached to a pipeline with a component that does not accept data type %q",
+			cfg.Name(), dataType)
+	}
+
+	return nil, fmt.Errorf("processor %q does not support data type %q",
+		cfg.Name(), dataType)
+}
diff --git a/service/builder/pipelines_builder_test.go b/service/builder/pipelines_builder_test.go
index dee3be6d350..efb13bbdb24 100644
--- a/service/builder/pipelines_builder_test.go
+++ b/service/builder/pipelines_builder_test.go
@@ -62,6 +62,146 @@ func TestPipelinesBuilder_Build(t *testing.T) {
 	}
 }
 
+func createExampleFactories() config.Factories {
+	exampleReceiverFactory := &config.ExampleReceiverFactory{}
+	exampleProcessorFactory := &config.ExampleProcessorFactory{}
+	exampleExporterFactory := &config.ExampleExporterFactory{}
+
+	factories := config.Factories{
+		Receivers: map[configmodels.Type]component.ReceiverFactoryBase{
+			exampleReceiverFactory.Type(): exampleReceiverFactory,
+		},
+		Processors: map[configmodels.Type]component.ProcessorFactoryBase{
+			exampleProcessorFactory.Type(): exampleProcessorFactory,
+		},
+		Exporters: map[configmodels.Type]component.ExporterFactoryBase{
+			exampleExporterFactory.Type(): exampleExporterFactory,
+		},
+	}
+
+	return factories
+}
+
+func createExampleConfig(dataType string) 
*configmodels.Config { + + exampleReceiverFactory := &config.ExampleReceiverFactory{} + exampleProcessorFactory := &config.ExampleProcessorFactory{} + exampleExporterFactory := &config.ExampleExporterFactory{} + + cfg := &configmodels.Config{ + Receivers: map[string]configmodels.Receiver{ + string(exampleReceiverFactory.Type()): exampleReceiverFactory.CreateDefaultConfig(), + }, + Processors: map[string]configmodels.Processor{ + string(exampleProcessorFactory.Type()): exampleProcessorFactory.CreateDefaultConfig(), + }, + Exporters: map[string]configmodels.Exporter{ + string(exampleExporterFactory.Type()): exampleExporterFactory.CreateDefaultConfig(), + }, + Service: configmodels.Service{ + Pipelines: map[string]*configmodels.Pipeline{ + dataType: { + Name: dataType, + InputType: configmodels.DataType(dataType), + Receivers: []string{string(exampleReceiverFactory.Type())}, + Processors: []string{string(exampleProcessorFactory.Type())}, + Exporters: []string{string(exampleExporterFactory.Type())}, + }, + }, + }, + } + return cfg +} + +func TestPipelinesBuilder_BuildCustom(t *testing.T) { + + factories := createExampleFactories() + + tests := []struct { + dataType string + shouldFail bool + }{ + { + dataType: "exampledata", + shouldFail: false, + }, + { + dataType: "nosuchdatatype", + shouldFail: true, + }, + } + + for _, test := range tests { + t.Run(test.dataType, func(t *testing.T) { + dataType := test.dataType + + cfg := createExampleConfig(dataType) + + // BuildProcessors the pipeline + allExporters, err := NewExportersBuilder(zap.NewNop(), cfg, factories.Exporters).Build() + if test.shouldFail { + assert.Error(t, err) + return + } + + require.NoError(t, err) + require.EqualValues(t, 1, len(allExporters)) + pipelineProcessors, err := NewPipelinesBuilder(zap.NewNop(), cfg, allExporters, factories.Processors).Build() + + assert.NoError(t, err) + require.NotNil(t, pipelineProcessors) + + err = pipelineProcessors.StartProcessors(context.Background(), 
componenttest.NewNopHost())
+			assert.NoError(t, err)
+
+			pipelineName := dataType
+			processor := pipelineProcessors[cfg.Service.Pipelines[pipelineName]]
+
+			// Ensure pipeline has its fields correctly populated.
+			require.NotNil(t, processor)
+			assert.Nil(t, processor.firstTC)
+			assert.Nil(t, processor.firstMC)
+			assert.NotNil(t, processor.firstDC)
+
+			// Compose the list of created exporters.
+			exporterNames := []string{"exampleexporter"}
+			var exporters []*builtExporter
+			for _, name := range exporterNames {
+				// Ensure exporter is created.
+				exp := allExporters[cfg.Exporters[name]]
+				require.NotNil(t, exp)
+				exporters = append(exporters, exp)
+			}
+
+			// Send custom data via the processor and verify that all exporters of the pipeline receive it.
+
+			// First check that there is no data in the exporters yet.
+			var exporterConsumers []*config.ExampleExporterConsumer
+			for _, exporter := range exporters {
+				consumer := exporter.de[configmodels.DataType(dataType)].(*config.ExampleExporterConsumer)
+				exporterConsumers = append(exporterConsumers, consumer)
+				require.Equal(t, len(consumer.Data), 0)
+			}
+
+			// Send one custom data.
+			data := &config.ExampleCustomData{Data: "testdata"}
+			processor.firstDC.(consumer.DataConsumer).ConsumeData(context.Background(), data)
+
+			// Now verify received data.
+			for _, consumer := range exporterConsumers {
+				// Check that the data is received by the exporter.
+				require.Equal(t, 1, len(consumer.Data))
+
+				// Verify that the data is successfully delivered.
+ assert.EqualValues(t, data, consumer.Data[0]) + } + + err = pipelineProcessors.ShutdownProcessors(context.Background()) + assert.NoError(t, err) + }) + } +} + func assertEqualTraceData(t *testing.T, expected consumerdata.TraceData, actual consumerdata.TraceData) { assert.True(t, proto.Equal(expected.Resource, actual.Resource)) assert.True(t, proto.Equal(expected.Node, actual.Node)) diff --git a/service/builder/receivers_builder.go b/service/builder/receivers_builder.go index 91361b827ca..d26ed167641 100644 --- a/service/builder/receivers_builder.go +++ b/service/builder/receivers_builder.go @@ -152,6 +152,10 @@ func (rb *ReceiversBuilder) findPipelinesToAttach(config configmodels.Receiver) // Is this receiver attached to the pipeline? if hasReceiver(pipelineCfg, config.Name()) { + if _, exists := pipelinesToAttach[pipelineCfg.InputType]; !exists { + pipelinesToAttach[pipelineCfg.InputType] = make([]*builtPipeline, 0) + } + // Yes, add it to the list of pipelines of corresponding data type. 
pipelinesToAttach[pipelineCfg.InputType] = append(pipelinesToAttach[pipelineCfg.InputType], pipelineProcessor) @@ -186,6 +190,10 @@ func (rb *ReceiversBuilder) attachReceiverToPipelines( case configmodels.MetricsDataType: junction := buildFanoutMetricConsumer(builtPipelines) createdReceiver, err = createMetricsReceiver(context.Background(), factory, logger, config, junction) + + default: + junction := buildFanoutConsumer(builtPipelines) + createdReceiver, err = createReceiver(context.Background(), factory, logger, dataType, config, junction) } if err != nil { @@ -194,8 +202,8 @@ func (rb *ReceiversBuilder) attachReceiverToPipelines( "receiver %s does not support %s but it was used in a "+ "%s pipeline", config.Name(), - dataType.GetString(), - dataType.GetString()) + dataType, + dataType) } return fmt.Errorf("cannot create receiver %s: %s", config.Name(), err.Error()) } @@ -220,7 +228,7 @@ func (rb *ReceiversBuilder) attachReceiverToPipelines( } rcv.receiver = createdReceiver - logger.Info("Receiver is enabled.", zap.String("datatype", dataType.GetString())) + logger.Info("Receiver is enabled.", zap.String("datatype", string(dataType))) return nil } @@ -314,6 +322,31 @@ func buildFanoutMetricConsumer(pipelines []*builtPipeline) consumer.MetricsConsu return processor.CreateMetricsFanOutConnector(pipelineConsumers) } +func buildFanoutConsumer(pipelines []*builtPipeline) consumer.DataConsumer { + // Optimize for the case when there is only one processor, no need to create junction point. + if len(pipelines) == 1 { + return pipelines[0].firstDC + } + + var pipelineConsumers []consumer.DataConsumer + anyPipelineMutatesData := false + for _, pipeline := range pipelines { + pipelineConsumers = append(pipelineConsumers, pipeline.firstDC) + anyPipelineMutatesData = anyPipelineMutatesData || pipeline.MutatesConsumedData + } + + // Create a junction point that fans out to all pipelines. 
+	if anyPipelineMutatesData {
+		// If any pipeline mutates data use a cloning fan out connector
+		// so that it is safe to modify fanned out data.
+		// TODO: if there are more than 2 pipelines only clone data for pipelines that
+		// declare the intent to mutate the data. Pipelines that do not mutate the data
+		// can consume shared data.
+		return processor.NewCloningFanOutConnector(pipelineConsumers)
+	}
+	return processor.NewFanOutConnector(pipelineConsumers)
+}
+
 // createTraceReceiver is a helper function that creates trace receiver based on the current receiver type
 // and type of the next consumer.
 func createTraceReceiver(
@@ -389,3 +422,28 @@ func createMetricsReceiver(
 	metricsConverter := converter.NewOCToInternalMetricsConverter(nextConsumer.(consumer.MetricsConsumer))
 	return factoryOld.CreateMetricsReceiver(logger, cfg, metricsConverter)
 }
+
+// createReceiver is a helper function that creates a receiver based
+// on the current receiver type and type of the next consumer.
+func createReceiver(
+	ctx context.Context,
+	factoryBase component.ReceiverFactoryBase,
+	logger *zap.Logger,
+	dataType configmodels.DataType,
+	cfg configmodels.Receiver,
+	nextConsumer consumer.DataConsumer,
+) (component.DataReceiver, error) {
+	if factory, ok := factoryBase.(component.DataReceiverFactory); ok {
+		creationParams := component.ReceiverCreateParams{Logger: logger}
+
+		if nc, ok := nextConsumer.(consumer.DataConsumer); ok {
+			return factory.CreateReceiver(ctx, creationParams, dataType, cfg, nc)
+		}
+
+		return nil, fmt.Errorf("receiver %q is attached to a pipeline with a component that does not accept data type %q",
+			cfg.Name(), dataType)
+	}
+
+	return nil, fmt.Errorf("receiver %q does not support data type %q",
+		cfg.Name(), dataType)
+}
diff --git a/service/builder/receivers_builder_test.go b/service/builder/receivers_builder_test.go
index 8c3eea3306d..0ef9c66969d 100644
--- a/service/builder/receivers_builder_test.go
+++ b/service/builder/receivers_builder_test.go
@@ -181,6 +181,92 @@ func testReceivers(
 	}
 }
 
+func TestReceiversBuilder_BuildCustom(t *testing.T) {
+	factories := createExampleFactories()
+
+	tests := []struct {
+		dataType   string
+		shouldFail bool
+	}{
+		{
+			dataType:   "exampledata",
+			shouldFail: false,
+		},
+		{
+			dataType:   "nosuchdatatype",
+			shouldFail: true,
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.dataType, func(t *testing.T) {
+			dataType := test.dataType
+
+			cfg := createExampleConfig(dataType)
+
+			// Build the pipeline
+			allExporters, err := NewExportersBuilder(zap.NewNop(), cfg, factories.Exporters).Build()
+			if test.shouldFail {
+				assert.Error(t, err)
+				return
+			}
+
+			assert.NoError(t, err)
+			pipelineProcessors, err := NewPipelinesBuilder(zap.NewNop(), cfg, allExporters, factories.Processors).Build()
+			assert.NoError(t, err)
+			receivers, err := NewReceiversBuilder(zap.NewNop(), cfg, pipelineProcessors, factories.Receivers).Build()
+
+			assert.NoError(t, err)
+			require.NotNil(t, receivers)
+
+			receiver := 
receivers[cfg.Receivers["examplereceiver"]] + + // Ensure receiver has its fields correctly populated. + require.NotNil(t, receiver) + + assert.NotNil(t, receiver.receiver) + + // Compose the list of created exporters. + exporterNames := []string{"exampleexporter"} + var exporters []*builtExporter + for _, name := range exporterNames { + // Ensure exporter is created. + exp := allExporters[cfg.Exporters[name]] + require.NotNil(t, exp) + exporters = append(exporters, exp) + } + + // Send Data via receiver and verify that all exporters of the pipeline receive it. + + // First check that there are no traces in the exporters yet. + for _, exporter := range exporters { + for _, exp := range exporter.de { + consumer := exp.(*config.ExampleExporterConsumer) + require.Equal(t, len(consumer.Data), 0) + } + } + + // Send one data. + data := &config.ExampleCustomData{Data: "testdata"} + producer := receiver.receiver.(*config.ExampleReceiverProducer) + producer.DataConsumer.ConsumeData(context.Background(), data) + + // Now verify received data. + for _, name := range exporterNames { + // Check that the data is received by exporter. + exporter := allExporters[cfg.Exporters[name]] + + // Validate exported data. 
+ for _, exp := range exporter.de { + consumer := exp.(*config.ExampleExporterConsumer) + require.Equal(t, 1, len(consumer.Data)) + assert.EqualValues(t, data, consumer.Data[0]) + } + } + }) + } +} + func TestReceiversBuilder_DataTypeError(t *testing.T) { factories, err := config.ExampleComponents() assert.NoError(t, err) diff --git a/service/builder/testdata/pipelines_builder.yaml b/service/builder/testdata/pipelines_builder.yaml index b303bb9bddd..d5a1f0762d8 100644 --- a/service/builder/testdata/pipelines_builder.yaml +++ b/service/builder/testdata/pipelines_builder.yaml @@ -38,3 +38,7 @@ service: metrics/3: receivers: [examplereceiver/3] exporters: [exampleexporter/2] + + exampledata: + receivers: [examplereceiver/3] + exporters: [exampleexporter/2] diff --git a/service/service.go b/service/service.go index 181302ba2e7..c93e8ea8c88 100644 --- a/service/service.go +++ b/service/service.go @@ -520,7 +520,7 @@ func (app *Application) getPipelinesSummaryTableData() internal.SummaryPipelines for c, p := range app.builtPipelines { row := internal.SummaryPipelinesTableRowData{ FullName: c.Name, - InputType: c.InputType.GetString(), + InputType: string(c.InputType), MutatesConsumedData: p.MutatesConsumedData, Receivers: c.Receivers, Processors: c.Processors,