From 8f9dbc1f6377b21b50e4db6671e88282def90336 Mon Sep 17 00:00:00 2001 From: Annanay Date: Fri, 22 Nov 2019 18:23:41 -0800 Subject: [PATCH] Add remote sampling extension --- defaults/defaults.go | 2 + defaults/defaults_test.go | 8 +- extension/remotesamplingextension/config.go | 35 ++++++ .../remotesamplingextension/config_test.go | 56 +++++++++ extension/remotesamplingextension/doc.go | 17 +++ extension/remotesamplingextension/factory.go | 88 +++++++++++++ .../remotesamplingextension/factory_test.go | 83 ++++++++++++ .../remotesamplingextension.go | 117 +++++++++++++++++ .../remotesamplingextension_test.go | 119 ++++++++++++++++++ .../testdata/config.yaml | 21 ++++ 10 files changed, 543 insertions(+), 3 deletions(-) create mode 100644 extension/remotesamplingextension/config.go create mode 100644 extension/remotesamplingextension/config_test.go create mode 100644 extension/remotesamplingextension/doc.go create mode 100644 extension/remotesamplingextension/factory.go create mode 100644 extension/remotesamplingextension/factory_test.go create mode 100644 extension/remotesamplingextension/remotesamplingextension.go create mode 100644 extension/remotesamplingextension/remotesamplingextension_test.go create mode 100644 extension/remotesamplingextension/testdata/config.yaml diff --git a/defaults/defaults.go b/defaults/defaults.go index 3c17059bc0e..bee0b3a26ee 100644 --- a/defaults/defaults.go +++ b/defaults/defaults.go @@ -27,6 +27,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector/extension" "github.com/open-telemetry/opentelemetry-collector/extension/healthcheckextension" "github.com/open-telemetry/opentelemetry-collector/extension/pprofextension" + "github.com/open-telemetry/opentelemetry-collector/extension/remotesamplingextension" "github.com/open-telemetry/opentelemetry-collector/extension/zpagesextension" "github.com/open-telemetry/opentelemetry-collector/oterr" "github.com/open-telemetry/opentelemetry-collector/processor" @@ -55,6 +56,7 @@ func Components() ( &healthcheckextension.Factory{}, &pprofextension.Factory{}, &zpagesextension.Factory{}, + &remotesamplingextension.Factory{}, ) if err != nil { errs = append(errs, err) diff --git a/defaults/defaults_test.go b/defaults/defaults_test.go index cf0ce1b7b71..cb7ae0f916e 100644 --- a/defaults/defaults_test.go +++ b/defaults/defaults_test.go @@ -32,6 +32,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector/extension" "github.com/open-telemetry/opentelemetry-collector/extension/healthcheckextension" "github.com/open-telemetry/opentelemetry-collector/extension/pprofextension" + "github.com/open-telemetry/opentelemetry-collector/extension/remotesamplingextension" "github.com/open-telemetry/opentelemetry-collector/extension/zpagesextension" "github.com/open-telemetry/opentelemetry-collector/processor" "github.com/open-telemetry/opentelemetry-collector/processor/attributesprocessor" @@ -49,9 +50,10 @@ import ( func TestDefaultComponents(t *testing.T) { expectedExtensions := map[string]extension.Factory{ - "health_check": &healthcheckextension.Factory{}, - "pprof": &pprofextension.Factory{}, - "zpages": &zpagesextension.Factory{}, + "health_check": &healthcheckextension.Factory{}, + "pprof": &pprofextension.Factory{}, + "zpages": &zpagesextension.Factory{}, + "remotesampling": &remotesamplingextension.Factory{}, } expectedReceivers := map[string]receiver.Factory{ "jaeger": &jaegerreceiver.Factory{}, diff --git a/extension/remotesamplingextension/config.go b/extension/remotesamplingextension/config.go new file mode 100644 index 00000000000..8044400d90c --- /dev/null +++ b/extension/remotesamplingextension/config.go @@ -0,0 +1,35 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remotesamplingextension + +import ( + "github.com/open-telemetry/opentelemetry-collector/config/configmodels" +) + +// Config has the configuration settings for the remote sampling extension, +// used to fetch sampling configuration from the upstream Jaeger collector instance. +// +// It is an extension of configmodels.ExtensionSettings +type Config struct { + configmodels.ExtensionSettings `mapstructure:",squash"` + + // Port is the port used to publish the health check status. + // Default is `5778` (https://www.jaegertracing.io/docs/1.15/deployment/#agent). + Port uint16 `mapstructure:"port"` + + // Addr is the upstream Jaeger collector address that can be used to fetch + // sampling configurations. The default value is `:14250`. + Addr string `mapstructure:"addr"` +} diff --git a/extension/remotesamplingextension/config_test.go b/extension/remotesamplingextension/config_test.go new file mode 100644 index 00000000000..ff6af211372 --- /dev/null +++ b/extension/remotesamplingextension/config_test.go @@ -0,0 +1,56 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remotesamplingextension + +import ( + "path" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/open-telemetry/opentelemetry-collector/config" + "github.com/open-telemetry/opentelemetry-collector/config/configmodels" +) + +func TestLoadConfig(t *testing.T) { + factories, err := config.ExampleComponents() + assert.Nil(t, err) + + factory := &Factory{} + factories.Extensions[typeStr] = factory + cfg, err := config.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories) + + require.Nil(t, err) + require.NotNil(t, cfg) + + ext0 := cfg.Extensions["remotesampling"] + assert.Equal(t, factory.CreateDefaultConfig(), ext0) + + ext1 := cfg.Extensions["remotesampling/1"] + assert.Equal(t, + &Config{ + ExtensionSettings: configmodels.ExtensionSettings{ + TypeVal: "remotesampling", + NameVal: "remotesampling/1", + }, + Port: 5779, + Addr: "0.0.0.0:14251", + }, + ext1) + + assert.Equal(t, 1, len(cfg.Service.Extensions)) + assert.Equal(t, "remotesampling/1", cfg.Service.Extensions[0]) +} diff --git a/extension/remotesamplingextension/doc.go b/extension/remotesamplingextension/doc.go new file mode 100644 index 00000000000..38803f10219 --- /dev/null +++ b/extension/remotesamplingextension/doc.go @@ -0,0 +1,17 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package remotesamplingextension implements an extension that serves as a proxy +// and routes client requests for sampling config to the Jaeger collector. +package remotesamplingextension diff --git a/extension/remotesamplingextension/factory.go b/extension/remotesamplingextension/factory.go new file mode 100644 index 00000000000..ebf346eda09 --- /dev/null +++ b/extension/remotesamplingextension/factory.go @@ -0,0 +1,88 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remotesamplingextension + +import ( + "errors" + "sync/atomic" + + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector/config/configmodels" + "github.com/open-telemetry/opentelemetry-collector/extension" +) + +const ( + // The value of extension "type" in configuration. + typeStr = "remotesampling" +) + +// Factory is the factory for the extension. +type Factory struct { +} + +var _ (extension.Factory) = (*Factory)(nil) + +// Type gets the type of the config created by this factory. +func (f *Factory) Type() string { + return typeStr +} + +// CreateDefaultConfig creates the default configuration for the extension. +func (f *Factory) CreateDefaultConfig() configmodels.Extension { + return &Config{ + ExtensionSettings: configmodels.ExtensionSettings{ + TypeVal: typeStr, + NameVal: typeStr, + }, + Port: 5778, + Addr: "0.0.0.0:14250", + } +} + +// CreateExtension creates the extension based on this config. +func (f *Factory) CreateExtension( + logger *zap.Logger, + cfg configmodels.Extension, +) (extension.ServiceExtension, error) { + config := cfg.(*Config) + if config.Addr == "" { + return nil, errors.New("\"Addr\" is required when using the \"remotesampling\" extension") + } + if config.Port == 0 { + return nil, errors.New("\"Port\" is required when using the \"remotesampling\" extension") + } + + // The runtime settings are global to the application, so while in principle it + // is possible to have more than one instance, running multiple will mean that + // the settings of the last started instance will prevail. In order to avoid + // this issue we will allow the creation of a single instance once per process + // while keeping the private function that allow the creation of multiple + // instances for unit tests. Summary: only a single instance can be created + // via the factory. + if !atomic.CompareAndSwapInt32(&instanceState, instanceNotCreated, instanceCreated) { + return nil, errors.New("only a single instance can be created per process") + } + + return newServer(*config, logger) +} + +// See comment in CreateExtension how these are used. +var instanceState int32 + +const ( + instanceNotCreated int32 = 0 + instanceCreated int32 = 1 +) diff --git a/extension/remotesamplingextension/factory_test.go b/extension/remotesamplingextension/factory_test.go new file mode 100644 index 00000000000..8d02a1fa4da --- /dev/null +++ b/extension/remotesamplingextension/factory_test.go @@ -0,0 +1,83 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remotesamplingextension + +import ( + "sync/atomic" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector/config/configcheck" + "github.com/open-telemetry/opentelemetry-collector/config/configmodels" +) + +func TestFactory_Type(t *testing.T) { + factory := Factory{} + require.Equal(t, typeStr, factory.Type()) +} + +func TestFactory_CreateDefaultConfig(t *testing.T) { + factory := Factory{} + cfg := factory.CreateDefaultConfig() + assert.Equal(t, &Config{ + ExtensionSettings: configmodels.ExtensionSettings{ + NameVal: typeStr, + TypeVal: typeStr, + }, + Port: 5778, + Addr: "0.0.0.0:14250", + }, + cfg) + + assert.NoError(t, configcheck.ValidateConfig(cfg)) + ext, err := factory.CreateExtension(zap.NewNop(), cfg) + require.NoError(t, err) + require.NotNil(t, ext) + + // Restore instance tracking from factory, for other tests. + atomic.StoreInt32(&instanceState, instanceNotCreated) +} + +func TestFactory_CreateExtension(t *testing.T) { + factory := Factory{} + cfg := factory.CreateDefaultConfig().(*Config) + + ext, err := factory.CreateExtension(zap.NewNop(), cfg) + require.NoError(t, err) + require.NotNil(t, ext) + + // Restore instance tracking from factory, for other tests. + atomic.StoreInt32(&instanceState, instanceNotCreated) +} + +func TestFactory_CreateExtensionOnlyOnce(t *testing.T) { + factory := Factory{} + cfg := factory.CreateDefaultConfig().(*Config) + + logger := zap.NewNop() + ext, err := factory.CreateExtension(logger, cfg) + require.NoError(t, err) + require.NotNil(t, ext) + + ext1, err := factory.CreateExtension(logger, cfg) + require.Error(t, err) + require.Nil(t, ext1) + + // Restore instance tracking from factory, for other tests. + atomic.StoreInt32(&instanceState, instanceNotCreated) +} diff --git a/extension/remotesamplingextension/remotesamplingextension.go b/extension/remotesamplingextension/remotesamplingextension.go new file mode 100644 index 00000000000..095faadc807 --- /dev/null +++ b/extension/remotesamplingextension/remotesamplingextension.go @@ -0,0 +1,117 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This extension implements a proxy to route client requests for sampling config +// to the Jaeger collector. +// +// +---------------+ +--------------+ +-----------------+ +// | | get | | proxy | | +// | client +--- sampling ---->+ agent +------------->+ collector | +// | | strategy | | | | +// +---------------+ +--------------+ +-----------------+ + +package remotesamplingextension + +import ( + "encoding/json" + "fmt" + "net" + "net/http" + "strconv" + + jAgent "github.com/jaegertracing/jaeger/cmd/agent/app/configmanager/grpc" + "go.uber.org/zap" + "google.golang.org/grpc" + + "github.com/open-telemetry/opentelemetry-collector/extension" +) + +const mimeTypeApplicationJSON = "application/json" + +type remoteSamplingExtension struct { + config Config + logger *zap.Logger + server http.Server + jProxy *jAgent.SamplingManager +} + +func (rs *remoteSamplingExtension) Handler() http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + services := r.URL.Query()["service"] + if len(services) != 1 { + http.Error(w, "'service' parameter must be provided once", http.StatusBadRequest) + } + + // Jaeger agent's GRPC handler to call the upstream collector. + resp, err := rs.jProxy.GetSamplingStrategy(services[0]) + if err != nil { + http.Error(w, fmt.Sprintf("collector error: %+v", err), http.StatusInternalServerError) + return + } + jsonBytes, err := json.Marshal(resp) + if err != nil { + http.Error(w, "Cannot marshall Thrift to JSON", http.StatusInternalServerError) + return + } + + w.Header().Add("Content-Type", mimeTypeApplicationJSON) + if _, err := w.Write(jsonBytes); err != nil { + return + } + return + }) +} + +// Start implements ServiceExtension interface +func (rs *remoteSamplingExtension) Start(host extension.Host) error { + rs.logger.Info("Starting remote sampling extension", zap.Any("config", rs.config)) + + // Initialize listener to accept client requests for sampling config + portStr := ":" + strconv.Itoa(int(rs.config.Port)) + ln, err := net.Listen("tcp", portStr) + if err != nil { + host.ReportFatalError(err) + return nil + } + + // grpc connection to the upstream Jaeger collector + conn, err := grpc.Dial(rs.config.Addr, grpc.WithInsecure()) + rs.jProxy = jAgent.NewConfigManager(conn) + + // Register the http handler at the sampling URI + rs.server.Handler = rs.Handler() + + go func() { + // The listener ownership goes to the server. + if err := rs.server.Serve(ln); err != http.ErrServerClosed && err != nil { + host.ReportFatalError(err) + } + }() + + return nil +} + +func (rs *remoteSamplingExtension) Shutdown() error { + return rs.server.Close() +} + +func newServer(config Config, logger *zap.Logger) (*remoteSamplingExtension, error) { + rs := &remoteSamplingExtension{ + config: config, + logger: logger, + server: http.Server{}, + } + + return rs, nil +} diff --git a/extension/remotesamplingextension/remotesamplingextension_test.go b/extension/remotesamplingextension/remotesamplingextension_test.go new file mode 100644 index 00000000000..de04cfa2632 --- /dev/null +++ b/extension/remotesamplingextension/remotesamplingextension_test.go @@ -0,0 +1,119 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remotesamplingextension + +import ( + "net" + "net/http" + "runtime" + "strconv" + "testing" + + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector/extension/extensiontest" +) + +func TestPerformanceProfilerExtensionUsage(t *testing.T) { + config := Config{ + Port: 5778, + Addr: "0.0.0.0:14250", + } + + remotesamplingExt, err := newServer(config, zap.NewNop()) + require.NoError(t, err) + require.NotNil(t, remotesamplingExt) + + mh := extensiontest.NewMockHost() + require.NoError(t, remotesamplingExt.Start(mh)) + defer remotesamplingExt.Shutdown() + + // Give a chance for the server goroutine to run. + runtime.Gosched() + + client := &http.Client{} + resp, err := client.Get("http://localhost:" + strconv.Itoa(int(config.Port)) + "/debug/pprof") + require.NoError(t, err) + defer resp.Body.Close() + + require.Equal(t, http.StatusOK, resp.StatusCode) +} + +func TestPerformanceProfilerExtensionPortAlreadyInUse(t *testing.T) { + config := Config{ + Port: 5778, + Addr: "0.0.0.0:14250", + } + + ln, err := net.Listen("tcp", ":"+strconv.Itoa(int(config.Port))) + require.NoError(t, err) + defer ln.Close() + + remotesamplingExt, err := newServer(config, zap.NewNop()) + require.NoError(t, err) + require.NotNil(t, remotesamplingExt) + + mh := extensiontest.NewMockHost() + require.Error(t, remotesamplingExt.Start(mh)) +} + +func TestPerformanceProfilerMultipleStarts(t *testing.T) { + config := Config{ + Port: 5778, + Addr: "0.0.0.0:14250", + } + + remotesamplingExt, err := newServer(config, zap.NewNop()) + require.NoError(t, err) + require.NotNil(t, remotesamplingExt) + + mh := extensiontest.NewMockHost() + require.NoError(t, remotesamplingExt.Start(mh)) + defer remotesamplingExt.Shutdown() + + // Try to start it again, it will fail since it is on the same endpoint. + require.Error(t, remotesamplingExt.Start(mh)) +} + +func TestPerformanceProfilerMultipleShutdowns(t *testing.T) { + config := Config{ + Port: 5778, + Addr: "0.0.0.0:14250", + } + + remotesamplingExt, err := newServer(config, zap.NewNop()) + require.NoError(t, err) + require.NotNil(t, remotesamplingExt) + + mh := extensiontest.NewMockHost() + require.NoError(t, remotesamplingExt.Start(mh)) + + require.NoError(t, remotesamplingExt.Shutdown()) + require.NoError(t, remotesamplingExt.Shutdown()) +} + +func TestPerformanceProfilerShutdownWithoutStart(t *testing.T) { + config := Config{ + Port: 5778, + Addr: "0.0.0.0:14250", + } + + remotesamplingExt, err := newServer(config, zap.NewNop()) + require.NoError(t, err) + require.NotNil(t, remotesamplingExt) + + require.NoError(t, remotesamplingExt.Shutdown()) +} diff --git a/extension/remotesamplingextension/testdata/config.yaml b/extension/remotesamplingextension/testdata/config.yaml new file mode 100644 index 00000000000..f7c9c6411f0 --- /dev/null +++ b/extension/remotesamplingextension/testdata/config.yaml @@ -0,0 +1,21 @@ +extensions: + remotesampling: + remotesampling/1: + port: 5779 + addr: "0.0.0.0:14251" + +service: + extensions: [remotesampling/1] + pipelines: + traces: + receivers: [examplereceiver] + processors: [exampleprocessor] + exporters: [exampleexporter] + +# Data pipeline is required to load the config. +receivers: + examplereceiver: +processors: + exampleprocessor: +exporters: + exampleexporter: