Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 26 additions & 6 deletions component/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector/consumer"
)

// Exporter defines functions that trace and metric exporters must implement.
// Exporter defines functions that all exporters must implement.
type Exporter interface {
Component
}
Expand All @@ -33,13 +33,13 @@ type TraceExporterBase interface {
Exporter
}

// TraceExporterOld is a TraceConsumer that is also an Exporter.
// TraceExporterOld is a TraceExporter that can consume old-style traces.
type TraceExporterOld interface {
consumer.TraceConsumerOld
TraceExporterBase
}

// TraceExporter is an TraceConsumer that is also an Exporter.
// TraceExporter is a TraceExporter that can consume new-style traces.
type TraceExporter interface {
consumer.TraceConsumer
TraceExporterBase
Expand All @@ -50,18 +50,24 @@ type MetricsExporterBase interface {
Exporter
}

// MetricsExporterOld is a MetricsConsumer that is also an Exporter.
// MetricsExporterOld is a TraceExporter that can consume old-style metrics.
type MetricsExporterOld interface {
consumer.MetricsConsumerOld
MetricsExporterBase
}

// MetricsExporter is a MetricsConsumer that is also an Exporter.
// MetricsExporter is a TraceExporter that can consume new-style metrics.
type MetricsExporter interface {
consumer.MetricsConsumer
MetricsExporterBase
}

// LogExporter is a LogConsumer that is also an Exporter.
type LogExporter interface {
Exporter
consumer.LogConsumer
}

// ExporterFactoryBase defines the common functions for all exporter factories.
type ExporterFactoryBase interface {
Factory
Expand All @@ -87,7 +93,7 @@ type ExporterFactoryOld interface {
CreateMetricsExporter(logger *zap.Logger, cfg configmodels.Exporter) (MetricsExporterOld, error)
}

// ExporterCreateParams is passed to ExporterFactory.Create* functions.
// ExporterCreateParams is passed to Create*Exporter functions.
type ExporterCreateParams struct {
// Logger that the factory can use during creation and can pass to the created
// component to be used later as well.
Expand All @@ -111,3 +117,17 @@ type ExporterFactory interface {
CreateMetricsExporter(ctx context.Context, params ExporterCreateParams,
cfg configmodels.Exporter) (MetricsExporter, error)
}

// LogExporterFactory can create a LogExporter.
type LogExporterFactory interface {
ExporterFactoryBase

// CreateLogExporter creates an exporter based on the config.
// If the exporter type does not support logs or if the config is not valid
// error will be returned instead.
CreateLogExporter(
ctx context.Context,
params ExporterCreateParams,
cfg configmodels.Exporter,
) (LogExporter, error)
}
29 changes: 25 additions & 4 deletions component/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ type TraceProcessorBase interface {
Processor
}

// TraceProcessorOld composes TraceConsumer with some additional processor-specific functions.
// TraceProcessorOld is a processor that can consume old-style traces.
type TraceProcessorOld interface {
consumer.TraceConsumerOld
TraceProcessorBase
}

// TraceProcessor composes TraceConsumer with some additional processor-specific functions.
// TraceProcessor is a processor that can consume traces.
type TraceProcessor interface {
consumer.TraceConsumer
TraceProcessorBase
Expand All @@ -54,18 +54,24 @@ type MetricsProcessorBase interface {
Processor
}

// MetricsProcessor composes MetricsConsumer with some additional processor-specific functions.
// MetricsProcessor is a processor that can consume old-style metrics.
type MetricsProcessorOld interface {
consumer.MetricsConsumerOld
MetricsProcessorBase
}

// MetricsProcessor composes MetricsConsumer with some additional processor-specific functions.
// MetricsProcessor is a processor that can consume metrics.
type MetricsProcessor interface {
consumer.MetricsConsumer
MetricsProcessorBase
}

// LogProcessor is a processor that can consume logs.
type LogProcessor interface {
Processor
consumer.LogConsumer
}

// ProcessorCapabilities describes the capabilities of a Processor.
type ProcessorCapabilities struct {
// MutatesConsumedData is set to true if Consume* function of the
Expand Down Expand Up @@ -131,3 +137,18 @@ type ProcessorFactory interface {
CreateMetricsProcessor(ctx context.Context, params ProcessorCreateParams,
nextConsumer consumer.MetricsConsumer, cfg configmodels.Processor) (MetricsProcessor, error)
}

// LogProcessorFactory can create LogProcessor.
type LogProcessorFactory interface {
ProcessorFactoryBase

// CreateLogProcessor creates a processor based on the config.
// If the processor type does not support logs or if the config is not valid
// error will be returned instead.
CreateLogProcessor(
ctx context.Context,
params ProcessorCreateParams,
cfg configmodels.Processor,
nextConsumer consumer.LogConsumer,
) (LogProcessor, error)
}
22 changes: 22 additions & 0 deletions component/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ type MetricsReceiver interface {
Receiver
}

// A LogReceiver is a "log data"-to-"internal format" converter.
// Its purpose is to translate data from the wild into internal data format.
// LogReceiver feeds a consumer.LogConsumer with data.
type LogReceiver interface {
Receiver
}

// ReceiverFactoryBase defines the common functions for all receiver factories.
type ReceiverFactoryBase interface {
Factory
Expand Down Expand Up @@ -117,3 +124,18 @@ type ReceiverFactory interface {
CreateMetricsReceiver(ctx context.Context, params ReceiverCreateParams,
cfg configmodels.Receiver, nextConsumer consumer.MetricsConsumer) (MetricsReceiver, error)
}

// LogReceiverFactory can create a LogReceiver.
type LogReceiverFactory interface {
ReceiverFactoryBase

// CreateLogReceiver creates a log receiver based on this config.
// If the receiver type does not support the data type or if the config is not valid
// error will be returned instead.
CreateLogReceiver(
ctx context.Context,
params ReceiverCreateParams,
cfg configmodels.Receiver,
nextConsumer consumer.LogConsumer,
) (LogReceiver, error)
}
10 changes: 5 additions & 5 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,11 +539,11 @@ func loadPipelines(v *viper.Viper) (configmodels.Pipelines, error) {
var pipelineCfg configmodels.Pipeline

// Set the type.
switch typeStr {
case configmodels.TracesDataTypeStr:
pipelineCfg.InputType = configmodels.TracesDataType
case configmodels.MetricsDataTypeStr:
pipelineCfg.InputType = configmodels.MetricsDataType
pipelineCfg.InputType = configmodels.DataType(typeStr)
switch pipelineCfg.InputType {
case configmodels.TracesDataType:
case configmodels.MetricsDataType:
case configmodels.LogsDataType:
default:
return nil, &configError{
code: errInvalidPipelineType,
Expand Down
4 changes: 2 additions & 2 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func TestDecodeConfig(t *testing.T) {
assert.Equal(t, 1, len(config.Processors), "Incorrect processors count")

assert.Equal(t,
&ExampleProcessor{
&ExampleProcessorCfg{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "exampleprocessor",
NameVal: "exampleprocessor",
Expand Down Expand Up @@ -258,7 +258,7 @@ func TestSimpleConfig(t *testing.T) {
assert.Equalf(t, 1, len(config.Processors), "TEST[%s]", test.name)

assert.Equalf(t,
&ExampleProcessor{
&ExampleProcessorCfg{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "exampleprocessor",
NameVal: "exampleprocessor",
Expand Down
30 changes: 7 additions & 23 deletions config/configmodels/configmodels.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,38 +78,22 @@ type Processor interface {
type Processors map[string]Processor

// DataType is the data type that is supported for collection. We currently support
// collecting metrics and traces, this can expand in the future (e.g. logs, events, etc).
type DataType int
// collecting metrics, traces and logs, this can expand in the future.

type DataType string

// Currently supported data types. Add new data types here when new types are supported in the future.
const (
_ DataType = iota // skip 0, start types from 1.

// TracesDataType is the data type tag for traces.
TracesDataType
TracesDataType DataType = "traces"

// MetricsDataType is the data type tag for metrics.
MetricsDataType
)
MetricsDataType DataType = "metrics"

// Data type strings.
const (
TracesDataTypeStr = "traces"
MetricsDataTypeStr = "metrics"
// LogsDataType is the data type tag for logs.
LogsDataType DataType = "logs"
)

// GetString converts data type to string.
func (dataType DataType) GetString() string {
switch dataType {
case TracesDataType:
return TracesDataTypeStr
case MetricsDataType:
return MetricsDataTypeStr
default:
panic("unknown data type")
}
}

// Pipeline defines a single pipeline.
type Pipeline struct {
Name string `mapstructure:"-"`
Expand Down
Loading