diff --git a/component/exporter.go b/component/exporter.go index 4570b25b52f..66c4dabc3e9 100644 --- a/component/exporter.go +++ b/component/exporter.go @@ -23,7 +23,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector/consumer" ) -// Exporter defines functions that trace and metric exporters must implement. +// Exporter defines functions that all exporters must implement. type Exporter interface { Component } @@ -33,13 +33,13 @@ type TraceExporterBase interface { Exporter } -// TraceExporterOld is a TraceConsumer that is also an Exporter. +// TraceExporterOld is a TraceExporter that can consume old-style traces. type TraceExporterOld interface { consumer.TraceConsumerOld TraceExporterBase } -// TraceExporter is an TraceConsumer that is also an Exporter. +// TraceExporter is a TraceExporter that can consume new-style traces. type TraceExporter interface { consumer.TraceConsumer TraceExporterBase } @@ -50,18 +50,24 @@ type MetricsExporterBase interface { Exporter } -// MetricsExporterOld is a MetricsConsumer that is also an Exporter. +// MetricsExporterOld is a MetricsExporter that can consume old-style metrics. type MetricsExporterOld interface { consumer.MetricsConsumerOld MetricsExporterBase } -// MetricsExporter is a MetricsConsumer that is also an Exporter. +// MetricsExporter is a MetricsExporter that can consume new-style metrics. type MetricsExporter interface { consumer.MetricsConsumer MetricsExporterBase } +// LogExporter is a LogConsumer that is also an Exporter. +type LogExporter interface { + Exporter + consumer.LogConsumer +} + // ExporterFactoryBase defines the common functions for all exporter factories. type ExporterFactoryBase interface { Factory @@ -87,7 +93,7 @@ type ExporterFactoryOld interface { CreateMetricsExporter(logger *zap.Logger, cfg configmodels.Exporter) (MetricsExporterOld, error) } -// ExporterCreateParams is passed to ExporterFactory.Create* functions. 
+// ExporterCreateParams is passed to Create*Exporter functions. type ExporterCreateParams struct { // Logger that the factory can use during creation and can pass to the created // component to be used later as well. @@ -111,3 +117,17 @@ type ExporterFactory interface { CreateMetricsExporter(ctx context.Context, params ExporterCreateParams, cfg configmodels.Exporter) (MetricsExporter, error) } + +// LogExporterFactory can create a LogExporter. +type LogExporterFactory interface { + ExporterFactoryBase + + // CreateLogExporter creates an exporter based on the config. + // If the exporter type does not support logs or if the config is not valid + // error will be returned instead. + CreateLogExporter( + ctx context.Context, + params ExporterCreateParams, + cfg configmodels.Exporter, + ) (LogExporter, error) +} diff --git a/component/processor.go b/component/processor.go index c7bb9733611..32e0b34a615 100644 --- a/component/processor.go +++ b/component/processor.go @@ -37,13 +37,13 @@ type TraceProcessorBase interface { Processor } -// TraceProcessorOld composes TraceConsumer with some additional processor-specific functions. +// TraceProcessorOld is a processor that can consume old-style traces. type TraceProcessorOld interface { consumer.TraceConsumerOld TraceProcessorBase } -// TraceProcessor composes TraceConsumer with some additional processor-specific functions. +// TraceProcessor is a processor that can consume traces. type TraceProcessor interface { consumer.TraceConsumer TraceProcessorBase } @@ -54,18 +54,24 @@ type MetricsProcessorBase interface { Processor } -// MetricsProcessor composes MetricsConsumer with some additional processor-specific functions. +// MetricsProcessorOld is a processor that can consume old-style metrics. type MetricsProcessorOld interface { consumer.MetricsConsumerOld MetricsProcessorBase } -// MetricsProcessor composes MetricsConsumer with some additional processor-specific functions. 
+// MetricsProcessor is a processor that can consume metrics. type MetricsProcessor interface { consumer.MetricsConsumer MetricsProcessorBase } +// LogProcessor is a processor that can consume logs. +type LogProcessor interface { + Processor + consumer.LogConsumer +} + // ProcessorCapabilities describes the capabilities of a Processor. type ProcessorCapabilities struct { // MutatesConsumedData is set to true if Consume* function of the @@ -131,3 +137,18 @@ type ProcessorFactory interface { CreateMetricsProcessor(ctx context.Context, params ProcessorCreateParams, nextConsumer consumer.MetricsConsumer, cfg configmodels.Processor) (MetricsProcessor, error) } + +// LogProcessorFactory can create LogProcessor. +type LogProcessorFactory interface { + ProcessorFactoryBase + + // CreateLogProcessor creates a processor based on the config. + // If the processor type does not support logs or if the config is not valid + // error will be returned instead. + CreateLogProcessor( + ctx context.Context, + params ProcessorCreateParams, + cfg configmodels.Processor, + nextConsumer consumer.LogConsumer, + ) (LogProcessor, error) +} diff --git a/component/receiver.go b/component/receiver.go index 4ea05be7e14..7cd742f4f1d 100644 --- a/component/receiver.go +++ b/component/receiver.go @@ -49,6 +49,13 @@ type MetricsReceiver interface { Receiver } +// A LogReceiver is a "log data"-to-"internal format" converter. +// Its purpose is to translate data from the wild into internal data format. +// LogReceiver feeds a consumer.LogConsumer with data. +type LogReceiver interface { + Receiver +} + // ReceiverFactoryBase defines the common functions for all receiver factories. type ReceiverFactoryBase interface { Factory @@ -117,3 +124,18 @@ type ReceiverFactory interface { CreateMetricsReceiver(ctx context.Context, params ReceiverCreateParams, cfg configmodels.Receiver, nextConsumer consumer.MetricsConsumer) (MetricsReceiver, error) } + +// LogReceiverFactory can create a LogReceiver. 
+type LogReceiverFactory interface { + ReceiverFactoryBase + + // CreateLogReceiver creates a log receiver based on this config. + // If the receiver type does not support the data type or if the config is not valid + // error will be returned instead. + CreateLogReceiver( + ctx context.Context, + params ReceiverCreateParams, + cfg configmodels.Receiver, + nextConsumer consumer.LogConsumer, + ) (LogReceiver, error) +} diff --git a/config/config.go b/config/config.go index 5e59df4e877..09a388c2f74 100644 --- a/config/config.go +++ b/config/config.go @@ -539,11 +539,11 @@ func loadPipelines(v *viper.Viper) (configmodels.Pipelines, error) { var pipelineCfg configmodels.Pipeline // Set the type. - switch typeStr { - case configmodels.TracesDataTypeStr: - pipelineCfg.InputType = configmodels.TracesDataType - case configmodels.MetricsDataTypeStr: - pipelineCfg.InputType = configmodels.MetricsDataType + pipelineCfg.InputType = configmodels.DataType(typeStr) + switch pipelineCfg.InputType { + case configmodels.TracesDataType: + case configmodels.MetricsDataType: + case configmodels.LogsDataType: default: return nil, &configError{ code: errInvalidPipelineType, diff --git a/config/config_test.go b/config/config_test.go index b0b2a7a2b8d..87e6dd7ce59 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -100,7 +100,7 @@ func TestDecodeConfig(t *testing.T) { assert.Equal(t, 1, len(config.Processors), "Incorrect processors count") assert.Equal(t, - &ExampleProcessor{ + &ExampleProcessorCfg{ ProcessorSettings: configmodels.ProcessorSettings{ TypeVal: "exampleprocessor", NameVal: "exampleprocessor", @@ -258,7 +258,7 @@ func TestSimpleConfig(t *testing.T) { assert.Equalf(t, 1, len(config.Processors), "TEST[%s]", test.name) assert.Equalf(t, - &ExampleProcessor{ + &ExampleProcessorCfg{ ProcessorSettings: configmodels.ProcessorSettings{ TypeVal: "exampleprocessor", NameVal: "exampleprocessor", diff --git a/config/configmodels/configmodels.go 
b/config/configmodels/configmodels.go index 21be2d643f3..92312e6c764 100644 --- a/config/configmodels/configmodels.go +++ b/config/configmodels/configmodels.go @@ -78,38 +78,22 @@ type Processor interface { type Processors map[string]Processor // DataType is the data type that is supported for collection. We currently support -// collecting metrics and traces, this can expand in the future (e.g. logs, events, etc). -type DataType int +// collecting metrics, traces and logs, this can expand in the future. + +type DataType string // Currently supported data types. Add new data types here when new types are supported in the future. const ( - _ DataType = iota // skip 0, start types from 1. - // TracesDataType is the data type tag for traces. - TracesDataType + TracesDataType DataType = "traces" // MetricsDataType is the data type tag for metrics. - MetricsDataType -) + MetricsDataType DataType = "metrics" -// Data type strings. -const ( - TracesDataTypeStr = "traces" - MetricsDataTypeStr = "metrics" + // LogsDataType is the data type tag for logs. + LogsDataType DataType = "logs" ) -// GetString converts data type to string. -func (dataType DataType) GetString() string { - switch dataType { - case TracesDataType: - return TracesDataTypeStr - case MetricsDataType: - return MetricsDataTypeStr - default: - panic("unknown data type") - } -} - // Pipeline defines a single pipeline. 
type Pipeline struct { Name string `mapstructure:"-"` diff --git a/config/example_factories.go b/config/example_factories.go index 7d37c526bf6..2075a81acc8 100644 --- a/config/example_factories.go +++ b/config/example_factories.go @@ -25,6 +25,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector/config/configmodels" "github.com/open-telemetry/opentelemetry-collector/consumer" "github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata" + "github.com/open-telemetry/opentelemetry-collector/internal/data" ) // ExampleReceiver is for testing purposes. We are defining an example config and factory @@ -60,7 +61,8 @@ func (f *ExampleReceiverFactory) CustomUnmarshaler() component.CustomUnmarshaler func (f *ExampleReceiverFactory) CreateDefaultConfig() configmodels.Receiver { return &ExampleReceiver{ ReceiverSettings: configmodels.ReceiverSettings{ - TypeVal: "examplereceiver", + TypeVal: f.Type(), + NameVal: string(f.Type()), Endpoint: "localhost:1000", }, ExtraSetting: "some string", @@ -80,7 +82,14 @@ func (f *ExampleReceiverFactory) CreateTraceReceiver( return nil, configerror.ErrDataTypeIsNotSupported } - // There must be one receiver for both metrics and traces. We maintain a map of + receiver := f.createReceiver(cfg) + receiver.TraceConsumer = nextConsumer + + return receiver, nil +} + +func (f *ExampleReceiverFactory) createReceiver(cfg configmodels.Receiver) *ExampleReceiverProducer { + // There must be one receiver for all data types. We maintain a map of // receivers per config. // Check to see if there is already a receiver for this config. @@ -90,9 +99,8 @@ func (f *ExampleReceiverFactory) CreateTraceReceiver( // Remember the receiver in the map exampleReceivers[cfg] = receiver } - receiver.TraceConsumer = nextConsumer - return receiver, nil + return receiver } // CreateMetricsReceiver creates a metrics receiver based on this config. 
@@ -105,27 +113,31 @@ func (f *ExampleReceiverFactory) CreateMetricsReceiver( return nil, configerror.ErrDataTypeIsNotSupported } - // There must be one receiver for both metrics and traces. We maintain a map of - // receivers per config. - - // Check to see if there is already a receiver for this config. - receiver, ok := exampleReceivers[cfg] - if !ok { - receiver = &ExampleReceiverProducer{} - // Remember the receiver in the map - exampleReceivers[cfg] = receiver - } + receiver := f.createReceiver(cfg) receiver.MetricsConsumer = nextConsumer return receiver, nil } +func (f *ExampleReceiverFactory) CreateLogReceiver( + ctx context.Context, + params component.ReceiverCreateParams, + cfg configmodels.Receiver, + nextConsumer consumer.LogConsumer, +) (component.LogReceiver, error) { + receiver := f.createReceiver(cfg) + receiver.LogConsumer = nextConsumer + + return receiver, nil +} + // ExampleReceiverProducer allows producing traces and metrics for testing purposes. type ExampleReceiverProducer struct { - TraceConsumer consumer.TraceConsumerOld Started bool Stopped bool + TraceConsumer consumer.TraceConsumerOld MetricsConsumer consumer.MetricsConsumerOld + LogConsumer consumer.LogConsumer } // Start tells the receiver to start its processing. @@ -199,7 +211,8 @@ func (f *MultiProtoReceiverFactory) CustomUnmarshaler() component.CustomUnmarsha // CreateDefaultConfig creates the default configuration for the Receiver. func (f *MultiProtoReceiverFactory) CreateDefaultConfig() configmodels.Receiver { return &MultiProtoReceiver{ - TypeVal: "multireceiver", + TypeVal: f.Type(), + NameVal: string(f.Type()), Protocols: map[string]MultiProtoReceiverOneCfg{ "http": { Endpoint: "example.com:8888", @@ -256,7 +269,10 @@ func (f *ExampleExporterFactory) Type() configmodels.Type { // CreateDefaultConfig creates the default configuration for the Exporter. 
func (f *ExampleExporterFactory) CreateDefaultConfig() configmodels.Exporter { return &ExampleExporter{ - ExporterSettings: configmodels.ExporterSettings{TypeVal: f.Type()}, + ExporterSettings: configmodels.ExporterSettings{ + TypeVal: f.Type(), + NameVal: string(f.Type()), + }, ExtraSetting: "some export string", ExtraMapSetting: nil, ExtraListSetting: nil, @@ -273,10 +289,19 @@ func (f *ExampleExporterFactory) CreateMetricsExporter(logger *zap.Logger, cfg c return &ExampleExporterConsumer{}, nil } +func (f *ExampleExporterFactory) CreateLogExporter( + ctx context.Context, + params component.ExporterCreateParams, + cfg configmodels.Exporter, +) (component.LogExporter, error) { + return &ExampleExporterConsumer{}, nil +} + // ExampleExporterConsumer stores consumed traces and metrics for testing purposes. type ExampleExporterConsumer struct { Traces []consumerdata.TraceData Metrics []consumerdata.MetricsData + Logs []data.Logs ExporterStarted bool ExporterShutdown bool } @@ -301,6 +326,11 @@ func (exp *ExampleExporterConsumer) ConsumeMetricsData(ctx context.Context, md c return nil } +func (exp *ExampleExporterConsumer) ConsumeLogs(ctx context.Context, ld data.Logs) error { + exp.Logs = append(exp.Logs, ld) + return nil +} + // Name returns the name of the exporter. func (exp *ExampleExporterConsumer) Name() string { return "exampleexporter" @@ -312,9 +342,9 @@ func (exp *ExampleExporterConsumer) Shutdown(context.Context) error { return nil } -// ExampleProcessor is for testing purposes. We are defining an example config and factory +// ExampleProcessorCfg is for testing purposes. We are defining an example config and factory // for "exampleprocessor" processor type. 
-type ExampleProcessor struct { +type ExampleProcessorCfg struct { configmodels.ProcessorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct ExtraSetting string `mapstructure:"extra"` ExtraMapSetting map[string]string `mapstructure:"extra_map"` @@ -332,11 +362,14 @@ func (f *ExampleProcessorFactory) Type() configmodels.Type { // CreateDefaultConfig creates the default configuration for the Processor. func (f *ExampleProcessorFactory) CreateDefaultConfig() configmodels.Processor { - return &ExampleProcessor{ - ProcessorSettings: configmodels.ProcessorSettings{}, - ExtraSetting: "some export string", - ExtraMapSetting: nil, - ExtraListSetting: nil, + return &ExampleProcessorCfg{ + ProcessorSettings: configmodels.ProcessorSettings{ + TypeVal: f.Type(), + NameVal: string(f.Type()), + }, + ExtraSetting: "some export string", + ExtraMapSetting: nil, + ExtraListSetting: nil, } } @@ -358,6 +391,35 @@ func (f *ExampleProcessorFactory) CreateMetricsProcessor( return nil, configerror.ErrDataTypeIsNotSupported } +func (f *ExampleProcessorFactory) CreateLogProcessor( + ctx context.Context, + params component.ProcessorCreateParams, + cfg configmodels.Processor, + nextConsumer consumer.LogConsumer, +) (component.LogProcessor, error) { + return &ExampleProcessor{nextConsumer}, nil +} + +type ExampleProcessor struct { + nextConsumer consumer.LogConsumer +} + +func (ep *ExampleProcessor) Start(ctx context.Context, host component.Host) error { + return nil +} + +func (ep *ExampleProcessor) Shutdown(ctx context.Context) error { + return nil +} + +func (ep *ExampleProcessor) GetCapabilities() component.ProcessorCapabilities { + return component.ProcessorCapabilities{MutatesConsumedData: false} +} + +func (ep *ExampleProcessor) ConsumeLogs(ctx context.Context, ld data.Logs) error { + return ep.nextConsumer.ConsumeLogs(ctx, ld) +} + // ExampleExtensionCfg is for testing purposes. 
We are defining an example config and factory // for "exampleextension" extension type. type ExampleExtensionCfg struct { @@ -387,10 +449,13 @@ func (f *ExampleExtensionFactory) Type() configmodels.Type { // CreateDefaultConfig creates the default configuration for the Extension. func (f *ExampleExtensionFactory) CreateDefaultConfig() configmodels.Extension { return &ExampleExtensionCfg{ - ExtensionSettings: configmodels.ExtensionSettings{TypeVal: f.Type()}, - ExtraSetting: "extra string setting", - ExtraMapSetting: nil, - ExtraListSetting: nil, + ExtensionSettings: configmodels.ExtensionSettings{ + TypeVal: f.Type(), + NameVal: string(f.Type()), + }, + ExtraSetting: "extra string setting", + ExtraMapSetting: nil, + ExtraListSetting: nil, } } diff --git a/consumer/consumer.go b/consumer/consumer.go index db9278cbd16..75c5a2699cf 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -20,6 +20,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata" "github.com/open-telemetry/opentelemetry-collector/consumer/pdata" + "github.com/open-telemetry/opentelemetry-collector/internal/data" ) // MetricsConsumerBase defines a common interface for MetricsConsumerOld and MetricsConsumer. @@ -60,3 +61,10 @@ type TraceConsumer interface { // ConsumeTraces receives pdata.Traces for processing. ConsumeTraces(ctx context.Context, td pdata.Traces) error } + +// LogConsumer is an interface that receives data.Logs, processes it +// as needed, and sends it to the next processing node if any or to the destination. +type LogConsumer interface { + // ConsumeLogs receives data.Logs for processing. 
+ ConsumeLogs(ctx context.Context, ld data.Logs) error +} diff --git a/internal/data/log.go b/internal/data/log.go new file mode 100644 index 00000000000..e0f2827704e --- /dev/null +++ b/internal/data/log.go @@ -0,0 +1,25 @@ +// Copyright OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package data + +// This file defines in-memory data structures to represent logs. + +// Logs is the top-level struct that is propagated through the logs pipeline. +type Logs struct { + // TODO: add data fields once OTLP protocol defines logs format. 
+} + +func (l Logs) Clone() Logs { + return Logs{} +} diff --git a/processor/cloningfanoutconnector.go b/processor/cloningfanoutconnector.go index 68f06057575..afed82a53f7 100644 --- a/processor/cloningfanoutconnector.go +++ b/processor/cloningfanoutconnector.go @@ -28,6 +28,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector/consumer/converter" "github.com/open-telemetry/opentelemetry-collector/consumer/pdata" "github.com/open-telemetry/opentelemetry-collector/consumer/pdatautil" + "github.com/open-telemetry/opentelemetry-collector/internal/data" ) // This file contains implementations of cloning Trace/Metrics connectors @@ -215,6 +216,39 @@ func (tfc traceCloningFanOutConnector) ConsumeTraces(ctx context.Context, td pda return componenterror.CombineErrors(errs) } +// NewLogCloningFanOutConnector wraps multiple log consumers in a single one. +func NewLogCloningFanOutConnector(lcs []consumer.LogConsumer) consumer.LogConsumer { + return LogCloningFanOutConnector(lcs) +} + +type LogCloningFanOutConnector []consumer.LogConsumer + +var _ consumer.LogConsumer = (*LogCloningFanOutConnector)(nil) + +// ConsumeLogs exports the log data to all consumers wrapped by the current one. +func (lfc LogCloningFanOutConnector) ConsumeLogs(ctx context.Context, ld data.Logs) error { + var errs []error + + // Fan out to first len-1 consumers. + for i := 0; i < len(lfc)-1; i++ { + // Create a clone of data. We need to clone because consumers may modify the data. + clone := ld.Clone() + if err := lfc[i].ConsumeLogs(ctx, clone); err != nil { + errs = append(errs, err) + } + } + + if len(lfc) > 0 { + // Give the original data to the last consumer. 
+ lastTc := lfc[len(lfc)-1] + if err := lastTc.ConsumeLogs(ctx, ld); err != nil { + errs = append(errs, err) + } + } + + return componenterror.CombineErrors(errs) +} + func cloneTraceDataOld(td consumerdata.TraceData) consumerdata.TraceData { clone := consumerdata.TraceData{ SourceFormat: td.SourceFormat, diff --git a/processor/fanoutconnector.go b/processor/fanoutconnector.go index 5aeebb3cdd4..9c6d3fc9d40 100644 --- a/processor/fanoutconnector.go +++ b/processor/fanoutconnector.go @@ -22,6 +22,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata" "github.com/open-telemetry/opentelemetry-collector/consumer/converter" "github.com/open-telemetry/opentelemetry-collector/consumer/pdata" + "github.com/open-telemetry/opentelemetry-collector/internal/data" ) // This file contains implementations of Trace/Metrics connectors @@ -154,3 +155,23 @@ func (tfc traceFanOutConnector) ConsumeTraces(ctx context.Context, td pdata.Trac } return componenterror.CombineErrors(errs) } + +// NewLogFanOutConnector wraps multiple log consumers in a single one. +func NewLogFanOutConnector(lcs []consumer.LogConsumer) consumer.LogConsumer { + return LogFanOutConnector(lcs) +} + +type LogFanOutConnector []consumer.LogConsumer + +var _ consumer.LogConsumer = (*LogFanOutConnector)(nil) + +// ConsumeLogs exports the log data to all consumers wrapped by the current one. 
+func (fc LogFanOutConnector) ConsumeLogs(ctx context.Context, ld data.Logs) error { + var errs []error + for _, tc := range fc { + if err := tc.ConsumeLogs(ctx, ld); err != nil { + errs = append(errs, err) + } + } + return componenterror.CombineErrors(errs) +} diff --git a/service/builder/exporters_builder.go b/service/builder/exporters_builder.go index 65e3a59cb9a..00dbd5587ce 100644 --- a/service/builder/exporters_builder.go +++ b/service/builder/exporters_builder.go @@ -32,6 +32,7 @@ type builtExporter struct { logger *zap.Logger te component.TraceExporterBase me component.MetricsExporterBase + le component.LogExporter } // Start the exporter. @@ -235,47 +236,71 @@ func (eb *ExportersBuilder) buildExporter( // Move this validation to config/config.go:validateConfig // No data types where requested for this exporter. This can only happen // if there are no pipelines associated with the exporter. - logger.Warn("Exportee is not associated with any pipeline and will not export data.") + logger.Warn("Exporter is not associated with any pipeline and will not export data.") return exporter, nil } - if requirement, ok := inputDataTypes[configmodels.TracesDataType]; ok { - // Traces data type is required. Create a trace exporter based on config. - te, err := createTraceExporter(factory, logger, config) - if err != nil { - if err == configerror.ErrDataTypeIsNotSupported { - // Could not create because this exporter does not support this data type. - return nil, typeMismatchErr(config, requirement.requiredBy, configmodels.TracesDataType) + for dataType, requirement := range inputDataTypes { + + switch dataType { + case configmodels.TracesDataType: + // Traces data type is required. Create a trace exporter based on config. + te, err := createTraceExporter(factory, logger, config) + if err != nil { + if err == configerror.ErrDataTypeIsNotSupported { + // Could not create because this exporter does not support this data type. 
+ return nil, exporterTypeMismatchErr(config, requirement.requiredBy, dataType) + } + return nil, fmt.Errorf("error creating %s exporter: %v", config.Name(), err) } - return nil, fmt.Errorf("error creating %s exporter: %v", config.Name(), err) - } - // Check if the factory really created the exporter. - if te == nil { - return nil, fmt.Errorf("factory for %q produced a nil exporter", config.Name()) - } + // Check if the factory really created the exporter. + if te == nil { + return nil, fmt.Errorf("factory for %q produced a nil exporter", config.Name()) + } - exporter.te = te - } + exporter.te = te + + case configmodels.MetricsDataType: + // Metrics data type is required. Create a trace exporter based on config. + me, err := createMetricsExporter(factory, logger, config) + if err != nil { + if err == configerror.ErrDataTypeIsNotSupported { + // Could not create because this exporter does not support this data type. + return nil, exporterTypeMismatchErr(config, requirement.requiredBy, dataType) + } + return nil, fmt.Errorf("error creating %s exporter: %v", config.Name(), err) + } - if requirement, ok := inputDataTypes[configmodels.MetricsDataType]; ok { - // Metrics data type is required. Create a trace exporter based on config. - me, err := createMetricsExporter(factory, logger, config) - if err != nil { - if err == configerror.ErrDataTypeIsNotSupported { - // Could not create because this exporter does not support this data type. - return nil, typeMismatchErr(config, requirement.requiredBy, configmodels.MetricsDataType) + // The factories can be implemented by third parties, check if they really + // created the exporter. + if me == nil { + return nil, fmt.Errorf("factory for %q produced a nil exporter", config.Name()) } - return nil, fmt.Errorf("error creating %s exporter: %v", config.Name(), err) - } - // The factories can be implemented by third parties, check if they really - // created the exporter. 
- if me == nil { - return nil, fmt.Errorf("factory for %q produced a nil exporter", config.Name()) - } + exporter.me = me + + case configmodels.LogsDataType: + le, err := createLogExporter(factory, logger, config) + if err != nil { + if err == configerror.ErrDataTypeIsNotSupported { + // Could not create because this exporter does not support this data type. + return nil, exporterTypeMismatchErr(config, requirement.requiredBy, dataType) + } + return nil, fmt.Errorf("error creating %s exporter: %v", config.Name(), err) + } - exporter.me = me + // Check if the factory really created the exporter. + if le == nil { + return nil, fmt.Errorf("factory for %q produced a nil exporter", config.Name()) + } + + exporter.le = le + + default: + // Could not create because this exporter does not support this data type. + return nil, exporterTypeMismatchErr(config, requirement.requiredBy, dataType) + } } eb.logger.Info("Exporter is enabled.", zap.String("exporter", config.Name())) @@ -283,14 +308,14 @@ func (eb *ExportersBuilder) buildExporter( return exporter, nil } -func typeMismatchErr( +func exporterTypeMismatchErr( config configmodels.Exporter, requiredByPipeline *configmodels.Pipeline, dataType configmodels.DataType, ) error { - return fmt.Errorf("%s is a %s pipeline but has a %s which does not support %s", - requiredByPipeline.Name, dataType.GetString(), - config.Name(), dataType.GetString(), + return fmt.Errorf("pipeline %q of data type %q has an exporter %q, which does not support that data type", + requiredByPipeline.Name, dataType, + config.Name(), ) } @@ -332,3 +357,18 @@ func createMetricsExporter(factoryBase component.ExporterFactoryBase, // use ExporterFactoryOld.CreateMetricsExporter. return factoryBase.(component.ExporterFactoryOld).CreateMetricsExporter(logger, cfg) } + +// createLogExporter creates a data exporter based on provided factory type. 
+func createLogExporter( + factoryBase component.ExporterFactoryBase, + logger *zap.Logger, + cfg configmodels.Exporter, +) (component.LogExporter, error) { + factory, ok := factoryBase.(component.LogExporterFactory) + if !ok { + return nil, fmt.Errorf("exporter %q does not support data type %q", factoryBase.Type(), configmodels.LogsDataType) + } + creationParams := component.ExporterCreateParams{Logger: logger} + ctx := context.Background() + return factory.CreateLogExporter(ctx, creationParams, cfg) +} diff --git a/service/builder/exporters_builder_test.go b/service/builder/exporters_builder_test.go index 3ac11df4390..f761a14f160 100644 --- a/service/builder/exporters_builder_test.go +++ b/service/builder/exporters_builder_test.go @@ -103,6 +103,70 @@ func TestExportersBuilder_Build(t *testing.T) { // TODO: once we have an exporter that supports metrics data type test it too. } +func TestExportersBuilder_BuildLogs(t *testing.T) { + factories, err := config.ExampleComponents() + assert.Nil(t, err) + + cfg := &configmodels.Config{ + Exporters: map[string]configmodels.Exporter{ + "exampleexporter": &config.ExampleExporter{ + ExporterSettings: configmodels.ExporterSettings{ + NameVal: "exampleexporter", + TypeVal: "exampleexporter", + }, + }, + }, + + Service: configmodels.Service{ + Pipelines: map[string]*configmodels.Pipeline{ + "logs": { + Name: "logs", + InputType: "logs", + Exporters: []string{"exampleexporter"}, + }, + }, + }, + } + + exporters, err := NewExportersBuilder(zap.NewNop(), cfg, factories.Exporters).Build() + + assert.NoError(t, err) + require.NotNil(t, exporters) + + e1 := exporters[cfg.Exporters["exampleexporter"]] + + // Ensure exporter has its fields correctly populated. + require.NotNil(t, e1) + assert.NotNil(t, e1.le) + assert.Nil(t, e1.te) + assert.Nil(t, e1.me) + + // Ensure it can be started. + err = exporters.StartAll(context.Background(), componenttest.NewNopHost()) + assert.NoError(t, err) + + // Ensure it can be stopped. 
+ err = e1.Shutdown(context.Background()) + assert.NoError(t, err) + + // Remove the pipeline so that the exporter is not attached to any pipeline. + // This should result in creating an exporter that has none of consumption + // functions set. + delete(cfg.Service.Pipelines, "logs") + exporters, err = NewExportersBuilder(zap.NewNop(), cfg, factories.Exporters).Build() + assert.NotNil(t, exporters) + assert.Nil(t, err) + + e1 = exporters[cfg.Exporters["exampleexporter"]] + + // Ensure exporter has its fields correctly populated, ie Trace Exporter and + // Metrics Exporter are nil. + require.NotNil(t, e1) + assert.Nil(t, e1.te) + assert.Nil(t, e1.me) + assert.Nil(t, e1.le) +} + func TestExportersBuilder_StartAll(t *testing.T) { exporters := make(Exporters) expCfg := &configmodels.ExporterSettings{} diff --git a/service/builder/pipelines_builder.go b/service/builder/pipelines_builder.go index 38216f209be..921cec6b234 100644 --- a/service/builder/pipelines_builder.go +++ b/service/builder/pipelines_builder.go @@ -35,6 +35,7 @@ type builtPipeline struct { logger *zap.Logger firstTC consumer.TraceConsumerBase firstMC consumer.MetricsConsumerBase + firstLC consumer.LogConsumer // MutatesConsumedData is set to true if any processors in the pipeline // can mutate the TraceData or MetricsData input argument. @@ -126,12 +127,15 @@ func (pb *PipelinesBuilder) buildPipeline(pipelineCfg *configmodels.Pipeline, // First create a consumer junction point that fans out the data to all exporters. 
var tc consumer.TraceConsumerBase var mc consumer.MetricsConsumerBase + var lc consumer.LogConsumer switch pipelineCfg.InputType { case configmodels.TracesDataType: tc = pb.buildFanoutExportersTraceConsumer(pipelineCfg.Exporters) case configmodels.MetricsDataType: mc = pb.buildFanoutExportersMetricsConsumer(pipelineCfg.Exporters) + case configmodels.LogsDataType: + lc = pb.buildFanoutExportersLogConsumer(pipelineCfg.Exporters) } mutatesConsumedData := false @@ -170,6 +174,19 @@ func (pb *PipelinesBuilder) buildPipeline(pipelineCfg *configmodels.Pipeline, } processors[i] = proc mc = proc + + case configmodels.LogsDataType: + var proc component.LogProcessor + proc, err = createLogProcessor(factory, componentLogger, procCfg, lc) + if proc != nil { + mutatesConsumedData = mutatesConsumedData || proc.GetCapabilities().MutatesConsumedData + } + processors[i] = proc + lc = proc + + default: + return nil, fmt.Errorf("error creating processor %q in pipeline %q, data type %s is not supported", + procName, pipelineCfg.Name, pipelineCfg.InputType) } if err != nil { @@ -178,19 +195,20 @@ func (pb *PipelinesBuilder) buildPipeline(pipelineCfg *configmodels.Pipeline, } // Check if the factory really created the processor. 
-	if tc == nil && mc == nil {
+	if tc == nil && mc == nil && lc == nil {
 			return nil, fmt.Errorf("factory for %q produced a nil processor", procCfg.Name())
 		}
 	}
 
 	pipelineLogger := pb.logger.With(zap.String("pipeline_name", pipelineCfg.Name),
-		zap.String("pipeline_datatype", pipelineCfg.InputType.GetString()))
+		zap.String("pipeline_datatype", string(pipelineCfg.InputType)))
 	pipelineLogger.Info("Pipeline is enabled.")
 
 	bp := &builtPipeline{
 		pipelineLogger,
 		tc,
 		mc,
+		lc,
 		mutatesConsumedData,
 		processors,
 	}
@@ -243,6 +261,25 @@ func (pb *PipelinesBuilder) buildFanoutExportersMetricsConsumer(exporterNames []
 	return processor.CreateMetricsFanOutConnector(exporters)
 }
 
+func (pb *PipelinesBuilder) buildFanoutExportersLogConsumer(
+	exporterNames []string,
+) consumer.LogConsumer {
+	builtExporters := pb.getBuiltExportersByNames(exporterNames)
+
+	// Optimize for the case when there is only one exporter, no need to create junction point.
+	if len(builtExporters) == 1 {
+		return builtExporters[0].le
+	}
+
+	// Note: length 0, capacity len(builtExporters) — using make with a non-zero
+	// length here would leave nil consumers at the front of the slice after append.
+	exporters := make([]consumer.LogConsumer, 0, len(builtExporters))
+	for _, builtExp := range builtExporters {
+		exporters = append(exporters, builtExp.le)
+	}
+
+	// Create a junction point that fans out to all exporters.
+	return processor.NewLogFanOutConnector(exporters)
+}
+
 // createTraceProcessor creates trace processor based on type of the current processor
 // and type of the downstream consumer.
 func createTraceProcessor(
@@ -318,3 +355,20 @@ func createMetricsProcessor(
 	metricsConverter := converter.NewOCToInternalMetricsConverter(nextConsumer.(consumer.MetricsConsumer))
 	return factoryOld.CreateMetricsProcessor(logger, metricsConverter, cfg)
 }
+
+// createLogProcessor creates a log processor using given factory and next consumer.
+func createLogProcessor(
+	factoryBase component.ProcessorFactoryBase,
+	logger *zap.Logger,
+	cfg configmodels.Processor,
+	nextConsumer consumer.LogConsumer,
+) (component.LogProcessor, error) {
+	factory, ok := factoryBase.(component.LogProcessorFactory)
+	if !ok {
+		return nil, fmt.Errorf("processor %q does not support data type %q",
+			cfg.Name(), configmodels.LogsDataType)
+	}
+	creationParams := component.ProcessorCreateParams{Logger: logger}
+	ctx := context.Background()
+	return factory.CreateLogProcessor(ctx, creationParams, cfg, nextConsumer)
+}
diff --git a/service/builder/pipelines_builder_test.go b/service/builder/pipelines_builder_test.go
index dee3be6d350..1adfc29ccd1 100644
--- a/service/builder/pipelines_builder_test.go
+++ b/service/builder/pipelines_builder_test.go
@@ -33,6 +33,7 @@ import (
 	"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
 	"github.com/open-telemetry/opentelemetry-collector/consumer"
 	"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
+	idata "github.com/open-telemetry/opentelemetry-collector/internal/data"
 	"github.com/open-telemetry/opentelemetry-collector/processor/attributesprocessor"
 	"github.com/open-telemetry/opentelemetry-collector/translator/internaldata"
 )
@@ -62,6 +63,146 @@ func TestPipelinesBuilder_Build(t *testing.T) {
 	}
 }
 
+func createExampleFactories() config.Factories {
+	exampleReceiverFactory := &config.ExampleReceiverFactory{}
+	exampleProcessorFactory := &config.ExampleProcessorFactory{}
+	exampleExporterFactory := &config.ExampleExporterFactory{}
+
+	factories := config.Factories{
+		Receivers: map[configmodels.Type]component.ReceiverFactoryBase{
+			exampleReceiverFactory.Type(): exampleReceiverFactory,
+		},
+		Processors: map[configmodels.Type]component.ProcessorFactoryBase{
+			exampleProcessorFactory.Type(): exampleProcessorFactory,
+		},
+		Exporters:
map[configmodels.Type]component.ExporterFactoryBase{ + exampleExporterFactory.Type(): exampleExporterFactory, + }, + } + + return factories +} + +func createExampleConfig(dataType string) *configmodels.Config { + + exampleReceiverFactory := &config.ExampleReceiverFactory{} + exampleProcessorFactory := &config.ExampleProcessorFactory{} + exampleExporterFactory := &config.ExampleExporterFactory{} + + cfg := &configmodels.Config{ + Receivers: map[string]configmodels.Receiver{ + string(exampleReceiverFactory.Type()): exampleReceiverFactory.CreateDefaultConfig(), + }, + Processors: map[string]configmodels.Processor{ + string(exampleProcessorFactory.Type()): exampleProcessorFactory.CreateDefaultConfig(), + }, + Exporters: map[string]configmodels.Exporter{ + string(exampleExporterFactory.Type()): exampleExporterFactory.CreateDefaultConfig(), + }, + Service: configmodels.Service{ + Pipelines: map[string]*configmodels.Pipeline{ + dataType: { + Name: dataType, + InputType: configmodels.DataType(dataType), + Receivers: []string{string(exampleReceiverFactory.Type())}, + Processors: []string{string(exampleProcessorFactory.Type())}, + Exporters: []string{string(exampleExporterFactory.Type())}, + }, + }, + }, + } + return cfg +} + +func TestPipelinesBuilder_BuildVarious(t *testing.T) { + + factories := createExampleFactories() + + tests := []struct { + dataType string + shouldFail bool + }{ + { + dataType: "logs", + shouldFail: false, + }, + { + dataType: "nosuchdatatype", + shouldFail: true, + }, + } + + for _, test := range tests { + t.Run(test.dataType, func(t *testing.T) { + dataType := test.dataType + + cfg := createExampleConfig(dataType) + + // BuildProcessors the pipeline + allExporters, err := NewExportersBuilder(zap.NewNop(), cfg, factories.Exporters).Build() + if test.shouldFail { + assert.Error(t, err) + return + } + + require.NoError(t, err) + require.EqualValues(t, 1, len(allExporters)) + pipelineProcessors, err := NewPipelinesBuilder(zap.NewNop(), cfg, 
allExporters, factories.Processors).Build() + + assert.NoError(t, err) + require.NotNil(t, pipelineProcessors) + + err = pipelineProcessors.StartProcessors(context.Background(), componenttest.NewNopHost()) + assert.NoError(t, err) + + pipelineName := dataType + processor := pipelineProcessors[cfg.Service.Pipelines[pipelineName]] + + // Ensure pipeline has its fields correctly populated. + require.NotNil(t, processor) + assert.Nil(t, processor.firstTC) + assert.Nil(t, processor.firstMC) + assert.NotNil(t, processor.firstLC) + + // Compose the list of created exporters. + exporterNames := []string{"exampleexporter"} + var exporters []*builtExporter + for _, name := range exporterNames { + // Ensure exporter is created. + exp := allExporters[cfg.Exporters[name]] + require.NotNil(t, exp) + exporters = append(exporters, exp) + } + + // Send Logs via processor and verify that all exporters of the pipeline receive it. + + // First check that there are no logs in the exporters yet. + var exporterConsumers []*config.ExampleExporterConsumer + for _, exporter := range exporters { + consumer := exporter.le.(*config.ExampleExporterConsumer) + exporterConsumers = append(exporterConsumers, consumer) + require.Equal(t, len(consumer.Logs), 0) + } + + // Send one custom data. + log := idata.Logs{} + processor.firstLC.(consumer.LogConsumer).ConsumeLogs(context.Background(), log) + + // Now verify received data. + for _, consumer := range exporterConsumers { + // Check that the trace is received by exporter. + require.Equal(t, 1, len(consumer.Logs)) + + // Verify that span is successfully delivered. 
+ assert.EqualValues(t, log, consumer.Logs[0]) + } + + err = pipelineProcessors.ShutdownProcessors(context.Background()) + assert.NoError(t, err) + }) + } +} + func assertEqualTraceData(t *testing.T, expected consumerdata.TraceData, actual consumerdata.TraceData) { assert.True(t, proto.Equal(expected.Resource, actual.Resource)) assert.True(t, proto.Equal(expected.Node, actual.Node)) diff --git a/service/builder/receivers_builder.go b/service/builder/receivers_builder.go index 91361b827ca..ebb92c00475 100644 --- a/service/builder/receivers_builder.go +++ b/service/builder/receivers_builder.go @@ -152,6 +152,10 @@ func (rb *ReceiversBuilder) findPipelinesToAttach(config configmodels.Receiver) // Is this receiver attached to the pipeline? if hasReceiver(pipelineCfg, config.Name()) { + if _, exists := pipelinesToAttach[pipelineCfg.InputType]; !exists { + pipelinesToAttach[pipelineCfg.InputType] = make([]*builtPipeline, 0) + } + // Yes, add it to the list of pipelines of corresponding data type. 
pipelinesToAttach[pipelineCfg.InputType] = append(pipelinesToAttach[pipelineCfg.InputType], pipelineProcessor) @@ -186,6 +190,13 @@ func (rb *ReceiversBuilder) attachReceiverToPipelines( case configmodels.MetricsDataType: junction := buildFanoutMetricConsumer(builtPipelines) createdReceiver, err = createMetricsReceiver(context.Background(), factory, logger, config, junction) + + case configmodels.LogsDataType: + junction := buildFanoutLogConsumer(builtPipelines) + createdReceiver, err = createLogReceiver(context.Background(), factory, logger, config, junction) + + default: + err = configerror.ErrDataTypeIsNotSupported } if err != nil { @@ -194,8 +205,8 @@ func (rb *ReceiversBuilder) attachReceiverToPipelines( "receiver %s does not support %s but it was used in a "+ "%s pipeline", config.Name(), - dataType.GetString(), - dataType.GetString()) + dataType, + dataType) } return fmt.Errorf("cannot create receiver %s: %s", config.Name(), err.Error()) } @@ -220,7 +231,7 @@ func (rb *ReceiversBuilder) attachReceiverToPipelines( } rcv.receiver = createdReceiver - logger.Info("Receiver is enabled.", zap.String("datatype", dataType.GetString())) + logger.Info("Receiver is enabled.", zap.String("datatype", string(dataType))) return nil } @@ -314,6 +325,31 @@ func buildFanoutMetricConsumer(pipelines []*builtPipeline) consumer.MetricsConsu return processor.CreateMetricsFanOutConnector(pipelineConsumers) } +func buildFanoutLogConsumer(pipelines []*builtPipeline) consumer.LogConsumer { + // Optimize for the case when there is only one processor, no need to create junction point. + if len(pipelines) == 1 { + return pipelines[0].firstLC + } + + var pipelineConsumers []consumer.LogConsumer + anyPipelineMutatesData := false + for _, pipeline := range pipelines { + pipelineConsumers = append(pipelineConsumers, pipeline.firstLC) + anyPipelineMutatesData = anyPipelineMutatesData || pipeline.MutatesConsumedData + } + + // Create a junction point that fans out to all pipelines. 
+	if anyPipelineMutatesData {
+		// If any pipeline mutates data use a cloning fan out connector
+		// so that it is safe to modify fanned out data.
+		// TODO: if there are more than 2 pipelines only clone data for pipelines that
+		// declare the intent to mutate the data. Pipelines that do not mutate the data
+		// can consume shared data.
+		return processor.NewLogCloningFanOutConnector(pipelineConsumers)
+	}
+	return processor.NewLogFanOutConnector(pipelineConsumers)
+}
+
 // createTraceReceiver is a helper function that creates trace receiver based on the current receiver type
 // and type of the next consumer.
 func createTraceReceiver(
@@ -389,3 +425,20 @@ func createMetricsReceiver(
 	metricsConverter := converter.NewOCToInternalMetricsConverter(nextConsumer.(consumer.MetricsConsumer))
 	return factoryOld.CreateMetricsReceiver(logger, cfg, metricsConverter)
 }
+
+// createLogReceiver creates a log receiver using given factory and next consumer.
+func createLogReceiver(
+	ctx context.Context,
+	factoryBase component.ReceiverFactoryBase,
+	logger *zap.Logger,
+	cfg configmodels.Receiver,
+	nextConsumer consumer.LogConsumer,
+) (component.LogReceiver, error) {
+	factory, ok := factoryBase.(component.LogReceiverFactory)
+	if !ok {
+		return nil, fmt.Errorf("receiver %q does not support data type %q",
+			cfg.Name(), configmodels.LogsDataType)
+	}
+	creationParams := component.ReceiverCreateParams{Logger: logger}
+	return factory.CreateLogReceiver(ctx, creationParams, cfg, nextConsumer)
+}
diff --git a/service/builder/receivers_builder_test.go b/service/builder/receivers_builder_test.go
index 8c3eea3306d..60b194da0c7 100644
--- a/service/builder/receivers_builder_test.go
+++ b/service/builder/receivers_builder_test.go
@@ -29,6 +29,7 @@ import (
 	"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
 	"github.com/open-telemetry/opentelemetry-collector/consumer"
"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata" + idata "github.com/open-telemetry/opentelemetry-collector/internal/data" "github.com/open-telemetry/opentelemetry-collector/processor/attributesprocessor" "github.com/open-telemetry/opentelemetry-collector/processor/processortest" "github.com/open-telemetry/opentelemetry-collector/receiver/zipkinreceiver" @@ -181,6 +182,88 @@ func testReceivers( } } +func TestReceiversBuilder_BuildCustom(t *testing.T) { + factories := createExampleFactories() + + tests := []struct { + dataType string + shouldFail bool + }{ + { + dataType: "logs", + shouldFail: false, + }, + { + dataType: "nosuchdatatype", + shouldFail: true, + }, + } + + for _, test := range tests { + t.Run(test.dataType, func(t *testing.T) { + dataType := test.dataType + + cfg := createExampleConfig(dataType) + + // Build the pipeline + allExporters, err := NewExportersBuilder(zap.NewNop(), cfg, factories.Exporters).Build() + if test.shouldFail { + assert.Error(t, err) + return + } + + assert.NoError(t, err) + pipelineProcessors, err := NewPipelinesBuilder(zap.NewNop(), cfg, allExporters, factories.Processors).Build() + assert.NoError(t, err) + receivers, err := NewReceiversBuilder(zap.NewNop(), cfg, pipelineProcessors, factories.Receivers).Build() + + assert.NoError(t, err) + require.NotNil(t, receivers) + + receiver := receivers[cfg.Receivers["examplereceiver"]] + + // Ensure receiver has its fields correctly populated. + require.NotNil(t, receiver) + + assert.NotNil(t, receiver.receiver) + + // Compose the list of created exporters. + exporterNames := []string{"exampleexporter"} + var exporters []*builtExporter + for _, name := range exporterNames { + // Ensure exporter is created. 
+ exp := allExporters[cfg.Exporters[name]] + require.NotNil(t, exp) + exporters = append(exporters, exp) + } + + // Send Data via receiver and verify that all exporters of the pipeline receive it. + + // First check that there are no traces in the exporters yet. + for _, exporter := range exporters { + consumer := exporter.le.(*config.ExampleExporterConsumer) + require.Equal(t, len(consumer.Logs), 0) + } + + // Send one data. + log := idata.Logs{} + producer := receiver.receiver.(*config.ExampleReceiverProducer) + producer.LogConsumer.ConsumeLogs(context.Background(), log) + + // Now verify received data. + for _, name := range exporterNames { + // Check that the data is received by exporter. + exporter := allExporters[cfg.Exporters[name]] + + // Validate exported data. + consumer := exporter.le.(*config.ExampleExporterConsumer) + require.Equal(t, 1, len(consumer.Logs)) + assert.EqualValues(t, log, consumer.Logs[0]) + } + }) + } +} + func TestReceiversBuilder_DataTypeError(t *testing.T) { factories, err := config.ExampleComponents() assert.NoError(t, err) diff --git a/service/builder/testdata/pipelines_builder.yaml b/service/builder/testdata/pipelines_builder.yaml index b303bb9bddd..d7e0eeed213 100644 --- a/service/builder/testdata/pipelines_builder.yaml +++ b/service/builder/testdata/pipelines_builder.yaml @@ -38,3 +38,7 @@ service: metrics/3: receivers: [examplereceiver/3] exporters: [exampleexporter/2] + + logs: + receivers: [examplereceiver/3] + exporters: [exampleexporter/2] diff --git a/service/service.go b/service/service.go index 181302ba2e7..c93e8ea8c88 100644 --- a/service/service.go +++ b/service/service.go @@ -520,7 +520,7 @@ func (app *Application) getPipelinesSummaryTableData() internal.SummaryPipelines for c, p := range app.builtPipelines { row := internal.SummaryPipelinesTableRowData{ FullName: c.Name, - InputType: c.InputType.GetString(), + InputType: string(c.InputType), MutatesConsumedData: p.MutatesConsumedData, Receivers: c.Receivers, 
Processors: c.Processors,