From c82dd02ed78aed525d3236f8b5bbb588e299f739 Mon Sep 17 00:00:00 2001 From: Paulo Janotti Date: Tue, 30 Mar 2021 17:26:15 +0000 Subject: [PATCH 1/5] Add a config source manager --- config/internal/configsource/manager.go | 316 ++++++++++++ config/internal/configsource/manager_test.go | 457 ++++++++++++++++++ .../testdata/arrays_and_maps.yaml | 12 + .../testdata/arrays_and_maps_expected.yaml | 12 + 4 files changed, 797 insertions(+) create mode 100644 config/internal/configsource/manager.go create mode 100644 config/internal/configsource/manager_test.go create mode 100644 config/internal/configsource/testdata/arrays_and_maps.yaml create mode 100644 config/internal/configsource/testdata/arrays_and_maps_expected.yaml diff --git a/config/internal/configsource/manager.go b/config/internal/configsource/manager.go new file mode 100644 index 00000000000..1c110af1125 --- /dev/null +++ b/config/internal/configsource/manager.go @@ -0,0 +1,316 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package configsource + +import ( + "bytes" + "context" + "errors" + "fmt" + "strings" + "sync" + + "github.com/spf13/viper" + + "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/consumer/consumererror" +) + +// Manager is used to inject data from config sources into a configuration and also +// to monitor for updates on the items injected into the configuration. All methods +// of a Manager must be called only once and have a expected sequence: +// +// 1. NewManager to create a new instance; +// 2. Resolve to inject the data from config sources into a configuration; +// 3. WatchForUpdate in a goroutine to wait for configuration updates; +// 4. WaitForWatcher to wait until the watchers are in place; +// 5. Close to close the instance; +// +type Manager struct { + // configSources is map from ConfigSource names (as defined in the configuration) + // and the respective instances. + configSources map[string]ConfigSource + // sessions track all the Session objects used to retrieve values to be injected + // into the configuration. + sessions map[string]Session + // watchers keeps track of all WatchForUpdate functions for retrieved values. + watchers []func() error + // watchersWG is used to ensure that Close waits for all WatchForUpdate calls + // to complete. + watchersWG sync.WaitGroup + // watchingCh is used to notify users of the Manager that the WatchForUpdate function + // is ready and waiting for notifications. + watchingCh chan struct{} + // closeCh is used to notify the Manager WatchForUpdate function that the manager + // is being closed. + closeCh chan struct{} +} + +// NewManager creates a new instance of a Manager to be used to inject data from +// ConfigSource objects into a configuration and watch for updates on the injected +// data. +func NewManager(*config.Parser) (*Manager, error) { + // TODO: Config sources should be extracted for the config itself, need Factories for that. + + return &Manager{ + // TODO: Temporarily tests should set their config sources per their needs. + sessions: make(map[string]Session), + watchingCh: make(chan struct{}), + closeCh: make(chan struct{}), + }, nil +} + +// Resolve inspects the given config.Parser and resolves all config sources referenced +// in the configuration, returning a config.Parser fully resolved. This must be called only +// once per lifetime of a Manager object. +func (m *Manager) Resolve(ctx context.Context, parser *config.Parser) (*config.Parser, error) { + res := config.NewParser() + allKeys := parser.AllKeys() + for _, k := range allKeys { + value, err := m.expandStringValues(ctx, parser.Get(k)) + if err != nil { + // Call RetrieveEnd for all sessions used so far but don't record any errors. + _ = m.retrieveEndAllSessions(ctx) + return nil, err + } + res.Set(k, value) + } + + if errs := m.retrieveEndAllSessions(ctx); len(errs) > 0 { + return nil, consumererror.Combine(errs) + } + + return res, nil +} + +// WatchForUpdate must watch for updates on any of the values retrieved from config sources +// and injected into the configuration. Typically this method is launched in a goroutine, the +// method WaitForWatcher blocks until the WatchForUpdate goroutine is running and ready. +func (m *Manager) WatchForUpdate() error { + // Use a channel to capture the first error returned by any watcher and another one + // to ensure completion of any remaining watcher also trying to report an error. + errChannel := make(chan error, 1) + doneCh := make(chan struct{}) + defer close(doneCh) + + for _, watcher := range m.watchers { + m.watchersWG.Add(1) + watcherFn := watcher + go func() { + defer m.watchersWG.Done() + + err := watcherFn() + switch { + case errors.Is(err, ErrWatcherNotSupported): + // The watcher for the retrieved value is not supported, nothing to + // do, just exit from the goroutine. + return + case errors.Is(err, ErrSessionClosed): + // The Session from which this watcher was retrieved is being closed. + // There is no error to report, just exit from the goroutine. + return + default: + select { + case errChannel <- err: + // Try to report any other error. + case <-doneCh: + // There was either one error published or the watcher was closed. + // This channel was closed and any goroutines waiting on these + // should simply close. + } + } + }() + } + + // All goroutines were created, they may not be running yet, but the manager WatchForUpdate + // is only waiting for any of the watchers to terminate. + close(m.watchingCh) + + select { + case err := <-errChannel: + // Return the first error that reaches the channel and ignore any other error. + return err + case <-m.closeCh: + // This covers the case that all watchers returned ErrWatcherNotSupported. + return ErrSessionClosed + } +} + +// WaitForWatcher blocks until the watchers used by WatchForUpdate are all ready. +// This is used to ensure that the watchers are in place before proceeding. +func (m *Manager) WaitForWatcher() { + <-m.watchingCh +} + +// Close terminates the WatchForUpdate function and closes all Session objects used +// in the configuration. It should be called +func (m *Manager) Close(ctx context.Context) error { + var errs []error + for _, session := range m.sessions { + if err := session.Close(ctx); err != nil { + errs = append(errs, err) + } + } + + close(m.closeCh) + m.watchersWG.Wait() + + return consumererror.Combine(errs) +} + +func (m *Manager) retrieveEndAllSessions(ctx context.Context) []error { + var errs []error + for _, session := range m.sessions { + if err := session.RetrieveEnd(ctx); err != nil { + errs = append(errs, err) + } + } + return errs +} + +func (m *Manager) expandStringValues(ctx context.Context, value interface{}) (interface{}, error) { + switch v := value.(type) { + case string: + return m.expandConfigSources(ctx, v) + case []interface{}: + nslice := make([]interface{}, 0, len(v)) + for _, vint := range v { + value, err := m.expandStringValues(ctx, vint) + if err != nil { + return nil, err + } + nslice = append(nslice, value) + } + return nslice, nil + case map[interface{}]interface{}: + nmap := make(map[interface{}]interface{}, len(v)) + for k, vint := range v { + value, err := m.expandStringValues(ctx, vint) + if err != nil { + return nil, err + } + nmap[k] = value + } + return nmap, nil + default: + return v, nil + } +} + +// expandConfigSources retrieve data from the specified config sources and injects them into +// the configuration. The Manager tracks sessions and watcher objects as needed. +func (m *Manager) expandConfigSources(ctx context.Context, s string) (interface{}, error) { + // Provisional implementation: only strings prefixed with the first character '$' + // are checked for config sources. + // + // TODO: Handle different ways to express config sources: + // + // 1. Concatenated with other strings (needs delimiter syntax); + // 2. Using spaces in its declaration; + // 3. Multiline; + // + if len(s) == 0 || s[0] != '$' { + // TODO: handle escaped $. + return s, nil + } + + cfgSrcName, selector, params, err := parseCfgSrc(s[1:]) + if err != nil { + return nil, err + } + + session, ok := m.sessions[cfgSrcName] + if !ok { + // The session for this config source was not created yet. + cfgSrc, ok := m.configSources[cfgSrcName] + if !ok { + return nil, fmt.Errorf("config source %q not found", cfgSrcName) + } + + session, err = cfgSrc.NewSession(ctx) + if err != nil { + return nil, fmt.Errorf("failed to create session for config source %q: %w", cfgSrcName, err) + } + m.sessions[cfgSrcName] = session + } + + retrieved, err := session.Retrieve(ctx, selector, params) + if err != nil { + return nil, fmt.Errorf("config source %q failed to retrieve value: %w", cfgSrcName, err) + } + + m.watchers = append(m.watchers, retrieved.WatchForUpdate) + + return retrieved.Value(), nil +} + +// parseCfgSrc extracts the reference to a config source from a string value. +// The current syntax is provisional: :[?] +// The caller should check for error explicitly since it is possible for the +// other values to have been partially set. +// TODO: Improve parameter resolution. +func parseCfgSrc(s string) (cfgSrcName, selector string, params interface{}, err error) { + const cfgSrcDelim string = ":" + parts := strings.SplitN(s, cfgSrcDelim, 2) + if len(parts) != 2 { + err = fmt.Errorf("inproper config source syntax at %q, it must have at least the config source name and a selector", s) + return + } + cfgSrcName = strings.Trim(parts[0], " ") + + const selectorDelim string = "?" + parts = strings.SplitN(parts[1], selectorDelim, 2) + selector = strings.Trim(parts[0], " ") + + if len(parts) == 2 { + // There are parameters, for now simply transform it to an YAML and parse it + params, err = parseParams(parts[1]) + if err != nil { + err = fmt.Errorf("invalid parameters syntax at %q: %w", s, err) + return + } + } + + return cfgSrcName, selector, params, err +} + +func parseParams(s string) (interface{}, error) { + // Build a single-line valid yaml text to be parsed. + // The delimiter chars are all ASCII '{', '}', ',', ':', looping over string + // as bytes is fine. + yamlBuf := make([]byte, 0, 2*len(s)) + yamlBuf = append(yamlBuf, []byte("params: ")...) + + for i := 0; i < len(s); i++ { + yamlBuf = append(yamlBuf, s[i]) + if yamlDelimiter(s[i]) { + // Make it a legal YAML by adding a single space after the delimiters. + yamlBuf = append(yamlBuf, ' ') + } + } + + // yamlBuf now it is a single line representing the params, parse it. + v := viper.New() + v.SetConfigType("yaml") + if err := v.ReadConfig(bytes.NewReader(yamlBuf)); err != nil { + return nil, err + } + + return v.Get("params"), nil +} + +func yamlDelimiter(c byte) bool { + return c == '{' || c == '}' || c == ':' || c == ',' +} diff --git a/config/internal/configsource/manager_test.go b/config/internal/configsource/manager_test.go new file mode 100644 index 00000000000..a65382eb8c8 --- /dev/null +++ b/config/internal/configsource/manager_test.go @@ -0,0 +1,457 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package configsource + +import ( + "context" + "errors" + "fmt" + "path" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/config" +) + +func TestConfigSourceManager_Simple(t *testing.T) { + ctx := context.Background() + manager, err := NewManager(nil) + require.NoError(t, err) + manager.configSources = map[string]ConfigSource{ + "tstcfgsrc": &testConfigSource{ + ValueMap: map[string]valueEntry{ + "test_selector": {Value: "test_value"}, + }, + }, + } + + originalCfg := map[string]interface{}{ + "top0": map[string]interface{}{ + "int": 1, + "cfgsrc": "$tstcfgsrc:test_selector", + }, + } + expectedCfg := map[string]interface{}{ + "top0": map[string]interface{}{ + "int": 1, + "cfgsrc": "test_value", + }, + } + + cp := config.NewParserFromStringMap(originalCfg) + + res, err := manager.Resolve(ctx, cp) + require.NoError(t, err) + actualCfg := res.Viper().AllSettings() + assert.Equal(t, expectedCfg, actualCfg) + + doneCh := make(chan struct{}) + var errWatcher error + go func() { + defer close(doneCh) + errWatcher = manager.WatchForUpdate() + }() + + manager.WaitForWatcher() + assert.NoError(t, manager.Close(ctx)) + <-doneCh + assert.ErrorIs(t, errWatcher, ErrSessionClosed) +} + +func TestConfigSourceManager_ResolveErrors(t *testing.T) { + ctx := context.Background() + testErr := errors.New("test error") + + tests := []struct { + name string + config map[string]interface{} + configSourceMap map[string]ConfigSource + }{ + { + name: "not_found_config_source", + config: map[string]interface{}{ + "cfgsrc": "$unknown:test_selector", + }, + configSourceMap: map[string]ConfigSource{ + "tstcfgsrc": &testConfigSource{}, + }, + }, + { + name: "incorrect_cfgsrc_ref", + config: map[string]interface{}{ + "cfgsrc": "$tstcfgsrc:selector?{invalid}", + }, + configSourceMap: map[string]ConfigSource{ + "tstcfgsrc": &testConfigSource{}, + }, + }, + { + name: "error_on_new_session", + config: map[string]interface{}{ + "cfgsrc": "$tstcfgsrc:selector", + }, + configSourceMap: map[string]ConfigSource{ + "tstcfgsrc": &testConfigSource{ErrOnNewSession: testErr}, + }, + }, + { + name: "error_on_retrieve", + config: map[string]interface{}{ + "cfgsrc": "$tstcfgsrc:selector", + }, + configSourceMap: map[string]ConfigSource{ + "tstcfgsrc": &testConfigSource{ErrOnRetrieve: testErr}, + }, + }, + { + name: "error_on_retrieve_end", + config: map[string]interface{}{ + "cfgsrc": "$tstcfgsrc:selector", + }, + configSourceMap: map[string]ConfigSource{ + "tstcfgsrc": &testConfigSource{ + ErrOnRetrieveEnd: testErr, + ValueMap: map[string]valueEntry{ + "selector": {Value: "test_value"}, + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + manager, err := NewManager(nil) + require.NoError(t, err) + manager.configSources = tt.configSourceMap + + res, err := manager.Resolve(ctx, config.NewParserFromStringMap(tt.config)) + require.Error(t, err) + require.Nil(t, res) + require.NoError(t, manager.Close(ctx)) + }) + } +} + +func TestConfigSourceManager_ArraysAndMaps(t *testing.T) { + ctx := context.Background() + manager, err := NewManager(nil) + require.NoError(t, err) + manager.configSources = map[string]ConfigSource{ + "tstcfgsrc": &testConfigSource{ + ValueMap: map[string]valueEntry{ + "elem0": {Value: "elem0_value"}, + "elem1": {Value: "elem1_value"}, + "k0": {Value: "k0_value"}, + "k1": {Value: "k1_value"}, + }, + }, + } + + file := path.Join("testdata", "arrays_and_maps.yaml") + cp, err := config.NewParserFromFile(file) + require.NoError(t, err) + + expectedFile := path.Join("testdata", "arrays_and_maps_expected.yaml") + expectedParser, err := config.NewParserFromFile(expectedFile) + require.NoError(t, err) + expectedCfg := expectedParser.Viper().AllSettings() + + res, err := manager.Resolve(ctx, cp) + require.NoError(t, err) + actualCfg := res.Viper().AllSettings() + assert.Equal(t, expectedCfg, actualCfg) + assert.NoError(t, manager.Close(ctx)) +} + +func TestConfigSourceManager_WatchForUpdate(t *testing.T) { + ctx := context.Background() + manager, err := NewManager(nil) + require.NoError(t, err) + + watchForUpdateCh := make(chan error, 1) + manager.configSources = map[string]ConfigSource{ + "tstcfgsrc": &testConfigSource{ + ValueMap: map[string]valueEntry{ + "test_selector": { + Value: "test_value", + WatchForUpdateFn: func() error { + return <-watchForUpdateCh + }, + }, + }, + }, + } + + originalCfg := map[string]interface{}{ + "top0": map[string]interface{}{ + "var0": "$tstcfgsrc:test_selector", + }, + } + + cp := config.NewParserFromStringMap(originalCfg) + _, err = manager.Resolve(ctx, cp) + require.NoError(t, err) + + doneCh := make(chan struct{}) + var errWatcher error + go func() { + defer close(doneCh) + errWatcher = manager.WatchForUpdate() + }() + + manager.WaitForWatcher() + watchForUpdateCh <- ErrValueUpdated + + <-doneCh + assert.ErrorIs(t, errWatcher, ErrValueUpdated) + assert.NoError(t, manager.Close(ctx)) +} + +func TestConfigSourceManager_MultipleWatchForUpdate(t *testing.T) { + ctx := context.Background() + manager, err := NewManager(nil) + require.NoError(t, err) + + watchDoneCh := make(chan struct{}) + const watchForUpdateChSize int = 2 + watchForUpdateCh := make(chan error, watchForUpdateChSize) + watchForUpdateFn := func() error { + select { + case errFromWatchForUpdate := <-watchForUpdateCh: + return errFromWatchForUpdate + case <-watchDoneCh: + return ErrSessionClosed + } + } + + manager.configSources = map[string]ConfigSource{ + "tstcfgsrc": &testConfigSource{ + ValueMap: map[string]valueEntry{ + "test_selector": { + Value: "test_value", + WatchForUpdateFn: watchForUpdateFn, + }, + }, + }, + } + + originalCfg := map[string]interface{}{ + "top0": map[string]interface{}{ + "var0": "$tstcfgsrc:test_selector", + "var1": "$tstcfgsrc:test_selector", + "var2": "$tstcfgsrc:test_selector", + "var3": "$tstcfgsrc:test_selector", + }, + } + + cp := config.NewParserFromStringMap(originalCfg) + _, err = manager.Resolve(ctx, cp) + require.NoError(t, err) + + doneCh := make(chan struct{}) + var errWatcher error + go func() { + defer close(doneCh) + errWatcher = manager.WatchForUpdate() + }() + + manager.WaitForWatcher() + + for i := 0; i < watchForUpdateChSize; i++ { + watchForUpdateCh <- ErrValueUpdated + } + + <-doneCh + assert.ErrorIs(t, errWatcher, ErrValueUpdated) + close(watchForUpdateCh) + assert.NoError(t, manager.Close(ctx)) +} + +func TestConfigSourceManager_expandConfigSources(t *testing.T) { + ctx := context.Background() + csp, err := NewManager(nil) + require.NoError(t, err) + csp.configSources = map[string]ConfigSource{ + "tstcfgsrc": &testConfigSource{ + ValueMap: map[string]valueEntry{ + "test_selector": {Value: "test_value"}, + }, + }, + } + + v, err := csp.expandConfigSources(ctx, "") + assert.NoError(t, err) + assert.Equal(t, "", v) + + v, err = csp.expandConfigSources(ctx, "no_cfgsrc") + assert.NoError(t, err) + assert.Equal(t, "no_cfgsrc", v) + + // Not found config source. + v, err = csp.expandConfigSources(ctx, "$cfgsrc:selector") + assert.Error(t, err) + assert.Nil(t, v) + + v, err = csp.expandConfigSources(ctx, "$tstcfgsrc:test_selector") + assert.NoError(t, err) + assert.Equal(t, "test_value", v) + + v, err = csp.expandConfigSources(ctx, "$tstcfgsrc:invalid_selector") + assert.Error(t, err) + assert.Nil(t, v) +} + +func Test_parseCfgSrc(t *testing.T) { + tests := []struct { + name string + str string + cfgSrcName string + selector string + params interface{} + wantErr bool + }{ + { + name: "basic", + str: "cfgsrc:selector", + cfgSrcName: "cfgsrc", + selector: "selector", + }, + { + name: "missing_selector", + str: "cfgsrc", + wantErr: true, + }, + { + name: "params", + str: "cfgsrc:selector?{p0:1,p1:a_string,p2:true}", + cfgSrcName: "cfgsrc", + selector: "selector", + params: map[string]interface{}{ + "p0": 1, + "p1": "a_string", + "p2": true, + }, + }, + { + name: "scalar_params", + str: "cfgsrc:selector?true", + cfgSrcName: "cfgsrc", + selector: "selector", + params: true, + }, + { + name: "array_in_params", + str: "cfgsrc:selector?{p0:[0,1,2],p1:done}", + cfgSrcName: "cfgsrc", + selector: "selector", + params: map[string]interface{}{ + "p0": []interface{}{0, 1, 2}, + "p1": "done", + }, + }, + { + name: "invalid_params", + str: "cfgsrc:selector?{no_closing:bracket", + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfgSrcName, selector, params, err := parseCfgSrc(tt.str) + if tt.wantErr { + assert.Error(t, err) + return + } + + assert.Equal(t, tt.cfgSrcName, cfgSrcName) + assert.Equal(t, tt.selector, selector) + assert.Equal(t, tt.params, params) + }) + } +} + +// testConfigSource a ConfigSource to be used in tests. +type testConfigSource struct { + ValueMap map[string]valueEntry + + ErrOnNewSession error + ErrOnRetrieve error + ErrOnRetrieveEnd error + ErrOnClose error +} + +type valueEntry struct { + Value interface{} + WatchForUpdateFn func() error +} + +var _ (ConfigSource) = (*testConfigSource)(nil) +var _ (Session) = (*testConfigSource)(nil) + +func (t *testConfigSource) NewSession(context.Context) (Session, error) { + if t.ErrOnNewSession != nil { + return nil, t.ErrOnNewSession + } + return t, nil +} + +func (t *testConfigSource) Retrieve(_ context.Context, selector string, _ interface{}) (Retrieved, error) { + if t.ErrOnRetrieve != nil { + return nil, t.ErrOnRetrieve + } + + entry, ok := t.ValueMap[selector] + if !ok { + return nil, fmt.Errorf("no value for selector %q", selector) + } + + watchForUpdateFn := func() error { + return ErrWatcherNotSupported + } + + if entry.WatchForUpdateFn != nil { + watchForUpdateFn = entry.WatchForUpdateFn + } + + return &retrieved{ + value: entry.Value, + watchForUpdateFn: watchForUpdateFn, + }, nil +} + +func (t *testConfigSource) RetrieveEnd(context.Context) error { + return t.ErrOnRetrieveEnd +} + +func (t *testConfigSource) Close(context.Context) error { + return t.ErrOnClose +} + +type retrieved struct { + value interface{} + watchForUpdateFn func() error +} + +var _ (Retrieved) = (*retrieved)(nil) + +func (r *retrieved) Value() interface{} { + return r.value +} + +func (r *retrieved) WatchForUpdate() error { + return r.watchForUpdateFn() +} diff --git a/config/internal/configsource/testdata/arrays_and_maps.yaml b/config/internal/configsource/testdata/arrays_and_maps.yaml new file mode 100644 index 00000000000..7a52ac88023 --- /dev/null +++ b/config/internal/configsource/testdata/arrays_and_maps.yaml @@ -0,0 +1,12 @@ +top0: + array0: + - $tstcfgsrc:elem0 + - $tstcfgsrc:elem1 + array1: + - entry: + str: $tstcfgsrc:elem0 + - entry: + str: $tstcfgsrc:elem1 + map0: + k0: $tstcfgsrc:k0 + k1: $tstcfgsrc:k1 diff --git a/config/internal/configsource/testdata/arrays_and_maps_expected.yaml b/config/internal/configsource/testdata/arrays_and_maps_expected.yaml new file mode 100644 index 00000000000..9abf036675d --- /dev/null +++ b/config/internal/configsource/testdata/arrays_and_maps_expected.yaml @@ -0,0 +1,12 @@ +top0: + array0: + - elem0_value + - elem1_value + array1: + - entry: + str: elem0_value + - entry: + str: elem1_value + map0: + k0: k0_value + k1: k1_value From 1d8bd4e1bdf0e7bd21cdc23c3d7376746d66f6b5 Mon Sep 17 00:00:00 2001 From: Paulo Janotti Date: Tue, 30 Mar 2021 18:32:58 +0000 Subject: [PATCH 2/5] Add cfg src syntax description --- config/internal/configsource/manager.go | 29 +++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/config/internal/configsource/manager.go b/config/internal/configsource/manager.go index 1c110af1125..4699598aa2f 100644 --- a/config/internal/configsource/manager.go +++ b/config/internal/configsource/manager.go @@ -38,6 +38,35 @@ import ( // 4. WaitForWatcher to wait until the watchers are in place; // 5. Close to close the instance; // +// The current syntax to reference a config source in a YAML is provisional. Currently +// only now-wrapped single line is supported: +// +// $:[?] +// +// The is a name string used to indentify the config source instance to be used +// to retrieve the value. +// +// The is the mandatory parameter required when retrieving data from a config source. +// +// is an optional parameter in syntax similar to single line YAML but removing the spaces +// to ensure that it is a string value. Not all config sources need these params, they are used to +// provide extra control when retrieving the data. +// +// Assuming a config source named "env" that retrieve environment variables and one named "file" that +// retrieves contents from individual files, here are some examples: +// +// component: +// # Retrieves the value of the environment variable LOGS_DIR. +// logs_dir: $env:LOGS_DIR +// +// # Retrieves the value from the file /etc/secret.bin and injects its contents as a []byte. +// bytes_from_file: $file:/etc/secret.bin?{binary:true} +// +// # Retrieves the value from the file /etc/text.txt and injects its contents as a string. +// # Hypothetically the "file" config source by default tries to inject the file contents +// # as a string if params doesn't specify that "binary" is true. +// text_from_file: $file:/etc/text.txt +// type Manager struct { // configSources is map from ConfigSource names (as defined in the configuration) // and the respective instances. From 8ce9b898fab582e4479f9346636525a7592ad365 Mon Sep 17 00:00:00 2001 From: Paulo Janotti Date: Mon, 5 Apr 2021 11:40:21 -0700 Subject: [PATCH 3/5] Handle single line params similar to URL query --- config/internal/configsource/manager.go | 27 +++++++++++--------- config/internal/configsource/manager_test.go | 21 +++++++-------- 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/config/internal/configsource/manager.go b/config/internal/configsource/manager.go index 4699598aa2f..319ca599887 100644 --- a/config/internal/configsource/manager.go +++ b/config/internal/configsource/manager.go @@ -60,7 +60,7 @@ import ( // logs_dir: $env:LOGS_DIR // // # Retrieves the value from the file /etc/secret.bin and injects its contents as a []byte. -// bytes_from_file: $file:/etc/secret.bin?{binary:true} +// bytes_from_file: $file:/etc/secret.bin?binary=true // // # Retrieves the value from the file /etc/text.txt and injects its contents as a string. // # Hypothetically the "file" config source by default tries to inject the file contents @@ -294,7 +294,7 @@ func parseCfgSrc(s string) (cfgSrcName, selector string, params interface{}, err const cfgSrcDelim string = ":" parts := strings.SplitN(s, cfgSrcDelim, 2) if len(parts) != 2 { - err = fmt.Errorf("inproper config source syntax at %q, it must have at least the config source name and a selector", s) + err = fmt.Errorf("invalid config source syntax at %q, it must have at least the config source name and a selector", s) return } cfgSrcName = strings.Trim(parts[0], " ") @@ -317,20 +317,26 @@ func parseCfgSrc(s string) (cfgSrcName, selector string, params interface{}, err func parseParams(s string) (interface{}, error) { // Build a single-line valid yaml text to be parsed. - // The delimiter chars are all ASCII '{', '}', ',', ':', looping over string - // as bytes is fine. + // The delimiter chars are all ASCII '&' and '=', looping over string + // as bytes is fine. yamlBuf := make([]byte, 0, 2*len(s)) - yamlBuf = append(yamlBuf, []byte("params: ")...) + yamlBuf = append(yamlBuf, []byte("params: { ")...) for i := 0; i < len(s); i++ { - yamlBuf = append(yamlBuf, s[i]) - if yamlDelimiter(s[i]) { - // Make it a legal YAML by adding a single space after the delimiters. - yamlBuf = append(yamlBuf, ' ') + switch s[i] { + case '=': + // End of paramater name + yamlBuf = append(yamlBuf, ": "...) + case '&': + // Starting next parameter + yamlBuf = append(yamlBuf, ", "...) + default: + yamlBuf = append(yamlBuf, s[i]) } } // yamlBuf now it is a single line representing the params, parse it. + yamlBuf = append(yamlBuf, " }"...) v := viper.New() v.SetConfigType("yaml") if err := v.ReadConfig(bytes.NewReader(yamlBuf)); err != nil { @@ -340,6 +346,3 @@ func parseParams(s string) (interface{}, error) { return v.Get("params"), nil } -func yamlDelimiter(c byte) bool { - return c == '{' || c == '}' || c == ':' || c == ',' -} diff --git a/config/internal/configsource/manager_test.go b/config/internal/configsource/manager_test.go index a65382eb8c8..5ce2758ba27 100644 --- a/config/internal/configsource/manager_test.go +++ b/config/internal/configsource/manager_test.go @@ -337,7 +337,7 @@ func Test_parseCfgSrc(t *testing.T) { }, { name: "params", - str: "cfgsrc:selector?{p0:1,p1:a_string,p2:true}", + str: "cfgsrc:selector?p0=1&p1=a_string&p2=true", cfgSrcName: "cfgsrc", selector: "selector", params: map[string]interface{}{ @@ -346,16 +346,9 @@ func Test_parseCfgSrc(t *testing.T) { "p2": true, }, }, - { - name: "scalar_params", - str: "cfgsrc:selector?true", - cfgSrcName: "cfgsrc", - selector: "selector", - params: true, - }, { name: "array_in_params", - str: "cfgsrc:selector?{p0:[0,1,2],p1:done}", + str: "cfgsrc:selector?p0=[0,1,2]&p1=done", cfgSrcName: "cfgsrc", selector: "selector", params: map[string]interface{}{ @@ -364,9 +357,13 @@ func Test_parseCfgSrc(t *testing.T) { }, }, { - name: "invalid_params", - str: "cfgsrc:selector?{no_closing:bracket", - wantErr: true, + name: "empty_param", + str: "cfgsrc:selector?no_closing=", + cfgSrcName: "cfgsrc", + selector: "selector", + params: map[string]interface{}{ + "no_closing": interface{}(nil), + }, }, } for _, tt := range tests { From 2e5471cff537d75e46d95795802958398756cd2a Mon Sep 17 00:00:00 2001 From: Paulo Janotti Date: Mon, 5 Apr 2021 15:22:17 -0700 Subject: [PATCH 4/5] Add multi-line and make single-line params as URL query --- config/internal/configsource/manager.go | 138 +++++++++++------- config/internal/configsource/manager_test.go | 76 +++++++++- .../testdata/params_handling.yaml | 13 ++ .../testdata/params_handling_expected.yaml | 14 ++ 4 files changed, 190 insertions(+), 51 deletions(-) create mode 100644 config/internal/configsource/testdata/params_handling.yaml create mode 100644 config/internal/configsource/testdata/params_handling_expected.yaml diff --git a/config/internal/configsource/manager.go b/config/internal/configsource/manager.go index 319ca599887..35c92c861f1 100644 --- a/config/internal/configsource/manager.go +++ b/config/internal/configsource/manager.go @@ -19,10 +19,11 @@ import ( "context" "errors" "fmt" + "net/url" "strings" "sync" - "github.com/spf13/viper" + "gopkg.in/yaml.v2" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/consumer/consumererror" @@ -39,18 +40,41 @@ import ( // 5. Close to close the instance; // // The current syntax to reference a config source in a YAML is provisional. Currently -// only now-wrapped single line is supported: +// single-line: // -// $:[?] +// param_to_be_retrieved: $:[?] +// +// and multi-line are supported: +// +// param_to_be_retrieved: | +// $: +// [] // // The is a name string used to indentify the config source instance to be used // to retrieve the value. // // The is the mandatory parameter required when retrieving data from a config source. // -// is an optional parameter in syntax similar to single line YAML but removing the spaces -// to ensure that it is a string value. Not all config sources need these params, they are used to -// provide extra control when retrieving the data. +// Not all config sources need the optional parameters, they are used to provide extra control when +// retrieving and preparing the data to be injected into the configuration. +// +// For single-line format uses the same syntax as URL query parameters. +// Hypothetical example in a YAML file: +// +// component: +// config_field: $file:/etc/secret.bin?binary=true +// +// For mult-line format uses syntax as a YAML inside YAML. Possible usage +// example in a YAML file: +// +// component: +// config_field: | +// $yamltemplate: /etc/component_template.yaml +// logs_path: /var/logs/component.log +// timeout: 10s +// +// Not all config sources need these optional parameters, they are used to provide extra control when +// retrieving and data to be injected into the configuration. // // Assuming a config source named "env" that retrieve environment variables and one named "file" that // retrieves contents from individual files, here are some examples: @@ -244,11 +268,7 @@ func (m *Manager) expandConfigSources(ctx context.Context, s string) (interface{ // Provisional implementation: only strings prefixed with the first character '$' // are checked for config sources. // - // TODO: Handle different ways to express config sources: - // - // 1. Concatenated with other strings (needs delimiter syntax); - // 2. Using spaces in its declaration; - // 3. Multiline; + // TODO: Handle concatenated with other strings (needs delimiter syntax); // if len(s) == 0 || s[0] != '$' { // TODO: handle escaped $. @@ -286,10 +306,8 @@ func (m *Manager) expandConfigSources(ctx context.Context, s string) (interface{ } // parseCfgSrc extracts the reference to a config source from a string value. -// The current syntax is provisional: :[?] // The caller should check for error explicitly since it is possible for the // other values to have been partially set. -// TODO: Improve parameter resolution. func parseCfgSrc(s string) (cfgSrcName, selector string, params interface{}, err error) { const cfgSrcDelim string = ":" parts := strings.SplitN(s, cfgSrcDelim, 2) @@ -299,50 +317,72 @@ func parseCfgSrc(s string) (cfgSrcName, selector string, params interface{}, err } cfgSrcName = strings.Trim(parts[0], " ") - const selectorDelim string = "?" - parts = strings.SplitN(parts[1], selectorDelim, 2) - selector = strings.Trim(parts[0], " ") + // Separate multi-line and single line case. + afterCfgSrcName := parts[1] + switch { + case strings.Contains(afterCfgSrcName, "\n"): + // Multi-line, until the first \n it is the selector, everything after as YAML. + parts = strings.SplitN(afterCfgSrcName, "\n", 2) + selector = strings.Trim(parts[0], " ") + + if len(parts) > 1 && len(parts[1]) > 0 { + v := config.NewViper() + v.SetConfigType("yaml") + if err = v.ReadConfig(bytes.NewReader([]byte(parts[1]))); err != nil { + return + } + params = v.AllSettings() + } - if len(parts) == 2 { - // There are parameters, for now simply transform it to an YAML and parse it - params, err = parseParams(parts[1]) - if err != nil { - err = fmt.Errorf("invalid parameters syntax at %q: %w", s, err) - return + default: + // Single line, and parameters as URL query. + const selectorDelim string = "?" + parts = strings.SplitN(parts[1], selectorDelim, 2) + selector = strings.Trim(parts[0], " ") + + if len(parts) == 2 { + paramsPart := parts[1] + params, err = parseParamsAsURLQuery(paramsPart) + if err != nil { + err = fmt.Errorf("invalid parameters syntax at %q: %w", s, err) + return + } } } return cfgSrcName, selector, params, err } -func parseParams(s string) (interface{}, error) { - // Build a single-line valid yaml text to be parsed. - // The delimiter chars are all ASCII '&' and '=', looping over string - // as bytes is fine. - yamlBuf := make([]byte, 0, 2*len(s)) - yamlBuf = append(yamlBuf, []byte("params: { ")...) - - for i := 0; i < len(s); i++ { - switch s[i] { - case '=': - // End of paramater name - yamlBuf = append(yamlBuf, ": "...) - case '&': - // Starting next parameter - yamlBuf = append(yamlBuf, ", "...) - default: - yamlBuf = append(yamlBuf, s[i]) - } - } - - // yamlBuf now it is a single line representing the params, parse it. - yamlBuf = append(yamlBuf, " }"...) - v := viper.New() - v.SetConfigType("yaml") - if err := v.ReadConfig(bytes.NewReader(yamlBuf)); err != nil { +func parseParamsAsURLQuery(s string) (interface{}, error) { + values, err := url.ParseQuery(s) + if err != nil { return nil, err } - return v.Get("params"), nil + // Transform single array values in scalars. + params := make(map[string]interface{}) + for k, v := range values { + switch len(v) { + case 0: + params[k] = nil + case 1: + var iface interface{} + if err := yaml.Unmarshal([]byte(v[0]), &iface); err != nil { + return nil, err + } + params[k] = iface + default: + // It is a slice add element by element + elemSlice := make([]interface{}, 0, len(v)) + for _, elem := range v { + var iface interface{} + if err := yaml.Unmarshal([]byte(elem), &iface); err != nil { + return nil, err + } + elemSlice = append(elemSlice, iface) + } + params[k] = elemSlice + } + } + return params, err } - diff --git a/config/internal/configsource/manager_test.go b/config/internal/configsource/manager_test.go index 5ce2758ba27..7792be6c8ca 100644 --- a/config/internal/configsource/manager_test.go +++ b/config/internal/configsource/manager_test.go @@ -177,6 +177,60 @@ func TestConfigSourceManager_ArraysAndMaps(t *testing.T) { assert.NoError(t, manager.Close(ctx)) } +func TestConfigSourceManager_ParamsHandling(t *testing.T) { + ctx := context.Background() + tstCfgSrc := testConfigSource{ + ValueMap: map[string]valueEntry{ + "elem0": {Value: nil}, + "elem1": { + Value: map[string]interface{}{ + "p0": true, + "p1": "a string with spaces", + "p3": 42, + }, + }, + "k0": {Value: nil}, + "k1": { + Value: map[string]interface{}{ + "p0": true, + "p1": "a string with spaces", + "p2": map[string]interface{}{ + "p2_0": "a nested map0", + "p2_1": true, + }, + }, + }, + }, + } + + // Set OnRetrieve to check if the parameters were parsed as expected. + tstCfgSrc.OnRetrieve = func (ctx context.Context, selector string, params interface{}) error { + assert.Equal(t, tstCfgSrc.ValueMap[selector].Value, params) + return nil + } + + manager, err := NewManager(nil) + require.NoError(t, err) + manager.configSources = map[string]ConfigSource{ + "tstcfgsrc": &tstCfgSrc, + } + + file := path.Join("testdata", "params_handling.yaml") + cp, err := config.NewParserFromFile(file) + require.NoError(t, err) + + expectedFile := path.Join("testdata", "params_handling_expected.yaml") + expectedParser, err := config.NewParserFromFile(expectedFile) + require.NoError(t, err) + expectedCfg := expectedParser.Viper().AllSettings() + + res, err := manager.Resolve(ctx, cp) + require.NoError(t, err) + actualCfg := res.Viper().AllSettings() + assert.Equal(t, expectedCfg, actualCfg) + assert.NoError(t, manager.Close(ctx)) +} + func TestConfigSourceManager_WatchForUpdate(t *testing.T) { ctx := context.Background() manager, err := NewManager(nil) @@ -348,7 +402,7 @@ func Test_parseCfgSrc(t *testing.T) { }, { name: "array_in_params", - str: "cfgsrc:selector?p0=[0,1,2]&p1=done", + str: "cfgsrc:selector?p0=0&p0=1&p0=2&p1=done", cfgSrcName: "cfgsrc", selector: "selector", params: map[string]interface{}{ @@ -365,6 +419,15 @@ func Test_parseCfgSrc(t *testing.T) { "no_closing": interface{}(nil), }, }, + { + name: "use_url_encode", + str: "cfgsrc:selector?p0=contains+%3D+and+%26+too", + cfgSrcName: "cfgsrc", + selector: "selector", + params: map[string]interface{}{ + "p0": "contains = and & too", + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -374,6 +437,7 @@ func Test_parseCfgSrc(t *testing.T) { return } + assert.NoError(t, err) assert.Equal(t, tt.cfgSrcName, cfgSrcName) assert.Equal(t, tt.selector, selector) assert.Equal(t, tt.params, params) @@ -389,6 +453,8 @@ type testConfigSource struct { ErrOnRetrieve error ErrOnRetrieveEnd error ErrOnClose error + + OnRetrieve func(ctx context.Context, selector string, params interface{}) error } type valueEntry struct { @@ -406,7 +472,13 @@ func (t *testConfigSource) NewSession(context.Context) (Session, error) { return t, nil } -func (t *testConfigSource) Retrieve(_ context.Context, selector string, _ interface{}) (Retrieved, error) { +func (t *testConfigSource) Retrieve(ctx context.Context, selector string, params interface{}) (Retrieved, error) { + if t.OnRetrieve != nil { + if err := t.OnRetrieve(ctx, selector, params); err != nil { + return nil, err + } + } + if t.ErrOnRetrieve != nil { return nil, t.ErrOnRetrieve } diff --git a/config/internal/configsource/testdata/params_handling.yaml b/config/internal/configsource/testdata/params_handling.yaml new file mode 100644 index 00000000000..5543d2ee268 --- /dev/null +++ b/config/internal/configsource/testdata/params_handling.yaml @@ -0,0 +1,13 @@ +single_line: + ex0: $tstcfgsrc:elem0 + ex1: $tstcfgsrc:elem1?p0=true&p1=a string with spaces&p3=42 +multi_line: + k0: | + $tstcfgsrc: k0 + k1: | + $tstcfgsrc: k1 + p0: true + p1: a string with spaces + p2: + p2_0: a nested map0 + p2_1: true diff --git a/config/internal/configsource/testdata/params_handling_expected.yaml b/config/internal/configsource/testdata/params_handling_expected.yaml new file mode 100644 index 00000000000..ce6fc9c55a0 --- /dev/null +++ b/config/internal/configsource/testdata/params_handling_expected.yaml @@ -0,0 +1,14 @@ +single_line: + ex0: + ex1: + p0: true + p1: a string with spaces + p3: 42 +multi_line: + k0: + k1: + p0: true + p1: a string with spaces + p2: + p2_0: a nested map0 + p2_1: true From e5609410af055e88fbe78539a86f7e02ecd49916 Mon Sep 17 00:00:00 2001 From: Paulo Janotti Date: Mon, 5 Apr 2021 15:35:43 -0700 Subject: [PATCH 5/5] Format and lint fixes --- config/internal/configsource/manager.go | 10 +++++----- config/internal/configsource/manager_test.go | 12 ++++++------ 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/config/internal/configsource/manager.go b/config/internal/configsource/manager.go index 35c92c861f1..015b123c1f7 100644 --- a/config/internal/configsource/manager.go +++ b/config/internal/configsource/manager.go @@ -69,8 +69,8 @@ import ( // // component: // config_field: | -// $yamltemplate: /etc/component_template.yaml -// logs_path: /var/logs/component.log +// $yamltemplate: /etc/log_template.yaml +// logs_path: /var/logs/ // timeout: 10s // // Not all config sources need these optional parameters, they are used to provide extra control when @@ -339,7 +339,7 @@ func parseCfgSrc(s string) (cfgSrcName, selector string, params interface{}, err const selectorDelim string = "?" parts = strings.SplitN(parts[1], selectorDelim, 2) selector = strings.Trim(parts[0], " ") - + if len(parts) == 2 { paramsPart := parts[1] params, err = parseParamsAsURLQuery(paramsPart) @@ -367,7 +367,7 @@ func parseParamsAsURLQuery(s string) (interface{}, error) { params[k] = nil case 1: var iface interface{} - if err := yaml.Unmarshal([]byte(v[0]), &iface); err != nil { + if err = yaml.Unmarshal([]byte(v[0]), &iface); err != nil { return nil, err } params[k] = iface @@ -376,7 +376,7 @@ func parseParamsAsURLQuery(s string) (interface{}, error) { elemSlice := make([]interface{}, 0, len(v)) for _, elem := range v { var iface interface{} - if err := yaml.Unmarshal([]byte(elem), &iface); err != nil { + if err = yaml.Unmarshal([]byte(elem), &iface); err != nil { return nil, err } elemSlice = append(elemSlice, iface) diff --git a/config/internal/configsource/manager_test.go b/config/internal/configsource/manager_test.go index 7792be6c8ca..70913940085 100644 --- a/config/internal/configsource/manager_test.go +++ b/config/internal/configsource/manager_test.go @@ -194,7 +194,7 @@ func TestConfigSourceManager_ParamsHandling(t *testing.T) { Value: map[string]interface{}{ "p0": true, "p1": "a string with spaces", - "p2": map[string]interface{}{ + "p2": map[string]interface{}{ "p2_0": "a nested map0", "p2_1": true, }, @@ -204,7 +204,7 @@ func TestConfigSourceManager_ParamsHandling(t *testing.T) { } // Set OnRetrieve to check if the parameters were parsed as expected. - tstCfgSrc.OnRetrieve = func (ctx context.Context, selector string, params interface{}) error { + tstCfgSrc.OnRetrieve = func(ctx context.Context, selector string, params interface{}) error { assert.Equal(t, tstCfgSrc.ValueMap[selector].Value, params) return nil } @@ -411,8 +411,8 @@ func Test_parseCfgSrc(t *testing.T) { }, }, { - name: "empty_param", - str: "cfgsrc:selector?no_closing=", + name: "empty_param", + str: "cfgsrc:selector?no_closing=", cfgSrcName: "cfgsrc", selector: "selector", params: map[string]interface{}{ @@ -420,8 +420,8 @@ func Test_parseCfgSrc(t *testing.T) { }, }, { - name: "use_url_encode", - str: "cfgsrc:selector?p0=contains+%3D+and+%26+too", + name: "use_url_encode", + str: "cfgsrc:selector?p0=contains+%3D+and+%26+too", cfgSrcName: "cfgsrc", selector: "selector", params: map[string]interface{}{