Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 20 additions & 125 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func DecodeTypeAndName(key string) (typeStr configmodels.Type, fullName string,
typeStr = configmodels.Type(strings.TrimSpace(items[0]))
}

if len(items) < 1 || typeStr == "" {
if len(items) == 0 || typeStr == "" {
err = errors.New("type/name key must have the type part")
return
}
Expand Down Expand Up @@ -599,36 +599,31 @@ func ValidateConfig(cfg *configmodels.Config, logger *zap.Logger) error {
// invalid cases that we currently don't check for but which we may want to add in
// the future (e.g. disallowing receiving and exporting on the same endpoint).

if err := validateService(cfg, logger); err != nil {
if err := validateReceivers(cfg); err != nil {
return err
}

if err := validatePipelines(cfg, logger); err != nil {
if err := validateExporters(cfg); err != nil {
return err
}

if err := validateReceivers(cfg); err != nil {
if err := validateService(cfg); err != nil {
return err
}
if err := validateExporters(cfg); err != nil {

if err := validatePipelines(cfg); err != nil {
return err
}
validateProcessors(cfg)

return nil
}

func validateService(cfg *configmodels.Config, logger *zap.Logger) error {
func validateService(cfg *configmodels.Config) error {
// Currently only to validate extensions.
return validateServiceExtensions(cfg, &cfg.Service, logger)
return validateServiceExtensions(cfg, &cfg.Service)
}

func validateServiceExtensions(
cfg *configmodels.Config,
service *configmodels.Service,
logger *zap.Logger,
) error {
if len(cfg.Service.Extensions) < 1 {
func validateServiceExtensions(cfg *configmodels.Config, service *configmodels.Service) error {
if len(cfg.Service.Extensions) == 0 {
return nil
}

Expand All @@ -643,64 +638,41 @@ func validateServiceExtensions(
}
}

// Remove disabled extensions.
extensions := service.Extensions[:0]
for _, ref := range service.Extensions {
ext := cfg.Extensions[ref]
if ext.IsEnabled() {
// The extension is enabled. Keep it in the pipeline.
extensions = append(extensions, ref)
} else {
logger.Info("service references a disabled extension. Ignoring the extension.",
zap.String("extension", ref))
}
}

service.Extensions = extensions

return nil
}

func validatePipelines(cfg *configmodels.Config, logger *zap.Logger) error {
func validatePipelines(cfg *configmodels.Config) error {
// Must have at least one pipeline.
if len(cfg.Service.Pipelines) < 1 {
if len(cfg.Service.Pipelines) == 0 {
return &configError{code: errMissingPipelines, msg: "must have at least one pipeline"}
}

// Validate pipelines.
for _, pipeline := range cfg.Service.Pipelines {
if err := validatePipeline(cfg, pipeline, logger); err != nil {
if err := validatePipeline(cfg, pipeline); err != nil {
return err
}
}
return nil
}

func validatePipeline(
cfg *configmodels.Config,
pipeline *configmodels.Pipeline,
logger *zap.Logger,
) error {
if err := validatePipelineReceivers(cfg, pipeline, logger); err != nil {
func validatePipeline(cfg *configmodels.Config, pipeline *configmodels.Pipeline) error {
if err := validatePipelineReceivers(cfg, pipeline); err != nil {
return err
}

if err := validatePipelineExporters(cfg, pipeline, logger); err != nil {
if err := validatePipelineExporters(cfg, pipeline); err != nil {
return err
}

if err := validatePipelineProcessors(cfg, pipeline, logger); err != nil {
if err := validatePipelineProcessors(cfg, pipeline); err != nil {
return err
}

return nil
}

func validatePipelineReceivers(
cfg *configmodels.Config,
pipeline *configmodels.Pipeline,
logger *zap.Logger,
) error {
func validatePipelineReceivers(cfg *configmodels.Config, pipeline *configmodels.Pipeline) error {
if len(pipeline.Receivers) == 0 {
return &configError{
code: errPipelineMustHaveReceiver,
Expand All @@ -719,30 +691,10 @@ func validatePipelineReceivers(
}
}

// Remove disabled receivers.
rs := pipeline.Receivers[:0]
for _, ref := range pipeline.Receivers {
rcv := cfg.Receivers[ref]
if rcv.IsEnabled() {
// The receiver is enabled. Keep it in the pipeline.
rs = append(rs, ref)
} else {
logger.Info("pipeline references a disabled receiver. Ignoring the receiver.",
zap.String("pipeline", pipeline.Name),
zap.String("receiver", ref))
}
}

pipeline.Receivers = rs

return nil
}

func validatePipelineExporters(
cfg *configmodels.Config,
pipeline *configmodels.Pipeline,
logger *zap.Logger,
) error {
func validatePipelineExporters(cfg *configmodels.Config, pipeline *configmodels.Pipeline) error {
if len(pipeline.Exporters) == 0 {
return &configError{
code: errPipelineMustHaveExporter,
Expand All @@ -761,29 +713,10 @@ func validatePipelineExporters(
}
}

// Remove disabled exporters.
rs := pipeline.Exporters[:0]
for _, ref := range pipeline.Exporters {
exp := cfg.Exporters[ref]
if exp.IsEnabled() {
// The exporter is enabled. Keep it in the pipeline.
rs = append(rs, ref)
} else {
logger.Info("pipeline references a disabled exporter. Ignoring the exporter.",
zap.String("pipeline", pipeline.Name),
zap.String("exporter", ref))
}
}
pipeline.Exporters = rs

return nil
}

func validatePipelineProcessors(
cfg *configmodels.Config,
pipeline *configmodels.Pipeline,
logger *zap.Logger,
) error {
func validatePipelineProcessors(cfg *configmodels.Config, pipeline *configmodels.Pipeline) error {
if len(pipeline.Processors) == 0 {
return nil
}
Expand All @@ -799,32 +732,10 @@ func validatePipelineProcessors(
}
}

// Remove disabled processors.
rs := pipeline.Processors[:0]
for _, ref := range pipeline.Processors {
proc := cfg.Processors[ref]
if proc.IsEnabled() {
// The processor is enabled. Keep it in the pipeline.
rs = append(rs, ref)
} else {
logger.Info("pipeline references a disabled processor. Ignoring the processor.",
zap.String("pipeline", pipeline.Name),
zap.String("processor", ref))
}
}
pipeline.Processors = rs

return nil
}

func validateReceivers(cfg *configmodels.Config) error {
// Remove disabled receivers.
for name, rcv := range cfg.Receivers {
if !rcv.IsEnabled() {
delete(cfg.Receivers, name)
}
}

// Currently there is no default receiver enabled. The configuration must specify at least one enabled receiver to
// be valid.
if len(cfg.Receivers) == 0 {
Expand All @@ -837,13 +748,6 @@ func validateReceivers(cfg *configmodels.Config) error {
}

func validateExporters(cfg *configmodels.Config) error {
// Remove disabled exporters.
for name, rcv := range cfg.Exporters {
if !rcv.IsEnabled() {
delete(cfg.Exporters, name)
}
}

// There must be at least one enabled exporter to be considered a valid configuration.
if len(cfg.Exporters) == 0 {
return &configError{
Expand All @@ -854,15 +758,6 @@ func validateExporters(cfg *configmodels.Config) error {
return nil
}

func validateProcessors(cfg *configmodels.Config) {
// Remove disabled processors.
for name, rcv := range cfg.Processors {
if !rcv.IsEnabled() {
delete(cfg.Processors, name)
}
}
}

// expandEnvConfig creates a new viper config with expanded values for all the values (simple, list or map value).
// It does not expand the keys.
func expandEnvConfig(v *viper.Viper) {
Expand Down
Loading