Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions component/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ type MetricsExporter interface {
MetricsExporterBase
}

// DataExporter is an DataConsumer that is also an Exporter.
type DataExporter interface {
Exporter
consumer.DataConsumer
}

// ExporterFactoryBase defines the common functions for all exporter factories.
type ExporterFactoryBase interface {
Factory
Expand Down Expand Up @@ -111,3 +117,18 @@ type ExporterFactory interface {
CreateMetricsExporter(ctx context.Context, params ExporterCreateParams,
cfg configmodels.Exporter) (MetricsExporter, error)
}

// DataExporterFactory can create Exporter.
type DataExporterFactory interface {
ExporterFactoryBase

// CreateExporter creates an exporter based on this config.
// If the exporter type does not support the data type or if the config is not valid
// error will be returned instead.
CreateExporter(
ctx context.Context,
params ExporterCreateParams,
dataType configmodels.DataType,
cfg configmodels.Exporter,
) (DataExporter, error)
}
23 changes: 23 additions & 0 deletions component/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ type MetricsProcessor interface {
MetricsProcessorBase
}

// DataProcessor composes DataConsumer with some additional processor-specific functions.
type DataProcessor interface {
Processor
consumer.DataConsumer
}

// ProcessorCapabilities describes the capabilities of a Processor.
type ProcessorCapabilities struct {
// MutatesConsumedData is set to true if Consume* function of the
Expand Down Expand Up @@ -131,3 +137,20 @@ type ProcessorFactory interface {
CreateMetricsProcessor(ctx context.Context, params ProcessorCreateParams,
nextConsumer consumer.MetricsConsumer, cfg configmodels.Processor) (MetricsProcessor, error)
}

// DataProcessorFactory is factory interface for processors. This is the
// new factory type that can create new style processors.
type DataProcessorFactory interface {
ProcessorFactoryBase

// CreateProcessor creates a processor based on this config.
// If the processor type does not support the dataType or if the config is not valid
// error will be returned instead.
CreateProcessor(
ctx context.Context,
params ProcessorCreateParams,
dataType configmodels.DataType,
cfg configmodels.Processor,
nextConsumer consumer.DataConsumer,
) (DataProcessor, error)
}
26 changes: 26 additions & 0 deletions component/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ type MetricsReceiver interface {
Receiver
}

// A DataReceiver is an "arbitrary data"-to-"internal format" converter.
// Its purpose is to translate data from the wild into internal data format.
// DataReceiver feeds a consumer.DataConsumer with data.
//
// For example it could be Zipkin data source which translates
// Zipkin spans into consumerdata.DataData.
type DataReceiver interface {
Receiver
}

// ReceiverFactoryBase defines the common functions for all receiver factories.
type ReceiverFactoryBase interface {
Factory
Expand Down Expand Up @@ -117,3 +127,19 @@ type ReceiverFactory interface {
CreateMetricsReceiver(ctx context.Context, params ReceiverCreateParams,
cfg configmodels.Receiver, nextConsumer consumer.MetricsConsumer) (MetricsReceiver, error)
}

// DataReceiverFactory can create Receiver and MetricsReceiver.
type DataReceiverFactory interface {
ReceiverFactoryBase

// CreateReceiver creates a receiver based on this config.
// If the receiver type does not support the dataType or if the config is not valid
// error will be returned instead.
CreateReceiver(
ctx context.Context,
params ReceiverCreateParams,
dataType configmodels.DataType,
cfg configmodels.Receiver,
nextConsumer consumer.DataConsumer,
) (DataReceiver, error)
}
12 changes: 1 addition & 11 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,17 +539,7 @@ func loadPipelines(v *viper.Viper) (configmodels.Pipelines, error) {
var pipelineCfg configmodels.Pipeline

// Set the type.
switch typeStr {
case configmodels.TracesDataTypeStr:
pipelineCfg.InputType = configmodels.TracesDataType
case configmodels.MetricsDataTypeStr:
pipelineCfg.InputType = configmodels.MetricsDataType
default:
return nil, &configError{
code: errInvalidPipelineType,
msg: fmt.Sprintf("invalid pipeline type %q (must be metrics or traces)", typeStr),
}
}
pipelineCfg.InputType = configmodels.DataType(typeStr)

pipelineConfig := ViperSub(pipelinesConfig, key)

Expand Down
5 changes: 2 additions & 3 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func TestDecodeConfig(t *testing.T) {
assert.Equal(t, 1, len(config.Processors), "Incorrect processors count")

assert.Equal(t,
&ExampleProcessor{
&ExampleProcessorCfg{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "exampleprocessor",
NameVal: "exampleprocessor",
Expand Down Expand Up @@ -258,7 +258,7 @@ func TestSimpleConfig(t *testing.T) {
assert.Equalf(t, 1, len(config.Processors), "TEST[%s]", test.name)

assert.Equalf(t,
&ExampleProcessor{
&ExampleProcessorCfg{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "exampleprocessor",
NameVal: "exampleprocessor",
Expand Down Expand Up @@ -367,7 +367,6 @@ func TestDecodeConfig_Invalid(t *testing.T) {
{name: "unknown-processor-type", expected: errUnknownProcessorType},
{name: "invalid-service-extensions-value", expected: errUnmarshalErrorOnService},
{name: "invalid-sequence-value", expected: errUnmarshalErrorOnPipeline},
{name: "invalid-pipeline-type", expected: errInvalidPipelineType},
{name: "invalid-pipeline-type-and-name", expected: errInvalidTypeAndNameKey},
{name: "duplicate-extension", expected: errDuplicateExtensionName},
{name: "duplicate-receiver", expected: errDuplicateReceiverName},
Expand Down
27 changes: 4 additions & 23 deletions config/configmodels/configmodels.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,37 +79,18 @@ type Processors map[string]Processor

// DataType is the data type that is supported for collection. We currently support
// collecting metrics and traces, this can expand in the future (e.g. logs, events, etc).
type DataType int

type DataType string

// Currently supported data types. Add new data types here when new types are supported in the future.
const (
_ DataType = iota // skip 0, start types from 1.

// TracesDataType is the data type tag for traces.
TracesDataType
TracesDataType DataType = "traces"

// MetricsDataType is the data type tag for metrics.
MetricsDataType
MetricsDataType DataType = "metrics"
)

// Data type strings.
const (
TracesDataTypeStr = "traces"
MetricsDataTypeStr = "metrics"
)

// GetString converts data type to string.
func (dataType DataType) GetString() string {
switch dataType {
case TracesDataType:
return TracesDataTypeStr
case MetricsDataType:
return MetricsDataTypeStr
default:
panic("unknown data type")
}
}

// Pipeline defines a single pipeline.
type Pipeline struct {
Name string `mapstructure:"-"`
Expand Down
Loading