Skip to content

Commit efe4df6

Browse files
author
Tigran Najaryan
committed
Add factories and component types to support logs data type
We are planning on experimenting with logs data type in the Collector. This commit introduces factories and components types for logs and implements pipeline building for logs. The data.Log struct is for now empty. We need to wait for OTLP protocol to define the log data format and we will use it in data.Log. data.Log is in internal package since logs support is experimental and is not intended for public usage. Resolves #957
1 parent f588c89 commit efe4df6

19 files changed

+739
-98
lines changed

component/exporter.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,12 @@ type MetricsExporter interface {
6262
MetricsExporterBase
6363
}
6464

65+
// LogExporter is an LogConsumer that is also an Exporter.
66+
type LogExporter interface {
67+
Exporter
68+
consumer.LogConsumer
69+
}
70+
6571
// ExporterFactoryBase defines the common functions for all exporter factories.
6672
type ExporterFactoryBase interface {
6773
Factory
@@ -111,3 +117,17 @@ type ExporterFactory interface {
111117
CreateMetricsExporter(ctx context.Context, params ExporterCreateParams,
112118
cfg configmodels.Exporter) (MetricsExporter, error)
113119
}
120+
121+
// LogExporterFactory can create Exporter.
122+
type LogExporterFactory interface {
123+
ExporterFactoryBase
124+
125+
// CreateLogExporter creates an exporter based on this config.
126+
// If the exporter type does not support the data type or if the config is not valid
127+
// error will be returned instead.
128+
CreateLogExporter(
129+
ctx context.Context,
130+
params ExporterCreateParams,
131+
cfg configmodels.Exporter,
132+
) (LogExporter, error)
133+
}

component/processor.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,12 @@ type MetricsProcessor interface {
6666
MetricsProcessorBase
6767
}
6868

69+
// LogProcessor composes LogConsumer with some additional processor-specific functions.
70+
type LogProcessor interface {
71+
Processor
72+
consumer.LogConsumer
73+
}
74+
6975
// ProcessorCapabilities describes the capabilities of a Processor.
7076
type ProcessorCapabilities struct {
7177
// MutatesConsumedData is set to true if Consume* function of the
@@ -131,3 +137,18 @@ type ProcessorFactory interface {
131137
CreateMetricsProcessor(ctx context.Context, params ProcessorCreateParams,
132138
nextConsumer consumer.MetricsConsumer, cfg configmodels.Processor) (MetricsProcessor, error)
133139
}
140+
141+
// LogProcessorFactory is factory interface for processors.
142+
type LogProcessorFactory interface {
143+
ProcessorFactoryBase
144+
145+
// CreateLogProcessor creates a processor based on this config.
146+
// If the processor type does not support the data type or if the config is not valid
147+
// error will be returned instead.
148+
CreateLogProcessor(
149+
ctx context.Context,
150+
params ProcessorCreateParams,
151+
cfg configmodels.Processor,
152+
nextConsumer consumer.LogConsumer,
153+
) (LogProcessor, error)
154+
}

component/receiver.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,13 @@ type MetricsReceiver interface {
4949
Receiver
5050
}
5151

52+
// A LogReceiver is a "log data"-to-"internal format" converter.
53+
// Its purpose is to translate data from the wild into internal data format.
54+
// LogReceiver feeds a consumer.LogConsumer with data.
55+
type LogReceiver interface {
56+
Receiver
57+
}
58+
5259
// ReceiverFactoryBase defines the common functions for all receiver factories.
5360
type ReceiverFactoryBase interface {
5461
Factory
@@ -117,3 +124,18 @@ type ReceiverFactory interface {
117124
CreateMetricsReceiver(ctx context.Context, params ReceiverCreateParams,
118125
cfg configmodels.Receiver, nextConsumer consumer.MetricsConsumer) (MetricsReceiver, error)
119126
}
127+
128+
// LogReceiverFactory can create a LogReceiver.
129+
type LogReceiverFactory interface {
130+
ReceiverFactoryBase
131+
132+
// CreateLogReceiver creates a log receiver based on this config.
133+
// If the receiver type does not support the data type or if the config is not valid
134+
// error will be returned instead.
135+
CreateLogReceiver(
136+
ctx context.Context,
137+
params ReceiverCreateParams,
138+
cfg configmodels.Receiver,
139+
nextConsumer consumer.LogConsumer,
140+
) (LogReceiver, error)
141+
}

config/config.go

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -539,17 +539,7 @@ func loadPipelines(v *viper.Viper) (configmodels.Pipelines, error) {
539539
var pipelineCfg configmodels.Pipeline
540540

541541
// Set the type.
542-
switch typeStr {
543-
case configmodels.TracesDataTypeStr:
544-
pipelineCfg.InputType = configmodels.TracesDataType
545-
case configmodels.MetricsDataTypeStr:
546-
pipelineCfg.InputType = configmodels.MetricsDataType
547-
default:
548-
return nil, &configError{
549-
code: errInvalidPipelineType,
550-
msg: fmt.Sprintf("invalid pipeline type %q (must be metrics or traces)", typeStr),
551-
}
552-
}
542+
pipelineCfg.InputType = configmodels.DataType(typeStr)
553543

554544
pipelineConfig := ViperSub(pipelinesConfig, key)
555545

config/config_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ func TestDecodeConfig(t *testing.T) {
100100
assert.Equal(t, 1, len(config.Processors), "Incorrect processors count")
101101

102102
assert.Equal(t,
103-
&ExampleProcessor{
103+
&ExampleProcessorCfg{
104104
ProcessorSettings: configmodels.ProcessorSettings{
105105
TypeVal: "exampleprocessor",
106106
NameVal: "exampleprocessor",
@@ -258,7 +258,7 @@ func TestSimpleConfig(t *testing.T) {
258258
assert.Equalf(t, 1, len(config.Processors), "TEST[%s]", test.name)
259259

260260
assert.Equalf(t,
261-
&ExampleProcessor{
261+
&ExampleProcessorCfg{
262262
ProcessorSettings: configmodels.ProcessorSettings{
263263
TypeVal: "exampleprocessor",
264264
NameVal: "exampleprocessor",
@@ -367,7 +367,6 @@ func TestDecodeConfig_Invalid(t *testing.T) {
367367
{name: "unknown-processor-type", expected: errUnknownProcessorType},
368368
{name: "invalid-service-extensions-value", expected: errUnmarshalErrorOnService},
369369
{name: "invalid-sequence-value", expected: errUnmarshalErrorOnPipeline},
370-
{name: "invalid-pipeline-type", expected: errInvalidPipelineType},
371370
{name: "invalid-pipeline-type-and-name", expected: errInvalidTypeAndNameKey},
372371
{name: "duplicate-extension", expected: errDuplicateExtensionName},
373372
{name: "duplicate-receiver", expected: errDuplicateReceiverName},

config/configmodels/configmodels.go

Lines changed: 6 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -79,37 +79,21 @@ type Processors map[string]Processor
7979

8080
// DataType is the data type that is supported for collection. We currently support
8181
// collecting metrics and traces, this can expand in the future (e.g. logs, events, etc).
82-
type DataType int
82+
83+
type DataType string
8384

8485
// Currently supported data types. Add new data types here when new types are supported in the future.
8586
const (
86-
_ DataType = iota // skip 0, start types from 1.
87-
8887
// TracesDataType is the data type tag for traces.
89-
TracesDataType
88+
TracesDataType DataType = "traces"
9089

9190
// MetricsDataType is the data type tag for metrics.
92-
MetricsDataType
93-
)
91+
MetricsDataType DataType = "metrics"
9492

95-
// Data type strings.
96-
const (
97-
TracesDataTypeStr = "traces"
98-
MetricsDataTypeStr = "metrics"
93+
// LogsDataType is the data type tag for logs.
94+
LogsDataType DataType = "logs"
9995
)
10096

101-
// GetString converts data type to string.
102-
func (dataType DataType) GetString() string {
103-
switch dataType {
104-
case TracesDataType:
105-
return TracesDataTypeStr
106-
case MetricsDataType:
107-
return MetricsDataTypeStr
108-
default:
109-
panic("unknown data type")
110-
}
111-
}
112-
11397
// Pipeline defines a single pipeline.
11498
type Pipeline struct {
11599
Name string `mapstructure:"-"`

config/example_factories.go

Lines changed: 95 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
2626
"github.com/open-telemetry/opentelemetry-collector/consumer"
2727
"github.com/open-telemetry/opentelemetry-collector/consumer/consumerdata"
28+
"github.com/open-telemetry/opentelemetry-collector/internal/data"
2829
)
2930

3031
// ExampleReceiver is for testing purposes. We are defining an example config and factory
@@ -60,7 +61,8 @@ func (f *ExampleReceiverFactory) CustomUnmarshaler() component.CustomUnmarshaler
6061
func (f *ExampleReceiverFactory) CreateDefaultConfig() configmodels.Receiver {
6162
return &ExampleReceiver{
6263
ReceiverSettings: configmodels.ReceiverSettings{
63-
TypeVal: "examplereceiver",
64+
TypeVal: f.Type(),
65+
NameVal: string(f.Type()),
6466
Endpoint: "localhost:1000",
6567
},
6668
ExtraSetting: "some string",
@@ -80,7 +82,7 @@ func (f *ExampleReceiverFactory) CreateTraceReceiver(
8082
return nil, configerror.ErrDataTypeIsNotSupported
8183
}
8284

83-
// There must be one receiver for both metrics and traces. We maintain a map of
85+
// There must be one receiver for all data types. We maintain a map of
8486
// receivers per config.
8587

8688
// Check to see if there is already a receiver for this config.
@@ -105,7 +107,7 @@ func (f *ExampleReceiverFactory) CreateMetricsReceiver(
105107
return nil, configerror.ErrDataTypeIsNotSupported
106108
}
107109

108-
// There must be one receiver for both metrics and traces. We maintain a map of
110+
// There must be one receiver for all data types. We maintain a map of
109111
// receivers per config.
110112

111113
// Check to see if there is already a receiver for this config.
@@ -120,12 +122,35 @@ func (f *ExampleReceiverFactory) CreateMetricsReceiver(
120122
return receiver, nil
121123
}
122124

125+
func (f *ExampleReceiverFactory) CreateLogReceiver(
126+
ctx context.Context,
127+
params component.ReceiverCreateParams,
128+
cfg configmodels.Receiver,
129+
nextConsumer consumer.LogConsumer,
130+
) (component.LogReceiver, error) {
131+
// There must be one receiver for all data types. We maintain a map of
132+
// receivers per config.
133+
134+
// Check to see if there is already a receiver for this config.
135+
receiver, ok := exampleReceivers[cfg]
136+
if !ok {
137+
receiver = &ExampleReceiverProducer{}
138+
// Remember the receiver in the map
139+
exampleReceivers[cfg] = receiver
140+
}
141+
receiver.LogConsumer = nextConsumer
142+
143+
return receiver, nil
144+
145+
}
146+
123147
// ExampleReceiverProducer allows producing traces and metrics for testing purposes.
124148
type ExampleReceiverProducer struct {
125-
TraceConsumer consumer.TraceConsumerOld
126149
Started bool
127150
Stopped bool
151+
TraceConsumer consumer.TraceConsumerOld
128152
MetricsConsumer consumer.MetricsConsumerOld
153+
LogConsumer consumer.LogConsumer
129154
}
130155

131156
// Start tells the receiver to start its processing.
@@ -199,7 +224,8 @@ func (f *MultiProtoReceiverFactory) CustomUnmarshaler() component.CustomUnmarsha
199224
// CreateDefaultConfig creates the default configuration for the Receiver.
200225
func (f *MultiProtoReceiverFactory) CreateDefaultConfig() configmodels.Receiver {
201226
return &MultiProtoReceiver{
202-
TypeVal: "multireceiver",
227+
TypeVal: f.Type(),
228+
NameVal: string(f.Type()),
203229
Protocols: map[string]MultiProtoReceiverOneCfg{
204230
"http": {
205231
Endpoint: "example.com:8888",
@@ -256,7 +282,10 @@ func (f *ExampleExporterFactory) Type() configmodels.Type {
256282
// CreateDefaultConfig creates the default configuration for the Exporter.
257283
func (f *ExampleExporterFactory) CreateDefaultConfig() configmodels.Exporter {
258284
return &ExampleExporter{
259-
ExporterSettings: configmodels.ExporterSettings{TypeVal: f.Type()},
285+
ExporterSettings: configmodels.ExporterSettings{
286+
TypeVal: f.Type(),
287+
NameVal: string(f.Type()),
288+
},
260289
ExtraSetting: "some export string",
261290
ExtraMapSetting: nil,
262291
ExtraListSetting: nil,
@@ -273,10 +302,19 @@ func (f *ExampleExporterFactory) CreateMetricsExporter(logger *zap.Logger, cfg c
273302
return &ExampleExporterConsumer{}, nil
274303
}
275304

305+
func (f *ExampleExporterFactory) CreateLogExporter(
306+
ctx context.Context,
307+
params component.ExporterCreateParams,
308+
cfg configmodels.Exporter,
309+
) (component.LogExporter, error) {
310+
return &ExampleExporterConsumer{}, nil
311+
}
312+
276313
// ExampleExporterConsumer stores consumed traces and metrics for testing purposes.
277314
type ExampleExporterConsumer struct {
278315
Traces []consumerdata.TraceData
279316
Metrics []consumerdata.MetricsData
317+
Logs []data.Logs
280318
ExporterStarted bool
281319
ExporterShutdown bool
282320
}
@@ -301,6 +339,11 @@ func (exp *ExampleExporterConsumer) ConsumeMetricsData(ctx context.Context, md c
301339
return nil
302340
}
303341

342+
func (exp *ExampleExporterConsumer) ConsumeLogs(ctx context.Context, ld data.Logs) error {
343+
exp.Logs = append(exp.Logs, ld)
344+
return nil
345+
}
346+
304347
// Name returns the name of the exporter.
305348
func (exp *ExampleExporterConsumer) Name() string {
306349
return "exampleexporter"
@@ -312,9 +355,9 @@ func (exp *ExampleExporterConsumer) Shutdown(context.Context) error {
312355
return nil
313356
}
314357

315-
// ExampleProcessor is for testing purposes. We are defining an example config and factory
358+
// ExampleProcessorCfg is for testing purposes. We are defining an example config and factory
316359
// for "exampleprocessor" processor type.
317-
type ExampleProcessor struct {
360+
type ExampleProcessorCfg struct {
318361
configmodels.ProcessorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct
319362
ExtraSetting string `mapstructure:"extra"`
320363
ExtraMapSetting map[string]string `mapstructure:"extra_map"`
@@ -332,11 +375,14 @@ func (f *ExampleProcessorFactory) Type() configmodels.Type {
332375

333376
// CreateDefaultConfig creates the default configuration for the Processor.
334377
func (f *ExampleProcessorFactory) CreateDefaultConfig() configmodels.Processor {
335-
return &ExampleProcessor{
336-
ProcessorSettings: configmodels.ProcessorSettings{},
337-
ExtraSetting: "some export string",
338-
ExtraMapSetting: nil,
339-
ExtraListSetting: nil,
378+
return &ExampleProcessorCfg{
379+
ProcessorSettings: configmodels.ProcessorSettings{
380+
TypeVal: f.Type(),
381+
NameVal: string(f.Type()),
382+
},
383+
ExtraSetting: "some export string",
384+
ExtraMapSetting: nil,
385+
ExtraListSetting: nil,
340386
}
341387
}
342388

@@ -358,6 +404,35 @@ func (f *ExampleProcessorFactory) CreateMetricsProcessor(
358404
return nil, configerror.ErrDataTypeIsNotSupported
359405
}
360406

407+
func (f *ExampleProcessorFactory) CreateLogProcessor(
408+
ctx context.Context,
409+
params component.ProcessorCreateParams,
410+
cfg configmodels.Processor,
411+
nextConsumer consumer.LogConsumer,
412+
) (component.LogProcessor, error) {
413+
return &ExampleProcessor{nextConsumer}, nil
414+
}
415+
416+
type ExampleProcessor struct {
417+
nextConsumer consumer.LogConsumer
418+
}
419+
420+
func (ep *ExampleProcessor) Start(ctx context.Context, host component.Host) error {
421+
return nil
422+
}
423+
424+
func (ep *ExampleProcessor) Shutdown(ctx context.Context) error {
425+
return nil
426+
}
427+
428+
func (ep *ExampleProcessor) GetCapabilities() component.ProcessorCapabilities {
429+
return component.ProcessorCapabilities{MutatesConsumedData: false}
430+
}
431+
432+
func (ep *ExampleProcessor) ConsumeLogs(ctx context.Context, ld data.Logs) error {
433+
return ep.nextConsumer.ConsumeLogs(ctx, ld)
434+
}
435+
361436
// ExampleExtensionCfg is for testing purposes. We are defining an example config and factory
362437
// for "exampleextension" extension type.
363438
type ExampleExtensionCfg struct {
@@ -387,10 +462,13 @@ func (f *ExampleExtensionFactory) Type() configmodels.Type {
387462
// CreateDefaultConfig creates the default configuration for the Extension.
388463
func (f *ExampleExtensionFactory) CreateDefaultConfig() configmodels.Extension {
389464
return &ExampleExtensionCfg{
390-
ExtensionSettings: configmodels.ExtensionSettings{TypeVal: f.Type()},
391-
ExtraSetting: "extra string setting",
392-
ExtraMapSetting: nil,
393-
ExtraListSetting: nil,
465+
ExtensionSettings: configmodels.ExtensionSettings{
466+
TypeVal: f.Type(),
467+
NameVal: string(f.Type()),
468+
},
469+
ExtraSetting: "extra string setting",
470+
ExtraMapSetting: nil,
471+
ExtraListSetting: nil,
394472
}
395473
}
396474

0 commit comments

Comments
 (0)