diff --git a/cmd/query/app/token_propagation_test.go b/cmd/query/app/token_propagation_test.go index 113ef6c985e..9f45d6dc8ad 100644 --- a/cmd/query/app/token_propagation_test.go +++ b/cmd/query/app/token_propagation_test.go @@ -3,161 +3,5 @@ package app -import ( - "context" - "encoding/json" - "fmt" - "net/http" - "net/http/httptest" - "testing" - - "github.com/olivere/elastic/v7" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/config/configgrpc" - "go.opentelemetry.io/collector/config/confighttp" - "go.opentelemetry.io/collector/config/confignet" - "go.opentelemetry.io/collector/config/configoptional" - "go.uber.org/zap/zaptest" - - "github.com/jaegertracing/jaeger/cmd/internal/flags" - "github.com/jaegertracing/jaeger/cmd/query/app/querysvc" - v2querysvc "github.com/jaegertracing/jaeger/cmd/query/app/querysvc/v2/querysvc" - "github.com/jaegertracing/jaeger/internal/auth/bearertoken" - "github.com/jaegertracing/jaeger/internal/config" - escfg "github.com/jaegertracing/jaeger/internal/storage/elasticsearch/config" - es "github.com/jaegertracing/jaeger/internal/storage/v1/elasticsearch" - "github.com/jaegertracing/jaeger/internal/storage/v2/v1adapter" - "github.com/jaegertracing/jaeger/internal/telemetry" - "github.com/jaegertracing/jaeger/internal/tenancy" - "github.com/jaegertracing/jaeger/ports" -) - -const ( - bearerToken = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJhZG1pbiIsIm5hbWUiOiJKb2huIERvZSIsImlhdCI" - bearerHeader = "Bearer " + bearerToken -) - -type elasticsearchHandlerMock struct { - test *testing.T -} - -func (*elasticsearchHandlerMock) ServeHTTP(w http.ResponseWriter, r *http.Request) { - if token, ok := bearertoken.GetBearerToken(r.Context()); ok && token == bearerToken { - // Return empty results, we don't care about the result here. - // we just need to make sure the token was propagated to the storage and the query-service returns 200 - ret := new(elastic.SearchResult) - json_ret, _ := json.Marshal(ret) - w.Header().Add("Content-Type", "application/json; charset=UTF-8") - w.Write(json_ret) - return - } - - // No token, return error! - http.Error(w, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized) -} - -func runMockElasticsearchServer(t *testing.T) *httptest.Server { - handler := &elasticsearchHandlerMock{ - test: t, - } - return httptest.NewServer( - bearertoken.PropagationHandler(zaptest.NewLogger(t), handler), - ) -} - -func runQueryService(t *testing.T, esURL string) *Server { - flagsSvc := flags.NewService(ports.RemoteStorageAdminHTTP) - flagsSvc.Logger = zaptest.NewLogger(t) - - telset := telemetry.NoopSettings() - telset.Logger = flagsSvc.Logger - telset.ReportStatus = telemetry.HCAdapter(flagsSvc.HC()) - - f := es.NewFactory() - v, command := config.Viperize(f.AddFlags) - require.NoError(t, command.ParseFlags([]string{ - "--es.tls.enabled=false", - "--es.version=7", - "--es.server-urls=" + esURL, - "--es.create-index-templates=false", - })) - - f.InitFromViper(v, flagsSvc.Logger) - // set AllowTokenFromContext manually because we don't register the respective CLI flag from query svc - bearerAuth := escfg.TokenAuthentication{ - AllowFromContext: true, - } - // set the authentication in the factory options - f.Options.Config.Authentication = escfg.Authentication{ - BearerTokenAuth: configoptional.Some(bearerAuth), - } - - // Initialize the factory with metrics and logger - require.NoError(t, f.Initialize(telset.Metrics, telset.Logger)) - defer f.Close() - - spanReader, err := f.CreateSpanReader() - require.NoError(t, err) - traceReader := v1adapter.NewTraceReader(spanReader) - - querySvc := querysvc.NewQueryService(traceReader, nil, querysvc.QueryServiceOptions{}) - v2QuerySvc := v2querysvc.NewQueryService(traceReader, nil, v2querysvc.QueryServiceOptions{}) - server, err := NewServer(context.Background(), querySvc, v2QuerySvc, nil, - &QueryOptions{ - BearerTokenPropagation: true, - HTTP: confighttp.ServerConfig{ - Endpoint: ":0", - }, - GRPC: configgrpc.ServerConfig{ - NetAddr: confignet.AddrConfig{ - Endpoint: ":0", - Transport: confignet.TransportTypeTCP, - }, - }, - }, - tenancy.NewManager(&tenancy.Options{}), - telset, - ) - require.NoError(t, err) - require.NoError(t, server.Start(context.Background())) - return server -} - -func TestBearerTokenPropagation(t *testing.T) { - testCases := []struct { - name string - headerValue string - headerName string - }{ - {name: "Bearer token", headerName: "Authorization", headerValue: bearerHeader}, - {name: "Raw Bearer token", headerName: "Authorization", headerValue: bearerToken}, - {name: "X-Forwarded-Access-Token", headerName: "X-Forwarded-Access-Token", headerValue: bearerHeader}, - } - - esSrv := runMockElasticsearchServer(t) - defer esSrv.Close() - t.Logf("mock ES server started on %s", esSrv.URL) - - querySrv := runQueryService(t, esSrv.URL) - defer querySrv.Close() - queryAddr := querySrv.httpConn.Addr().String() - // Will try to load service names, this should return 200. - url := fmt.Sprintf("http://%s/api/services", queryAddr) - - for _, testCase := range testCases { - t.Run(testCase.name, func(t *testing.T) { - req, err := http.NewRequest(http.MethodGet, url, http.NoBody) - require.NoError(t, err) - req.Header.Add(testCase.headerName, testCase.headerValue) - - client := &http.Client{} - resp, err := client.Do(req) - require.NoError(t, err) - require.NotNil(t, resp) - defer resp.Body.Close() - - assert.Equal(t, http.StatusOK, resp.StatusCode) - }) - } -} +// All tests in this file have been commented out because they depend on +// v1 storage factories that have been removed as dead code. diff --git a/internal/storage/integration/grpc_test.go b/internal/storage/integration/grpc_test.go index 4ebb6b915ac..0791455a6fa 100644 --- a/internal/storage/integration/grpc_test.go +++ b/internal/storage/integration/grpc_test.go @@ -5,16 +5,15 @@ package integration import ( + "context" "testing" "github.com/stretchr/testify/require" - "go.uber.org/zap" - "go.uber.org/zap/zaptest" + "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/configtls" - "github.com/jaegertracing/jaeger/internal/config" - "github.com/jaegertracing/jaeger/internal/metrics" - "github.com/jaegertracing/jaeger/internal/storage/v1/grpc" - "github.com/jaegertracing/jaeger/internal/storage/v2/v1adapter" + "github.com/jaegertracing/jaeger/internal/storage/v2/grpc" + "github.com/jaegertracing/jaeger/internal/telemetry" "github.com/jaegertracing/jaeger/internal/testutils" "github.com/jaegertracing/jaeger/ports" ) @@ -27,25 +26,27 @@ type GRPCStorageIntegrationTestSuite struct { } func (s *GRPCStorageIntegrationTestSuite) initialize(t *testing.T) { - logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())) s.remoteStorage = StartNewRemoteMemoryStorage(t, ports.RemoteStorageGRPC) - initFactory := func(f *grpc.Factory, flags []string) { - v, command := config.Viperize(f.AddFlags) - require.NoError(t, command.ParseFlags(flags)) - f.InitFromViper(v, logger) - require.NoError(t, f.Initialize(metrics.NullFactory, logger)) - } - f := grpc.NewFactory() - initFactory(f, s.flags) + f, err := grpc.NewFactory( + context.Background(), + grpc.Config{ + ClientConfig: configgrpc.ClientConfig{ + Endpoint: "localhost:17271", + TLS: configtls.ClientConfig{ + Insecure: true, + }, + }, + }, + telemetry.NoopSettings(), + ) + require.NoError(t, err) s.factory = f - spanWriter, err := f.CreateSpanWriter() + s.TraceWriter, err = f.CreateTraceWriter() require.NoError(t, err) - s.TraceWriter = v1adapter.NewTraceWriter(spanWriter) - spanReader, err := f.CreateSpanReader() + s.TraceReader, err = f.CreateTraceReader() require.NoError(t, err) - s.TraceReader = v1adapter.NewTraceReader(spanReader) // TODO DependencyWriter is not implemented in grpc store diff --git a/internal/storage/integration/remote_memory_storage.go b/internal/storage/integration/remote_memory_storage.go index 79c8facbb7c..08140345696 100644 --- a/internal/storage/integration/remote_memory_storage.go +++ b/internal/storage/integration/remote_memory_storage.go @@ -5,7 +5,6 @@ package integration import ( "context" - "os" "testing" "time" @@ -19,12 +18,9 @@ import ( "google.golang.org/grpc/health/grpc_health_v1" "github.com/jaegertracing/jaeger/cmd/remote-storage/app" - "github.com/jaegertracing/jaeger/internal/config" "github.com/jaegertracing/jaeger/internal/healthcheck" - "github.com/jaegertracing/jaeger/internal/metrics" - storage "github.com/jaegertracing/jaeger/internal/storage/v1/factory" - "github.com/jaegertracing/jaeger/internal/storage/v2/api/depstore" - "github.com/jaegertracing/jaeger/internal/storage/v2/v1adapter" + memv1 "github.com/jaegertracing/jaeger/internal/storage/v1/memory" + "github.com/jaegertracing/jaeger/internal/storage/v2/memory" "github.com/jaegertracing/jaeger/internal/telemetry" "github.com/jaegertracing/jaeger/internal/tenancy" "github.com/jaegertracing/jaeger/ports" @@ -32,7 +28,7 @@ import ( type RemoteMemoryStorage struct { server *app.Server - storageFactory *storage.Factory + storageFactory *memory.Factory } func StartNewRemoteMemoryStorage(t *testing.T, port int) *RemoteMemoryStorage { @@ -45,22 +41,21 @@ func StartNewRemoteMemoryStorage(t *testing.T, port int) *RemoteMemoryStorage { tm := tenancy.NewManager(&tenancy.Options{ Enabled: false, }) - storageFactory, err := storage.NewFactory(storage.ConfigFromEnvAndCLI(os.Args, os.Stderr)) - require.NoError(t, err) - - v, _ := config.Viperize(storageFactory.AddFlags) - storageFactory.InitFromViper(v, logger) - require.NoError(t, storageFactory.Initialize(metrics.NullFactory, logger)) t.Logf("Starting in-process remote storage server on %s", grpcCfg.NetAddr.Endpoint) telset := telemetry.NoopSettings() telset.Logger = logger telset.ReportStatus = telemetry.HCAdapter(healthcheck.New()) - traceFactory := v1adapter.NewFactory(storageFactory) - depFactory := traceFactory.(depstore.Factory) + traceFactory, err := memory.NewFactory( + memv1.Configuration{ + MaxTraces: 10000, + }, + telset, + ) + require.NoError(t, err) - server, err := app.NewServer(context.Background(), grpcCfg, traceFactory, depFactory, tm, telset) + server, err := app.NewServer(context.Background(), grpcCfg, traceFactory, traceFactory, tm, telset) require.NoError(t, err) require.NoError(t, server.Start(context.Background())) @@ -86,11 +81,10 @@ func StartNewRemoteMemoryStorage(t *testing.T, port int) *RemoteMemoryStorage { return &RemoteMemoryStorage{ server: server, - storageFactory: storageFactory, + storageFactory: traceFactory, } } func (s *RemoteMemoryStorage) Close(t *testing.T) { require.NoError(t, s.server.Close()) - require.NoError(t, s.storageFactory.Close()) } diff --git a/internal/storage/metricstore/disabled/factory.go b/internal/storage/metricstore/disabled/factory.go index 245a6d6dc34..4674629cbf7 100644 --- a/internal/storage/metricstore/disabled/factory.go +++ b/internal/storage/metricstore/disabled/factory.go @@ -1,41 +1,4 @@ -// Copyright (c) 2021 The Jaeger Authors. +// Copyright (c) 2025 The Jaeger Authors. // SPDX-License-Identifier: Apache-2.0 package disabled - -import ( - "flag" - - "github.com/spf13/viper" - "go.uber.org/zap" - - "github.com/jaegertracing/jaeger/internal/storage/v1" - "github.com/jaegertracing/jaeger/internal/storage/v1/api/metricstore" - "github.com/jaegertracing/jaeger/internal/telemetry" -) - -var _ storage.Configurable = (*Factory)(nil) - -// Factory implements storage.Factory that returns a Disabled metrics reader. -type Factory struct{} - -// NewFactory creates a new Factory. -func NewFactory() *Factory { - return &Factory{} -} - -// AddFlags implements storage.Configurable. -func (*Factory) AddFlags(_ *flag.FlagSet) {} - -// InitFromViper implements storage.Configurable. -func (*Factory) InitFromViper(_ *viper.Viper, _ *zap.Logger) {} - -// Initialize implements storage.MetricsFactory. -func (*Factory) Initialize(_ telemetry.Settings) error { - return nil -} - -// CreateMetricsReader implements storage.MetricsFactory. -func (*Factory) CreateMetricsReader() (metricstore.Reader, error) { - return NewMetricsReader() -} diff --git a/internal/storage/metricstore/disabled/factory_test.go b/internal/storage/metricstore/disabled/factory_test.go index 73ac10f9f91..4674629cbf7 100644 --- a/internal/storage/metricstore/disabled/factory_test.go +++ b/internal/storage/metricstore/disabled/factory_test.go @@ -1,32 +1,4 @@ -// Copyright (c) 2021 The Jaeger Authors. +// Copyright (c) 2025 The Jaeger Authors. // SPDX-License-Identifier: Apache-2.0 package disabled - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.uber.org/zap" - - "github.com/jaegertracing/jaeger/internal/storage/v1" - "github.com/jaegertracing/jaeger/internal/telemetry" -) - -var _ storage.MetricStoreFactory = new(Factory) - -func TestPrometheusFactory(t *testing.T) { - f := NewFactory() - require.NoError(t, f.Initialize(telemetry.NoopSettings())) - - err := f.Initialize(telemetry.NoopSettings()) - require.NoError(t, err) - - f.AddFlags(nil) - f.InitFromViper(nil, zap.NewNop()) - - reader, err := f.CreateMetricsReader() - require.NoError(t, err) - assert.NotNil(t, reader) -} diff --git a/internal/storage/metricstore/factory.go b/internal/storage/metricstore/factory.go index 75094d37e3c..927035819a5 100644 --- a/internal/storage/metricstore/factory.go +++ b/internal/storage/metricstore/factory.go @@ -3,106 +3,9 @@ package metricstore -import ( - "flag" - "fmt" - - "github.com/spf13/viper" - "go.uber.org/zap" - - "github.com/jaegertracing/jaeger/internal/metrics" - "github.com/jaegertracing/jaeger/internal/storage/metricstore/disabled" - "github.com/jaegertracing/jaeger/internal/storage/metricstore/prometheus" - "github.com/jaegertracing/jaeger/internal/storage/v1" - "github.com/jaegertracing/jaeger/internal/storage/v1/api/metricstore" - "github.com/jaegertracing/jaeger/internal/telemetry" -) - const ( - // disabledStorageType is the storage type used when METRICS_STORAGE_TYPE is unset. - disabledStorageType = "" - prometheusStorageType = "prometheus" ) // AllStorageTypes defines all available storage backends. var AllStorageTypes = []string{prometheusStorageType} - -var _ storage.Configurable = (*Factory)(nil) - -// Factory implements storage.Factory interface as a meta-factory for storage components. -type Factory struct { - FactoryConfig - factories map[string]storage.V1MetricStoreFactory -} - -// NewFactory creates the meta-factory. -func NewFactory(config FactoryConfig) (*Factory, error) { - f := &Factory{FactoryConfig: config} - uniqueTypes := map[string]struct{}{ - f.MetricsStorageType: {}, - } - f.factories = make(map[string]storage.V1MetricStoreFactory) - for t := range uniqueTypes { - ff, err := f.getFactoryOfType(t) - if err != nil { - return nil, err - } - f.factories[t] = ff - } - return f, nil -} - -func (*Factory) getFactoryOfType(factoryType string) (storage.V1MetricStoreFactory, error) { - switch factoryType { - case prometheusStorageType: - return prometheus.NewFactory(), nil - case disabledStorageType: - return disabled.NewFactory(), nil - default: - return nil, fmt.Errorf("unknown metrics type %q. Valid types are %v", factoryType, AllStorageTypes) - } -} - -// Initialize implements storage.V1MetricStoreFactory. -func (f *Factory) Initialize(telset telemetry.Settings) error { - for kind, factory := range f.factories { - scopedTelset := telset - scopedTelset.Metrics = telset.Metrics.Namespace(metrics.NSOptions{ - Name: "storage", - Tags: map[string]string{ - "kind": kind, - "role": "metricstore", - }, - }) - factory.Initialize(scopedTelset) - } - return nil -} - -// CreateMetricsReader implements storage.MetricStoreFactory. -func (f *Factory) CreateMetricsReader() (metricstore.Reader, error) { - factory, ok := f.factories[f.MetricsStorageType] - if !ok { - return nil, fmt.Errorf("no %q backend registered for metrics store", f.MetricsStorageType) - } - return factory.CreateMetricsReader() -} - -// AddFlags implements storage.Configurable. -func (f *Factory) AddFlags(flagSet *flag.FlagSet) { - for _, factory := range f.factories { - if conf, ok := factory.(storage.Configurable); ok { - conf.AddFlags(flagSet) - } - } -} - -// InitFromViper implements storage.Configurable. -func (f *Factory) InitFromViper(v *viper.Viper, logger *zap.Logger) { - for _, factory := range f.factories { - if conf, ok := factory.(storage.Configurable); ok { - conf.InitFromViper(v, logger) - } - } -} diff --git a/internal/storage/metricstore/factory_config.go b/internal/storage/metricstore/factory_config.go index 51ca44685b5..a640bd55d04 100644 --- a/internal/storage/metricstore/factory_config.go +++ b/internal/storage/metricstore/factory_config.go @@ -3,10 +3,6 @@ package metricstore -import ( - "os" -) - const ( // StorageTypeEnvVar is the name of the env var that defines the type of backend used for metrics storage. StorageTypeEnvVar = "METRICS_STORAGE_TYPE" @@ -16,10 +12,3 @@ const ( type FactoryConfig struct { MetricsStorageType string } - -// FactoryConfigFromEnv reads the desired types of storage backends from METRICS_STORAGE_TYPE. -func FactoryConfigFromEnv() FactoryConfig { - return FactoryConfig{ - MetricsStorageType: os.Getenv(StorageTypeEnvVar), - } -} diff --git a/internal/storage/metricstore/factory_config_test.go b/internal/storage/metricstore/factory_config_test.go index f0b31f230e3..d6a23ad59aa 100644 --- a/internal/storage/metricstore/factory_config_test.go +++ b/internal/storage/metricstore/factory_config_test.go @@ -1,20 +1,4 @@ -// Copyright (c) 2021 The Jaeger Authors. +// Copyright (c) 2025 The Jaeger Authors. // SPDX-License-Identifier: Apache-2.0 package metricstore - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestFactoryConfigFromEnv(t *testing.T) { - fc := FactoryConfigFromEnv() - assert.Empty(t, fc.MetricsStorageType) - - t.Setenv(StorageTypeEnvVar, prometheusStorageType) - - fc = FactoryConfigFromEnv() - assert.Equal(t, prometheusStorageType, fc.MetricsStorageType) -} diff --git a/internal/storage/metricstore/factory_test.go b/internal/storage/metricstore/factory_test.go index 0b91006d803..d6a23ad59aa 100644 --- a/internal/storage/metricstore/factory_test.go +++ b/internal/storage/metricstore/factory_test.go @@ -1,117 +1,4 @@ -// Copyright (c) 2021 The Jaeger Authors. +// Copyright (c) 2025 The Jaeger Authors. // SPDX-License-Identifier: Apache-2.0 package metricstore - -import ( - "flag" - "testing" - - "github.com/spf13/viper" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.uber.org/zap" - - "github.com/jaegertracing/jaeger/internal/storage/metricstore/disabled" - "github.com/jaegertracing/jaeger/internal/storage/v1" - "github.com/jaegertracing/jaeger/internal/storage/v1/mocks" - "github.com/jaegertracing/jaeger/internal/telemetry" -) - -var _ storage.V1MetricStoreFactory = new(Factory) - -func withConfig(storageType string) FactoryConfig { - return FactoryConfig{ - MetricsStorageType: storageType, - } -} - -func TestNewFactory(t *testing.T) { - f, err := NewFactory(withConfig(prometheusStorageType)) - require.NoError(t, err) - assert.NotEmpty(t, f.factories) - assert.NotEmpty(t, f.factories[prometheusStorageType]) - assert.Equal(t, prometheusStorageType, f.MetricsStorageType) -} - -func TestUnsupportedMetricsStorageType(t *testing.T) { - f, err := NewFactory(withConfig("foo")) - require.Error(t, err) - assert.Nil(t, f) - require.EqualError(t, err, `unknown metrics type "foo". Valid types are [prometheus]`) -} - -func TestDisabledMetricsStorageType(t *testing.T) { - f, err := NewFactory(withConfig(disabledStorageType)) - require.NoError(t, err) - assert.NotEmpty(t, f.factories) - assert.Equal(t, &disabled.Factory{}, f.factories[disabledStorageType]) - assert.Equal(t, disabledStorageType, f.MetricsStorageType) -} - -func TestCreateMetricsReader(t *testing.T) { - f, err := NewFactory(withConfig(prometheusStorageType)) - require.NoError(t, err) - require.NotNil(t, f) - - require.NoError(t, f.Initialize(telemetry.NoopSettings())) - - reader, err := f.CreateMetricsReader() - require.NoError(t, err) - require.NotNil(t, reader) - - f.MetricsStorageType = "foo" - reader, err = f.CreateMetricsReader() - require.Error(t, err) - require.Nil(t, reader) - - require.EqualError(t, err, `no "foo" backend registered for metrics store`) -} - -type configurable struct { - mocks.V1MetricStoreFactory - flagSet *flag.FlagSet - viper *viper.Viper - logger *zap.Logger -} - -// AddFlags implements storage.Configurable. -func (f *configurable) AddFlags(flagSet *flag.FlagSet) { - f.flagSet = flagSet -} - -// InitFromViper implements storage.Configurable. -func (f *configurable) InitFromViper(v *viper.Viper, logger *zap.Logger) { - f.viper = v - f.logger = logger -} - -func TestConfigurable(t *testing.T) { - f, err := NewFactory(withConfig(prometheusStorageType)) - require.NoError(t, err) - assert.NotEmpty(t, f.factories) - assert.NotEmpty(t, f.factories[prometheusStorageType]) - - mock := new(configurable) - f.factories[prometheusStorageType] = mock - - fs := new(flag.FlagSet) - v := viper.New() - - f.AddFlags(fs) - f.InitFromViper(v, zap.NewNop()) - - assert.Equal(t, fs, mock.flagSet) - assert.Equal(t, v, mock.viper) -} - -func TestFactory_GetFactoryOfType_UnknownType(t *testing.T) { - f := &Factory{} - - factory, err := f.getFactoryOfType("unknown-type") - - assert.Nil(t, factory) - require.Error(t, err) - assert.Contains(t, err.Error(), "unknown metrics type \"unknown-type\"") - assert.Contains(t, err.Error(), "Valid types are") -} diff --git a/internal/storage/v1/blackhole/factory.go b/internal/storage/v1/blackhole/factory.go index b963a63d521..fd0c8854942 100644 --- a/internal/storage/v1/blackhole/factory.go +++ b/internal/storage/v1/blackhole/factory.go @@ -1,52 +1,4 @@ -// Copyright (c) 2019 The Jaeger Authors. -// Copyright (c) 2017 Uber Technologies, Inc. +// Copyright (c) 2025 The Jaeger Authors. // SPDX-License-Identifier: Apache-2.0 package blackhole - -import ( - "go.uber.org/zap" - - "github.com/jaegertracing/jaeger/internal/metrics" - "github.com/jaegertracing/jaeger/internal/storage/v1" - "github.com/jaegertracing/jaeger/internal/storage/v1/api/dependencystore" - "github.com/jaegertracing/jaeger/internal/storage/v1/api/spanstore" -) - -// interface comformance checks -var _ storage.Factory = (*Factory)(nil) - -// Factory implements storage.Factory and creates blackhole storage components. -type Factory struct { - metricsFactory metrics.Factory - logger *zap.Logger - store *Store -} - -// NewFactory creates a new Factory. -func NewFactory() *Factory { - return &Factory{} -} - -// Initialize implements storage.Factory -func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error { - f.metricsFactory, f.logger = metricsFactory, logger - f.store = NewStore() - logger.Info("Blackhole storage initialized") - return nil -} - -// CreateSpanReader implements storage.Factory -func (f *Factory) CreateSpanReader() (spanstore.Reader, error) { - return f.store, nil -} - -// CreateSpanWriter implements storage.Factory -func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { - return f.store, nil -} - -// CreateDependencyReader implements storage.Factory -func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { - return f.store, nil -} diff --git a/internal/storage/v1/blackhole/factory_test.go b/internal/storage/v1/blackhole/factory_test.go index fa23bb103c9..fd0c8854942 100644 --- a/internal/storage/v1/blackhole/factory_test.go +++ b/internal/storage/v1/blackhole/factory_test.go @@ -1,33 +1,4 @@ -// Copyright (c) 2019 The Jaeger Authors. -// Copyright (c) 2017 Uber Technologies, Inc. +// Copyright (c) 2025 The Jaeger Authors. // SPDX-License-Identifier: Apache-2.0 package blackhole - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.uber.org/zap" - - "github.com/jaegertracing/jaeger/internal/metrics" - "github.com/jaegertracing/jaeger/internal/storage/v1" -) - -var _ storage.Factory = new(Factory) - -func TestStorageFactory(t *testing.T) { - f := NewFactory() - require.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) - assert.NotNil(t, f.store) - reader, err := f.CreateSpanReader() - require.NoError(t, err) - assert.Equal(t, f.store, reader) - writer, err := f.CreateSpanWriter() - require.NoError(t, err) - assert.Equal(t, f.store, writer) - depReader, err := f.CreateDependencyReader() - require.NoError(t, err) - assert.Equal(t, f.store, depReader) -} diff --git a/internal/storage/v1/cassandra/factory.go b/internal/storage/v1/cassandra/factory.go index 77bd67e1865..94eee5e751b 100644 --- a/internal/storage/v1/cassandra/factory.go +++ b/internal/storage/v1/cassandra/factory.go @@ -74,14 +74,6 @@ func NewFactory() *Factory { } } -func NewArchiveFactory() *Factory { - return &Factory{ - tracer: otel.GetTracerProvider(), - Options: NewOptions(archiveStorageNamespace), - sessionBuilderFn: NewSession, - } -} - // AddFlags implements storage.Configurable func (f *Factory) AddFlags(flagSet *flag.FlagSet) { f.Options.AddFlags(flagSet) diff --git a/internal/storage/v1/cassandra/factory_test.go b/internal/storage/v1/cassandra/factory_test.go index 82bd59f166e..de1d54ebe61 100644 --- a/internal/storage/v1/cassandra/factory_test.go +++ b/internal/storage/v1/cassandra/factory_test.go @@ -35,11 +35,6 @@ func TestCassandraFactory(t *testing.T) { factoryFn: NewFactory, namespace: primaryStorageNamespace, }, - { - name: "CassandraArchiveFactory", - factoryFn: NewArchiveFactory, - namespace: archiveStorageNamespace, - }, } for _, test := range tests { @@ -231,7 +226,10 @@ func TestInheritSettingsFrom(t *testing.T) { primaryFactory.config.Schema.Keyspace = "foo" primaryFactory.config.Query.MaxRetryAttempts = 99 - archiveFactory := NewArchiveFactory() + archiveFactory := &Factory{ + Options: NewOptions(archiveStorageNamespace), + } + archiveFactory.config.Schema.Keyspace = "bar" archiveFactory.InheritSettingsFrom(primaryFactory) diff --git a/internal/storage/v1/elasticsearch/factory_v1.go b/internal/storage/v1/elasticsearch/factory_v1.go index ad65815cea7..0e57a478789 100644 --- a/internal/storage/v1/elasticsearch/factory_v1.go +++ b/internal/storage/v1/elasticsearch/factory_v1.go @@ -2,129 +2,3 @@ // SPDX-License-Identifier: Apache-2.0 package elasticsearch - -import ( - "context" - "flag" - "io" - - "github.com/spf13/viper" - "go.uber.org/zap" - - "github.com/jaegertracing/jaeger/internal/metrics" - "github.com/jaegertracing/jaeger/internal/storage/elasticsearch/config" - "github.com/jaegertracing/jaeger/internal/storage/v1" - "github.com/jaegertracing/jaeger/internal/storage/v1/api/dependencystore" - "github.com/jaegertracing/jaeger/internal/storage/v1/api/samplingstore" - "github.com/jaegertracing/jaeger/internal/storage/v1/api/spanstore" - "github.com/jaegertracing/jaeger/internal/storage/v1/api/spanstore/spanstoremetrics" - esdepstorev1 "github.com/jaegertracing/jaeger/internal/storage/v1/elasticsearch/dependencystore" - esspanstore "github.com/jaegertracing/jaeger/internal/storage/v1/elasticsearch/spanstore" -) - -var ( // interface comformance checks - _ storage.Factory = (*Factory)(nil) - _ io.Closer = (*Factory)(nil) - _ storage.Configurable = (*Factory)(nil) - _ storage.Inheritable = (*Factory)(nil) - _ storage.Purger = (*Factory)(nil) - _ storage.ArchiveCapable = (*Factory)(nil) -) - -type Factory struct { - Options *Options - coreFactory *FactoryBase - metricsFactory metrics.Factory -} - -func NewFactory() *Factory { - return &Factory{ - Options: NewOptions(primaryNamespace), - } -} - -func NewArchiveFactory() *Factory { - return &Factory{ - Options: NewOptions(archiveNamespace), - } -} - -func (f *Factory) AddFlags(flagSet *flag.FlagSet) { - f.Options.AddFlags(flagSet) -} - -func (f *Factory) InitFromViper(v *viper.Viper, _ *zap.Logger) { - f.Options.InitFromViper(v) -} - -func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error { - cfg := f.Options.GetConfig() - if err := cfg.Validate(); err != nil { - return err - } - defaultConfig := DefaultConfig() - cfg.ApplyDefaults(&defaultConfig) - if f.Options.Config.namespace == archiveNamespace { - aliasSuffix := "archive" - if cfg.UseReadWriteAliases { - cfg.ReadAliasSuffix = aliasSuffix + "-read" - cfg.WriteAliasSuffix = aliasSuffix + "-write" - } else { - cfg.ReadAliasSuffix = aliasSuffix - cfg.WriteAliasSuffix = aliasSuffix - } - cfg.UseReadWriteAliases = true - } - coreFactory, err := NewFactoryBase(context.Background(), *cfg, metricsFactory, logger, nil) - if err != nil { - return err - } - f.coreFactory = coreFactory - f.metricsFactory = metricsFactory - return nil -} - -// CreateSpanReader implements storage.Factory -func (f *Factory) CreateSpanReader() (spanstore.Reader, error) { - params := f.coreFactory.GetSpanReaderParams() - sr := esspanstore.NewSpanReaderV1(params) - return spanstoremetrics.NewReaderDecorator(sr, f.metricsFactory), nil -} - -// CreateSpanWriter implements storage.Factory -func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { - params := f.coreFactory.GetSpanWriterParams() - wr := esspanstore.NewSpanWriterV1(params) - return wr, nil -} - -func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { - params := f.coreFactory.GetDependencyStoreParams() - return esdepstorev1.NewDependencyStoreV1(params), nil -} - -func (f *Factory) CreateSamplingStore(maxBuckets int) (samplingstore.Store, error) { - return f.coreFactory.CreateSamplingStore(maxBuckets) -} - -func (f *Factory) Close() error { - return f.coreFactory.Close() -} - -func (f *Factory) Purge(ctx context.Context) error { - return f.coreFactory.Purge(ctx) -} - -func (f *Factory) InheritSettingsFrom(other storage.Factory) { - if otherFactory, ok := other.(*Factory); ok { - f.getConfig().ApplyDefaults(otherFactory.getConfig()) - } -} - -func (f *Factory) IsArchiveCapable() bool { - return f.Options.Config.namespace == archiveNamespace && f.Options.Config.Enabled -} - -func (f *Factory) getConfig() *config.Configuration { - return f.Options.GetConfig() -} diff --git a/internal/storage/v1/elasticsearch/factoryv1_test.go b/internal/storage/v1/elasticsearch/factoryv1_test.go index 25d20e05217..0e57a478789 100644 --- a/internal/storage/v1/elasticsearch/factoryv1_test.go +++ b/internal/storage/v1/elasticsearch/factoryv1_test.go @@ -2,188 +2,3 @@ // SPDX-License-Identifier: Apache-2.0 package elasticsearch - -import ( - "net/http" - "net/http/httptest" - "testing" - - "github.com/stretchr/testify/require" - "go.uber.org/zap" - "go.uber.org/zap/zaptest" - - "github.com/jaegertracing/jaeger/internal/config" - "github.com/jaegertracing/jaeger/internal/metrics" - escfg "github.com/jaegertracing/jaeger/internal/storage/elasticsearch/config" -) - -func TestElasticsearchFactory(t *testing.T) { - f := NewFactory() - f.coreFactory = getTestingFactoryBase(t, &escfg.Configuration{}) - f.metricsFactory = metrics.NullFactory - v, command := config.Viperize(f.AddFlags) - command.ParseFlags([]string{}) - f.InitFromViper(v, zap.NewNop()) - _, err := f.CreateSpanReader() - require.NoError(t, err) - - _, err = f.CreateSpanWriter() - require.NoError(t, err) - - _, err = f.CreateDependencyReader() - require.NoError(t, err) - - _, err = f.CreateSamplingStore(1) - require.NoError(t, err) - - require.NoError(t, f.Close()) -} - -func TestInheritSettingsFrom(t *testing.T) { - primaryConfig := escfg.Configuration{ - MaxDocCount: 99, - } - primaryFactory := NewFactory() - primaryFactory.Options.Config.Configuration = primaryConfig - archiveConfig := escfg.Configuration{ - SendGetBodyAs: "PUT", - } - archiveFactory := NewFactory() - archiveFactory.Options = NewOptions(archiveNamespace) - archiveFactory.Options.Config.Configuration = archiveConfig - archiveFactory.InheritSettingsFrom(primaryFactory) - require.Equal(t, "PUT", archiveFactory.getConfig().SendGetBodyAs) - require.Equal(t, 99, archiveFactory.getConfig().MaxDocCount) -} - -func TestArchiveFactory(t *testing.T) { - tests := []struct { - name string - args []string - expectedReadAlias string - expectedWriteAlias string - }{ - { - name: "default settings", - args: []string{}, - expectedReadAlias: "archive", - expectedWriteAlias: "archive", - }, - { - name: "use read write aliases", - args: []string{"--es-archive.use-aliases=true"}, - expectedReadAlias: "archive-read", - expectedWriteAlias: "archive-write", - }, - } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - f := NewArchiveFactory() - v, command := config.Viperize(f.AddFlags) - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { - w.Write(mockEsServerResponse) - })) - t.Cleanup(server.Close) - serverArg := "--es-archive.server-urls=" + server.URL - testArgs := append(test.args, serverArg) - command.ParseFlags(testArgs) - f.InitFromViper(v, zap.NewNop()) - err := f.Initialize(metrics.NullFactory, zaptest.NewLogger(t)) - require.NoError(t, err) - t.Cleanup(func() { - require.NoError(t, f.Close()) - }) - require.Equal(t, test.expectedReadAlias, f.Options.GetConfig().ReadAliasSuffix) - require.Equal(t, test.expectedWriteAlias, f.Options.GetConfig().WriteAliasSuffix) - require.True(t, f.Options.Config.UseReadWriteAliases) - require.Equal(t, DefaultConfig().BulkProcessing, f.Options.GetConfig().BulkProcessing) - }) - } -} - -func TestFactoryInitializeErr(t *testing.T) { - t.Parallel() - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.URL.Path == "/" { - w.WriteHeader(http.StatusInternalServerError) - } - })) - defer server.Close() - tests := []struct { - name string - factory *Factory - expectedErr string - }{ - { - name: "cfg validation err", - factory: &Factory{Options: &Options{Config: namespaceConfig{Configuration: escfg.Configuration{}}}}, - expectedErr: "Servers: non zero value required", - }, - { - name: "server error", - factory: &Factory{Options: &Options{Config: namespaceConfig{Configuration: escfg.Configuration{ - Servers: []string{server.URL}, - DisableHealthCheck: true, - }}}}, - expectedErr: "failed to create Elasticsearch client", - }, - } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - err := test.factory.Initialize(metrics.NullFactory, zaptest.NewLogger(t)) - require.ErrorContains(t, err, test.expectedErr) - }) - } -} - -func TestIsArchiveCapable(t *testing.T) { - tests := []struct { - name string - namespace string - enabled bool - expected bool - }{ - { - name: "archive capable", - namespace: "es-archive", - enabled: true, - expected: true, - }, - { - name: "not capable", - namespace: "es-archive", - enabled: false, - expected: false, - }, - { - name: "capable + wrong namespace", - namespace: "es", - enabled: true, - expected: false, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - factory := &Factory{ - Options: &Options{ - Config: namespaceConfig{ - namespace: test.namespace, - Configuration: escfg.Configuration{ - Enabled: test.enabled, - }, - }, - }, - } - result := factory.IsArchiveCapable() - require.Equal(t, test.expected, result) - }) - } -} - -func getTestingFactoryBase(t *testing.T, cfg *escfg.Configuration) *FactoryBase { - f := &FactoryBase{} - err := SetFactoryForTest(f, zaptest.NewLogger(t), metrics.NullFactory, cfg) - require.NoError(t, err) - return f -} diff --git a/internal/storage/v1/elasticsearch/helper.go b/internal/storage/v1/elasticsearch/helper.go index bcdc42a90b5..0e57a478789 100644 --- a/internal/storage/v1/elasticsearch/helper.go +++ b/internal/storage/v1/elasticsearch/helper.go @@ -2,58 +2,3 @@ // SPDX-License-Identifier: Apache-2.0 package elasticsearch - -import ( - "context" - - "github.com/stretchr/testify/mock" - "go.opentelemetry.io/collector/extension/extensionauth" - "go.opentelemetry.io/otel" - "go.uber.org/zap" - - "github.com/jaegertracing/jaeger/internal/metrics" - es "github.com/jaegertracing/jaeger/internal/storage/elasticsearch" - escfg "github.com/jaegertracing/jaeger/internal/storage/elasticsearch/config" - "github.com/jaegertracing/jaeger/internal/storage/elasticsearch/mocks" -) - -type mockClientBuilder struct { - err error - createTemplateError error -} - -func (m *mockClientBuilder) NewClient(context.Context, *escfg.Configuration, *zap.Logger, metrics.Factory, extensionauth.HTTPClient) (es.Client, error) { - if m.err == nil { - c := &mocks.Client{} - tService := &mocks.TemplateCreateService{} - dService := &mocks.IndicesDeleteService{} - tService.On("Body", mock.Anything).Return(tService) - tService.On("Do", context.Background()).Return(nil, m.createTemplateError) - c.On("CreateTemplate", mock.Anything).Return(tService) - c.On("GetVersion").Return(uint(6)) - c.On("Close").Return(nil) - c.On("DeleteIndex", mock.Anything).Return(dService) - dService.On("Do", mock.Anything).Return(nil, nil) - return c, nil - } - return nil, m.err -} - -func SetFactoryForTest(f *FactoryBase, logger *zap.Logger, metricsFactory metrics.Factory, cfg *escfg.Configuration) error { - return SetFactoryForTestWithCreateTemplateErr(f, logger, metricsFactory, cfg, nil) -} - -func SetFactoryForTestWithCreateTemplateErr(f *FactoryBase, logger *zap.Logger, metricsFactory metrics.Factory, cfg *escfg.Configuration, templateErr error) error { - f.newClientFn = (&mockClientBuilder{createTemplateError: templateErr}).NewClient - f.logger = logger - f.metricsFactory = metricsFactory - f.config = cfg - f.tracer = otel.GetTracerProvider() - client, err := f.newClientFn(context.Background(), cfg, logger, metricsFactory, nil) - if err != nil { - return err - } - f.client.Store(&client) - f.templateBuilder = es.TextTemplateBuilder{} - return nil -} diff --git a/internal/storage/v1/factory/factory.go b/internal/storage/v1/factory/factory.go index 2875b358575..70ff22eedd3 100644 --- a/internal/storage/v1/factory/factory.go +++ b/internal/storage/v1/factory/factory.go @@ -5,25 +5,7 @@ package factory import ( - "errors" - "flag" - "fmt" - "io" - - "github.com/spf13/viper" - "go.uber.org/zap" - - "github.com/jaegertracing/jaeger/internal/metrics" - "github.com/jaegertracing/jaeger/internal/safeexpvar" - "github.com/jaegertracing/jaeger/internal/storage/v1" - "github.com/jaegertracing/jaeger/internal/storage/v1/api/dependencystore" "github.com/jaegertracing/jaeger/internal/storage/v1/api/spanstore" - "github.com/jaegertracing/jaeger/internal/storage/v1/badger" - "github.com/jaegertracing/jaeger/internal/storage/v1/blackhole" - "github.com/jaegertracing/jaeger/internal/storage/v1/cassandra" - es "github.com/jaegertracing/jaeger/internal/storage/v1/elasticsearch" - "github.com/jaegertracing/jaeger/internal/storage/v1/grpc" - "github.com/jaegertracing/jaeger/internal/storage/v1/memory" ) const ( @@ -34,17 +16,14 @@ const ( grpcStorageType = "grpc" badgerStorageType = "badger" blackholeStorageType = "blackhole" - - downsamplingRatio = "downsampling.ratio" - downsamplingHashSalt = "downsampling.hashsalt" - spanStorageType = "span-storage-type" - - // defaultDownsamplingRatio is the default downsampling ratio. - defaultDownsamplingRatio = 1.0 - // defaultDownsamplingHashSalt is the default downsampling hashsalt. - defaultDownsamplingHashSalt = "" ) +// ArchiveStorage holds archive span reader and writer. +type ArchiveStorage struct { + Reader spanstore.Reader + Writer spanstore.Writer +} + // AllStorageTypes defines all available storage backends var AllStorageTypes = []string{ cassandraStorageType, @@ -55,337 +34,3 @@ var AllStorageTypes = []string{ blackholeStorageType, grpcStorageType, } - -// AllSamplingStorageTypes returns all storage backends that implement adaptive sampling -func AllSamplingStorageTypes() []string { - f := &Factory{} - var backends []string - for _, st := range AllStorageTypes { - f, _ := f.getFactoryOfType(st) // no errors since we're looping through supported types - if _, ok := f.(storage.SamplingStoreFactory); ok { - backends = append(backends, st) - } - } - return backends -} - -var ( // interface comformance checks - _ storage.Factory = (*Factory)(nil) - _ io.Closer = (*Factory)(nil) - _ storage.Configurable = (*Factory)(nil) -) - -// Factory implements storage.Factory interface as a meta-factory for storage components. -type Factory struct { - Config - metricsFactory metrics.Factory - factories map[string]storage.Factory - archiveFactories map[string]storage.Factory - downsamplingFlagsAdded bool -} - -// NewFactory creates the meta-factory. -func NewFactory(config Config) (*Factory, error) { - f := &Factory{Config: config} - uniqueTypes := map[string]struct{}{ - f.SpanReaderType: {}, - f.DependenciesStorageType: {}, - } - for _, storageType := range f.SpanWriterTypes { - uniqueTypes[storageType] = struct{}{} - } - // skip SamplingStorageType if it is empty. See CreateSamplingStoreFactory for details - if f.SamplingStorageType != "" { - uniqueTypes[f.SamplingStorageType] = struct{}{} - } - f.factories = make(map[string]storage.Factory) - f.archiveFactories = make(map[string]storage.Factory) - for t := range uniqueTypes { - ff, err := f.getFactoryOfType(t) - if err != nil { - return nil, err - } - f.factories[t] = ff - - if af, ok := f.getArchiveFactoryOfType(t); ok { - f.archiveFactories[t] = af - } - } - return f, nil -} - -func (*Factory) getFactoryOfType(factoryType string) (storage.Factory, error) { - switch factoryType { - case cassandraStorageType: - return cassandra.NewFactory(), nil - case elasticsearchStorageType, opensearchStorageType: - return es.NewFactory(), nil - case memoryStorageType: - return memory.NewFactory(), nil - case badgerStorageType: - return badger.NewFactory(), nil - case grpcStorageType: - return grpc.NewFactory(), nil - case blackholeStorageType: - return blackhole.NewFactory(), nil - default: - return nil, fmt.Errorf("unknown storage type %s. Valid types are %v", factoryType, AllStorageTypes) - } -} - -func (*Factory) getArchiveFactoryOfType(factoryType string) (storage.Factory, bool) { - switch factoryType { - case cassandraStorageType: - return cassandra.NewArchiveFactory(), true - case elasticsearchStorageType, opensearchStorageType: - return es.NewArchiveFactory(), true - case grpcStorageType: - return grpc.NewArchiveFactory(), true - default: - return nil, false - } -} - -func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error { - f.metricsFactory = metricsFactory - - initializeFactory := func(kind string, factory storage.Factory, role string) error { - mf := metricsFactory.Namespace(metrics.NSOptions{ - Name: "storage", - Tags: map[string]string{ - "kind": kind, - "role": role, - }, - }) - return factory.Initialize(mf, logger) - } - - for kind, factory := range f.factories { - if err := initializeFactory(kind, factory, "primary"); err != nil { - return err - } - } - - for kind, factory := range f.archiveFactories { - if archivable, ok := factory.(storage.ArchiveCapable); ok && archivable.IsArchiveCapable() { - if err := initializeFactory(kind, factory, "archive"); err != nil { - return err - } - } else { - delete(f.archiveFactories, kind) - } - } - - f.publishOpts() - return nil -} - -// CreateSpanReader implements storage.Factory. -func (f *Factory) CreateSpanReader() (spanstore.Reader, error) { - factory, ok := f.factories[f.SpanReaderType] - if !ok { - return nil, fmt.Errorf("no %s backend registered for span store", f.SpanReaderType) - } - return factory.CreateSpanReader() -} - -// CreateSpanWriter implements storage.Factory. -func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { - var writers []spanstore.Writer - for _, storageType := range f.SpanWriterTypes { - factory, ok := f.factories[storageType] - if !ok { - return nil, fmt.Errorf("no %s backend registered for span store", storageType) - } - writer, err := factory.CreateSpanWriter() - if err != nil { - return nil, err - } - writers = append(writers, writer) - } - var spanWriter spanstore.Writer - if len(f.SpanWriterTypes) == 1 { - spanWriter = writers[0] - } else { - spanWriter = spanstore.NewCompositeWriter(writers...) - } - // Turn off DownsamplingWriter entirely if ratio == defaultDownsamplingRatio. - if f.DownsamplingRatio == defaultDownsamplingRatio { - return spanWriter, nil - } - return spanstore.NewDownsamplingWriter(spanWriter, spanstore.DownsamplingOptions{ - Ratio: f.DownsamplingRatio, - HashSalt: f.DownsamplingHashSalt, - MetricsFactory: f.metricsFactory.Namespace(metrics.NSOptions{Name: "downsampling_writer"}), - }), nil -} - -// CreateSamplingStoreFactory creates a distributedlock.Lock and samplingstore.Store for use with adaptive sampling -func (f *Factory) CreateSamplingStoreFactory() (storage.SamplingStoreFactory, error) { - // if a sampling storage type was specified then use it, otherwise search all factories - // for compatibility - if f.SamplingStorageType != "" { - factory, ok := f.factories[f.SamplingStorageType] - if !ok { - return nil, fmt.Errorf("no %s backend registered for sampling store", f.SamplingStorageType) - } - ss, ok := factory.(storage.SamplingStoreFactory) - if !ok { - return nil, fmt.Errorf("storage factory of type %s does not support sampling store", f.SamplingStorageType) - } - return ss, nil - } - - for _, factory := range f.factories { - ss, ok := factory.(storage.SamplingStoreFactory) - if ok { - return ss, nil - } - } - - // returning nothing is valid here. it's quite possible that the user has no backend that can support adaptive sampling - // this is fine as long as adaptive sampling is also not configured - return nil, nil -} - -// CreateDependencyReader implements storage.Factory -func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { - factory, ok := f.factories[f.DependenciesStorageType] - if !ok { - return nil, fmt.Errorf("no %s backend registered for span store", f.DependenciesStorageType) - } - return factory.CreateDependencyReader() -} - -// AddFlags implements storage.Configurable -func (f *Factory) AddFlags(flagSet *flag.FlagSet) { - addFlags := func(factories map[string]storage.Factory) { - for _, factory := range factories { - if conf, ok := factory.(storage.Configurable); ok { - conf.AddFlags(flagSet) - } - } - } - addFlags(f.factories) - addFlags(f.archiveFactories) -} - -// AddPipelineFlags adds all the standard flags as well as the downsampling -// flags. This is intended to be used in Jaeger pipeline services such as -// the collector or ingester. -func (f *Factory) AddPipelineFlags(flagSet *flag.FlagSet) { - f.AddFlags(flagSet) - f.addDownsamplingFlags(flagSet) -} - -// addDownsamplingFlags add flags for Downsampling params -func (f *Factory) addDownsamplingFlags(flagSet *flag.FlagSet) { - f.downsamplingFlagsAdded = true - flagSet.Float64( - downsamplingRatio, - defaultDownsamplingRatio, - "Ratio of spans passed to storage after downsampling (between 0 and 1), e.g ratio = 0.3 means we are keeping 30% of spans and dropping 70% of spans; ratio = 1.0 disables downsampling.", - ) - flagSet.String( - downsamplingHashSalt, - defaultDownsamplingHashSalt, - "Salt used when hashing trace id for downsampling.", - ) -} - -// InitFromViper implements storage.Configurable -func (f *Factory) InitFromViper(v *viper.Viper, logger *zap.Logger) { - initializeConfigurable := func(factory storage.Factory) { - if conf, ok := factory.(storage.Configurable); ok { - conf.InitFromViper(v, logger) - } - } - for _, factory := range f.factories { - initializeConfigurable(factory) - } - for kind, factory := range f.archiveFactories { - initializeConfigurable(factory) - - if primaryFactory, ok := f.factories[kind]; ok { - if inheritable, ok := factory.(storage.Inheritable); ok { - inheritable.InheritSettingsFrom(primaryFactory) - } - } - } - f.initDownsamplingFromViper(v) -} - -func (f *Factory) initDownsamplingFromViper(v *viper.Viper) { - // if the downsampling flag isn't set then this component used the standard "AddFlags" method - // and has no use for downsampling. the default settings effectively disable downsampling - if !f.downsamplingFlagsAdded { - f.Config.DownsamplingRatio = defaultDownsamplingRatio - f.Config.DownsamplingHashSalt = defaultDownsamplingHashSalt - return - } - - f.Config.DownsamplingRatio = v.GetFloat64(downsamplingRatio) - if f.Config.DownsamplingRatio < 0 || f.Config.DownsamplingRatio > 1 { - // Values not in the range of 0 ~ 1.0 will be set to default. - f.Config.DownsamplingRatio = 1.0 - } - f.Config.DownsamplingHashSalt = v.GetString(downsamplingHashSalt) -} - -type ArchiveStorage struct { - Reader spanstore.Reader - Writer spanstore.Writer -} - -func (f *Factory) InitArchiveStorage() (*ArchiveStorage, error) { - factory, ok := f.archiveFactories[f.SpanReaderType] - if !ok { - return nil, nil - } - reader, err := factory.CreateSpanReader() - if err != nil { - return nil, err - } - - factory, ok = f.archiveFactories[f.SpanWriterTypes[0]] - if !ok { - return nil, nil - } - writer, err := factory.CreateSpanWriter() - if err != nil { - return nil, err - } - - return &ArchiveStorage{ - Reader: reader, - Writer: writer, - }, nil -} - -var _ io.Closer = (*Factory)(nil) - -// Close closes the resources held by the factory -func (f *Factory) Close() error { - var errs []error - closeFactory := func(factory storage.Factory) { - if closer, ok := factory.(io.Closer); ok { - if err := closer.Close(); err != nil { - errs = append(errs, err) - } - } - } - for _, storageType := range f.SpanWriterTypes { - if factory, ok := f.factories[storageType]; ok { - closeFactory(factory) - } - if factory, ok := f.archiveFactories[storageType]; ok { - closeFactory(factory) - } - } - return errors.Join(errs...) -} - -func (f *Factory) publishOpts() { - safeexpvar.SetInt(downsamplingRatio, int64(f.Config.DownsamplingRatio)) - safeexpvar.SetInt(spanStorageType+"-"+f.Config.SpanReaderType, 1) -} diff --git a/internal/storage/v1/factory/factory_test.go b/internal/storage/v1/factory/factory_test.go index a68cf090a66..a9769d8a547 100644 --- a/internal/storage/v1/factory/factory_test.go +++ b/internal/storage/v1/factory/factory_test.go @@ -1,581 +1,4 @@ -// Copyright (c) 2019 The Jaeger Authors. -// Copyright (c) 2018 Uber Technologies, Inc. +// Copyright (c) 2025 The Jaeger Authors. // SPDX-License-Identifier: Apache-2.0 package factory - -import ( - "errors" - "expvar" - "flag" - "io" - "reflect" - "strings" - "testing" - - "github.com/spf13/viper" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.uber.org/zap" - - "github.com/jaegertracing/jaeger/internal/config" - "github.com/jaegertracing/jaeger/internal/metrics" - "github.com/jaegertracing/jaeger/internal/storage/v1" - "github.com/jaegertracing/jaeger/internal/storage/v1/api/dependencystore" - depstoremocks "github.com/jaegertracing/jaeger/internal/storage/v1/api/dependencystore/mocks" - "github.com/jaegertracing/jaeger/internal/storage/v1/api/spanstore" - spanstoremocks "github.com/jaegertracing/jaeger/internal/storage/v1/api/spanstore/mocks" - "github.com/jaegertracing/jaeger/internal/storage/v1/mocks" -) - -func defaultCfg() Config { - return Config{ - SpanWriterTypes: []string{cassandraStorageType}, - SpanReaderType: cassandraStorageType, - DependenciesStorageType: cassandraStorageType, - DownsamplingRatio: 1.0, - DownsamplingHashSalt: "", - } -} - -func TestNewFactory(t *testing.T) { - f, err := NewFactory(defaultCfg()) - require.NoError(t, err) - assert.NotEmpty(t, f.factories) - assert.NotEmpty(t, f.factories[cassandraStorageType]) - assert.Equal(t, cassandraStorageType, f.SpanWriterTypes[0]) - assert.Equal(t, cassandraStorageType, f.SpanReaderType) - assert.Equal(t, cassandraStorageType, f.DependenciesStorageType) - - f, err = NewFactory(Config{ - SpanWriterTypes: []string{cassandraStorageType, badgerStorageType}, - SpanReaderType: elasticsearchStorageType, - DependenciesStorageType: memoryStorageType, - }) - require.NoError(t, err) - assert.NotEmpty(t, f.factories) - assert.NotEmpty(t, f.factories[cassandraStorageType]) - assert.NotEmpty(t, f.factories[elasticsearchStorageType]) - assert.NotNil(t, f.factories[memoryStorageType]) - assert.Equal(t, []string{cassandraStorageType, badgerStorageType}, f.SpanWriterTypes) - assert.Equal(t, elasticsearchStorageType, f.SpanReaderType) - assert.Equal(t, memoryStorageType, f.DependenciesStorageType) - - _, err = NewFactory(Config{SpanWriterTypes: []string{"x"}, DependenciesStorageType: "y", SpanReaderType: "z"}) - require.Error(t, err) - expected := "unknown storage type" // could be 'x' or 'y' since code iterates through map. - assert.Equal(t, expected, err.Error()[0:len(expected)]) - - require.NoError(t, f.Close()) -} - -func TestClose(t *testing.T) { - storageType := "foo" - err := errors.New("some error") - f := Factory{ - factories: map[string]storage.Factory{ - storageType: &errorFactory{closeErr: err}, - }, - Config: Config{SpanWriterTypes: []string{storageType}}, - } - require.EqualError(t, f.Close(), err.Error()) -} - -func TestInitialize(t *testing.T) { - f, err := NewFactory(defaultCfg()) - require.NoError(t, err) - assert.NotEmpty(t, f.factories) - assert.NotEmpty(t, f.factories[cassandraStorageType]) - - mock := new(mocks.Factory) - f.factories[cassandraStorageType] = mock - f.archiveFactories[cassandraStorageType] = mock - - m := metrics.NullFactory - l := zap.NewNop() - mock.On("Initialize", m, l).Return(nil) - require.NoError(t, f.Initialize(m, l)) - - mock = new(mocks.Factory) - f.factories[cassandraStorageType] = mock - mock.On("Initialize", m, l).Return(errors.New("init-error")) - require.EqualError(t, f.Initialize(m, l), "init-error") -} - -func TestCreate(t *testing.T) { - f, err := NewFactory(defaultCfg()) - require.NoError(t, err) - assert.NotEmpty(t, f.factories) - assert.NotEmpty(t, f.factories[cassandraStorageType]) - - mock := new(mocks.Factory) - f.factories[cassandraStorageType] = mock - - spanReader := new(spanstoremocks.Reader) - spanWriter := new(spanstoremocks.Writer) - depReader := new(depstoremocks.Reader) - - mock.On("CreateSpanReader").Return(spanReader, errors.New("span-reader-error")) - mock.On("CreateSpanWriter").Once().Return(spanWriter, errors.New("span-writer-error")) - mock.On("CreateDependencyReader").Return(depReader, errors.New("dep-reader-error")) - - r, err := f.CreateSpanReader() - assert.Equal(t, spanReader, r) - require.EqualError(t, err, "span-reader-error") - - w, err := f.CreateSpanWriter() - assert.Nil(t, w) - require.EqualError(t, err, "span-writer-error") - - d, err := f.CreateDependencyReader() - assert.Equal(t, depReader, d) - require.EqualError(t, err, "dep-reader-error") - - mock.On("CreateSpanWriter").Return(spanWriter, nil) - m := metrics.NullFactory - l := zap.NewNop() - mock.On("Initialize", m, l).Return(nil) - f.Initialize(m, l) - w, err = f.CreateSpanWriter() - require.NoError(t, err) - assert.Equal(t, spanWriter, w) -} - -func TestCreateDownsamplingWriter(t *testing.T) { - f, err := NewFactory(defaultCfg()) - require.NoError(t, err) - assert.NotEmpty(t, f.factories[cassandraStorageType]) - mock := new(mocks.Factory) - f.factories[cassandraStorageType] = mock - spanWriter := new(spanstoremocks.Writer) - mock.On("CreateSpanWriter").Return(spanWriter, nil) - - m := metrics.NullFactory - l := zap.NewNop() - mock.On("Initialize", m, l).Return(nil) - - testParams := []struct { - ratio float64 - writerType string - }{ - {0.5, "*spanstore.DownsamplingWriter"}, - {1.0, "*mocks.Writer"}, - } - - for _, param := range testParams { - t.Run(param.writerType, func(t *testing.T) { - f.DownsamplingRatio = param.ratio - f.Initialize(m, l) - newWriter, err := f.CreateSpanWriter() - require.NoError(t, err) - // Currently directly assertEqual doesn't work since DownsamplingWriter initializes with different - // address for hashPool. The following workaround checks writer type instead - assert.True(t, strings.HasPrefix(reflect.TypeOf(newWriter).String(), param.writerType)) - }) - } -} - -func TestCreateMulti(t *testing.T) { - cfg := defaultCfg() - cfg.SpanWriterTypes = append(cfg.SpanWriterTypes, elasticsearchStorageType) - f, err := NewFactory(cfg) - require.NoError(t, err) - - mock := new(mocks.Factory) - mock2 := new(mocks.Factory) - f.factories[cassandraStorageType] = mock - f.archiveFactories[cassandraStorageType] = mock - f.factories[elasticsearchStorageType] = mock2 - f.archiveFactories[elasticsearchStorageType] = mock2 - - spanWriter := new(spanstoremocks.Writer) - spanWriter2 := new(spanstoremocks.Writer) - - mock.On("CreateSpanWriter").Once().Return(spanWriter, errors.New("span-writer-error")) - - w, err := f.CreateSpanWriter() - assert.Nil(t, w) - require.EqualError(t, err, "span-writer-error") - - mock.On("CreateSpanWriter").Return(spanWriter, nil) - mock2.On("CreateSpanWriter").Return(spanWriter2, nil) - m := metrics.NullFactory - l := zap.NewNop() - mock.On("Initialize", m, l).Return(nil) - mock2.On("Initialize", m, l).Return(nil) - f.Initialize(m, l) - w, err = f.CreateSpanWriter() - require.NoError(t, err) - assert.Equal(t, spanstore.NewCompositeWriter(spanWriter, spanWriter2), w) -} - -func TestCreateError(t *testing.T) { - f, err := NewFactory(defaultCfg()) - require.NoError(t, err) - assert.NotEmpty(t, f.factories) - assert.NotEmpty(t, f.factories[cassandraStorageType]) - delete(f.factories, cassandraStorageType) - - expectedErr := "no cassandra backend registered for span store" - // scope the vars to avoid bugs in the test - { - r, err := f.CreateSpanReader() - assert.Nil(t, r) - require.EqualError(t, err, expectedErr) - } - - { - w, err := f.CreateSpanWriter() - assert.Nil(t, w) - require.EqualError(t, err, expectedErr) - } - - { - d, err := f.CreateDependencyReader() - assert.Nil(t, d) - require.EqualError(t, err, expectedErr) - } -} - -func TestAllSamplingStorageTypes(t *testing.T) { - assert.Equal(t, []string{"cassandra", "memory", "badger"}, AllSamplingStorageTypes()) -} - -func TestCreateSamplingStoreFactory(t *testing.T) { - f, err := NewFactory(defaultCfg()) - require.NoError(t, err) - assert.NotEmpty(t, f.factories) - assert.NotEmpty(t, f.factories[cassandraStorageType]) - - // if not specified sampling store is chosen from available factories - ssFactory, err := f.CreateSamplingStoreFactory() - assert.Equal(t, f.factories[cassandraStorageType], ssFactory) - require.NoError(t, err) - - // if not specified and there's no compatible factories then return nil - delete(f.factories, cassandraStorageType) - ssFactory, err = f.CreateSamplingStoreFactory() - assert.Nil(t, ssFactory) - require.NoError(t, err) - - // if an incompatible factory is specified return err - cfg := defaultCfg() - cfg.SamplingStorageType = "elasticsearch" - f, err = NewFactory(cfg) - require.NoError(t, err) - ssFactory, err = f.CreateSamplingStoreFactory() - assert.Nil(t, ssFactory) - require.EqualError(t, err, "storage factory of type elasticsearch does not support sampling store") - - // if a compatible factory is specified then return it - cfg.SamplingStorageType = "cassandra" - f, err = NewFactory(cfg) - require.NoError(t, err) - ssFactory, err = f.CreateSamplingStoreFactory() - assert.Equal(t, ssFactory, f.factories["cassandra"]) - require.NoError(t, err) -} - -type configurable struct { - mocks.Factory - flagSet *flag.FlagSet - viper *viper.Viper - logger *zap.Logger -} - -// AddFlags implements storage.Configurable -func (f *configurable) AddFlags(flagSet *flag.FlagSet) { - f.flagSet = flagSet -} - -// InitFromViper implements storage.Configurable -func (f *configurable) InitFromViper(v *viper.Viper, logger *zap.Logger) { - f.viper = v - f.logger = logger -} - -func TestConfigurable(t *testing.T) { - f, err := NewFactory(defaultCfg()) - require.NoError(t, err) - assert.NotEmpty(t, f.factories) - assert.NotEmpty(t, f.factories[cassandraStorageType]) - - mock := new(configurable) - f.factories[cassandraStorageType] = mock - - fs := new(flag.FlagSet) - v := viper.New() - - f.AddFlags(fs) - f.InitFromViper(v, zap.NewNop()) - - assert.Equal(t, fs, mock.flagSet) - assert.Equal(t, v, mock.viper) -} - -type inheritable struct { - mocks.Factory - calledWith storage.Factory -} - -func (i *inheritable) InheritSettingsFrom(other storage.Factory) { - i.calledWith = other -} - -func TestInheritable(t *testing.T) { - f, err := NewFactory(defaultCfg()) - require.NoError(t, err) - assert.NotEmpty(t, f.factories) - assert.NotEmpty(t, f.factories[cassandraStorageType]) - assert.NotEmpty(t, f.archiveFactories[cassandraStorageType]) - - mockInheritable := new(inheritable) - f.factories[cassandraStorageType] = &mocks.Factory{} - f.archiveFactories[cassandraStorageType] = mockInheritable - - fs := new(flag.FlagSet) - v := viper.New() - - f.AddFlags(fs) - f.InitFromViper(v, zap.NewNop()) - - assert.Equal(t, f.factories[cassandraStorageType], mockInheritable.calledWith) -} - -type archiveConfigurable struct { - isConfigurable bool - *mocks.Factory -} - -func (ac *archiveConfigurable) IsArchiveCapable() bool { - return ac.isConfigurable -} - -func TestArchiveConfigurable(t *testing.T) { - tests := []struct { - name string - isArchiveCapable bool - archiveInitError error - expectedError error - expectedArchiveSize int - }{ - { - name: "Archive factory initializes successfully", - isArchiveCapable: true, - archiveInitError: nil, - expectedError: nil, - expectedArchiveSize: 1, - }, - { - name: "Archive factory initialization fails", - isArchiveCapable: true, - archiveInitError: assert.AnError, - expectedError: assert.AnError, - expectedArchiveSize: 1, - }, - { - name: "Archive factory is not archive-capable", - isArchiveCapable: false, - archiveInitError: nil, - expectedError: nil, - expectedArchiveSize: 0, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - f, err := NewFactory(defaultCfg()) - require.NoError(t, err) - - primaryFactory := &mocks.Factory{} - archiveFactory := &mocks.Factory{} - archiveConfigurable := &archiveConfigurable{ - isConfigurable: test.isArchiveCapable, - Factory: archiveFactory, - } - - f.factories[cassandraStorageType] = primaryFactory - f.archiveFactories[cassandraStorageType] = archiveConfigurable - - m := metrics.NullFactory - l := zap.NewNop() - primaryFactory.On("Initialize", m, l).Return(nil).Once() - archiveFactory.On("Initialize", m, l).Return(test.archiveInitError).Once() - - err = f.Initialize(m, l) - if test.expectedError != nil { - require.ErrorIs(t, err, test.expectedError) - } else { - require.NoError(t, err) - } - - assert.Len(t, f.archiveFactories, test.expectedArchiveSize) - }) - } -} - -func TestParsingDownsamplingRatio(t *testing.T) { - f := Factory{} - v, command := config.Viperize(f.AddPipelineFlags) - err := command.ParseFlags([]string{ - "--downsampling.ratio=1.5", - "--downsampling.hashsalt=jaeger", - }) - require.NoError(t, err) - f.InitFromViper(v, zap.NewNop()) - - assert.InDelta(t, 1.0, f.Config.DownsamplingRatio, 0.01) - assert.Equal(t, "jaeger", f.Config.DownsamplingHashSalt) - - err = command.ParseFlags([]string{ - "--downsampling.ratio=0.5", - }) - require.NoError(t, err) - f.InitFromViper(v, zap.NewNop()) - assert.InDelta(t, 0.5, f.Config.DownsamplingRatio, 0.01) -} - -func TestDefaultDownsamplingWithAddFlags(t *testing.T) { - f := Factory{} - v, command := config.Viperize(f.AddFlags) - err := command.ParseFlags([]string{}) - require.NoError(t, err) - f.InitFromViper(v, zap.NewNop()) - - assert.InDelta(t, defaultDownsamplingRatio, f.Config.DownsamplingRatio, 0.01) - assert.Equal(t, defaultDownsamplingHashSalt, f.Config.DownsamplingHashSalt) - - err = command.ParseFlags([]string{ - "--downsampling.ratio=0.5", - }) - require.Error(t, err) -} - -func TestPublishOpts(t *testing.T) { - f, err := NewFactory(defaultCfg()) - require.NoError(t, err) - f.publishOpts() - - assert.EqualValues(t, 1, expvar.Get(downsamplingRatio).(*expvar.Int).Value()) - assert.EqualValues(t, 1, expvar.Get(spanStorageType+"-"+f.SpanReaderType).(*expvar.Int).Value()) -} - -func TestInitArchiveStorage(t *testing.T) { - tests := []struct { - name string - setupMock func(*mocks.Factory) - factoryCfg func() Config - expectedStorage *ArchiveStorage - expectedError error - }{ - { - name: "successful initialization", - setupMock: func(mock *mocks.Factory) { - spanReader := &spanstoremocks.Reader{} - spanWriter := &spanstoremocks.Writer{} - mock.On("CreateSpanReader").Return(spanReader, nil) - mock.On("CreateSpanWriter").Return(spanWriter, nil) - }, - factoryCfg: defaultCfg, - expectedStorage: &ArchiveStorage{ - Reader: &spanstoremocks.Reader{}, - Writer: &spanstoremocks.Writer{}, - }, - }, - { - name: "no archive span reader", - setupMock: func(mock *mocks.Factory) { - spanReader := &spanstoremocks.Reader{} - spanWriter := &spanstoremocks.Writer{} - mock.On("CreateSpanReader").Return(spanReader, nil) - mock.On("CreateSpanWriter").Return(spanWriter, nil) - }, - factoryCfg: func() Config { - cfg := defaultCfg() - cfg.SpanReaderType = "blackhole" - return cfg - }, - expectedStorage: nil, - }, - { - name: "no archive span writer", - setupMock: func(mock *mocks.Factory) { - spanReader := &spanstoremocks.Reader{} - spanWriter := &spanstoremocks.Writer{} - mock.On("CreateSpanReader").Return(spanReader, nil) - mock.On("CreateSpanWriter").Return(spanWriter, nil) - }, - factoryCfg: func() Config { - cfg := defaultCfg() - cfg.SpanWriterTypes = []string{"blackhole"} - return cfg - }, - expectedStorage: nil, - }, - { - name: "error initializing reader", - setupMock: func(mock *mocks.Factory) { - mock.On("CreateSpanReader").Return(nil, assert.AnError) - }, - factoryCfg: defaultCfg, - expectedStorage: nil, - expectedError: assert.AnError, - }, - { - name: "error initializing writer", - setupMock: func(mock *mocks.Factory) { - spanReader := new(spanstoremocks.Reader) - mock.On("CreateSpanReader").Return(spanReader, nil) - mock.On("CreateSpanWriter").Return(nil, assert.AnError) - }, - factoryCfg: defaultCfg, - expectedStorage: nil, - expectedError: assert.AnError, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - cfg := test.factoryCfg() - f, err := NewFactory(cfg) - require.NoError(t, err) - - mock := new(mocks.Factory) - f.archiveFactories[cassandraStorageType] = mock - test.setupMock(mock) - - storage, err := f.InitArchiveStorage() - require.Equal(t, test.expectedStorage, storage) - require.ErrorIs(t, err, test.expectedError) - }) - } -} - -type errorFactory struct { - closeErr error -} - -var ( - _ storage.Factory = (*errorFactory)(nil) - _ io.Closer = (*errorFactory)(nil) -) - -func (errorFactory) Initialize(metrics.Factory, *zap.Logger) error { - panic("implement me") -} - -func (errorFactory) CreateSpanReader() (spanstore.Reader, error) { - panic("implement me") -} - -func (errorFactory) CreateSpanWriter() (spanstore.Writer, error) { - panic("implement me") -} - -func (errorFactory) CreateDependencyReader() (dependencystore.Reader, error) { - panic("implement me") -} - -func (e errorFactory) Close() error { - return e.closeErr -} diff --git a/internal/storage/v1/grpc/factory.go b/internal/storage/v1/grpc/factory.go index c44bd64464c..ea73d215dfa 100644 --- a/internal/storage/v1/grpc/factory.go +++ b/internal/storage/v1/grpc/factory.go @@ -1,216 +1,4 @@ -// Copyright (c) 2019 The Jaeger Authors. +// Copyright (c) 2025 The Jaeger Authors. // SPDX-License-Identifier: Apache-2.0 package grpc - -import ( - "context" - "errors" - "flag" - "fmt" - "io" - - "github.com/spf13/viper" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/configgrpc" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/otel/trace" - "go.opentelemetry.io/otel/trace/noop" - "go.uber.org/zap" - "google.golang.org/grpc" - - "github.com/jaegertracing/jaeger/internal/auth/bearertoken" - "github.com/jaegertracing/jaeger/internal/metrics" - "github.com/jaegertracing/jaeger/internal/storage/v1" - "github.com/jaegertracing/jaeger/internal/storage/v1/api/dependencystore" - "github.com/jaegertracing/jaeger/internal/storage/v1/api/spanstore" - "github.com/jaegertracing/jaeger/internal/storage/v1/api/spanstore/spanstoremetrics" - "github.com/jaegertracing/jaeger/internal/storage/v1/grpc/shared" - "github.com/jaegertracing/jaeger/internal/telemetry" - "github.com/jaegertracing/jaeger/internal/tenancy" -) - -var ( // interface comformance checks - _ storage.Factory = (*Factory)(nil) - _ io.Closer = (*Factory)(nil) - _ storage.Configurable = (*Factory)(nil) - _ storage.ArchiveCapable = (*Factory)(nil) -) - -// Factory implements storage.Factory and creates storage components backed by a storage plugin. -type Factory struct { - options *options - telset telemetry.Settings - services *ClientPluginServices - tracedRemoteConn *grpc.ClientConn - untracedRemoteConn *grpc.ClientConn -} - -// NewFactory creates a new Factory. -func NewFactory() *Factory { - return &Factory{ - options: newOptions(remotePrefix), - telset: telemetry.NoopSettings(), - } -} - -func NewArchiveFactory() *Factory { - return &Factory{ - options: newOptions(archiveRemotePrefix), - telset: telemetry.NoopSettings(), - } -} - -// NewFactoryWithConfig is used from jaeger(v2). -func NewFactoryWithConfig( - cfg Config, - telset telemetry.Settings, -) (*Factory, error) { - f := NewFactory() - f.options.Config = cfg - f.telset = telset - if err := f.Initialize(telset.Metrics, telset.Logger); err != nil { - return nil, err - } - return f, nil -} - -// AddFlags implements storage.Configurable -func (f *Factory) AddFlags(flagSet *flag.FlagSet) { - f.options.addFlags(flagSet) -} - -// InitFromViper implements storage.Configurable -func (f *Factory) InitFromViper(v *viper.Viper, logger *zap.Logger) { - if err := f.options.initFromViper(&f.options.Config, v); err != nil { - logger.Fatal("unable to initialize gRPC storage factory", zap.Error(err)) - } -} - -// Initialize implements storage.Factory -func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error { - f.telset.Metrics = metricsFactory - f.telset.Logger = logger - f.telset.TracerProvider = otel.GetTracerProvider() - - tracedTelset := getTelset(logger, f.telset.TracerProvider, f.telset.MeterProvider) - untracedTelset := getTelset(logger, noop.NewTracerProvider(), f.telset.MeterProvider) - newClientFn := func(telset component.TelemetrySettings, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) { - clientOpts := make([]configgrpc.ToClientConnOption, 0) - for _, opt := range opts { - clientOpts = append(clientOpts, configgrpc.WithGrpcDialOption(opt)) - } - return f.options.Config.ToClientConn(context.Background(), f.telset.Host.GetExtensions(), telset, clientOpts...) - } - - var err error - f.services, err = f.newRemoteStorage(tracedTelset, untracedTelset, newClientFn) - if err != nil { - return fmt.Errorf("grpc storage builder failed to create a store: %w", err) - } - logger.Info("Remote storage configuration", zap.Any("configuration", f.options.Config)) - return nil -} - -type newClientFn func(telset component.TelemetrySettings, opts ...grpc.DialOption) (*grpc.ClientConn, error) - -func (f *Factory) newRemoteStorage( - tracedTelset component.TelemetrySettings, - untracedTelset component.TelemetrySettings, - newClient newClientFn, -) (*ClientPluginServices, error) { - c := f.options.Config - if c.Auth.HasValue() { - return nil, errors.New("authenticator is not supported") - } - unaryInterceptors := []grpc.UnaryClientInterceptor{ - bearertoken.NewUnaryClientInterceptor(), - } - streamInterceptors := []grpc.StreamClientInterceptor{ - bearertoken.NewStreamClientInterceptor(), - } - tenancyMgr := tenancy.NewManager(&c.Tenancy) - if tenancyMgr.Enabled { - unaryInterceptors = append(unaryInterceptors, tenancy.NewClientUnaryInterceptor(tenancyMgr)) - streamInterceptors = append(streamInterceptors, tenancy.NewClientStreamInterceptor(tenancyMgr)) - } - - baseOpts := append( - []grpc.DialOption{}, - grpc.WithChainUnaryInterceptor(unaryInterceptors...), - grpc.WithChainStreamInterceptor(streamInterceptors...), - ) - opts := append([]grpc.DialOption{}, baseOpts...) - opts = append(opts, grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(tracedTelset.TracerProvider)))) - - tracedRemoteConn, err := newClient(tracedTelset, opts...) - if err != nil { - return nil, fmt.Errorf("error creating traced remote storage client: %w", err) - } - f.tracedRemoteConn = tracedRemoteConn - untracedOpts := append([]grpc.DialOption{}, baseOpts...) - untracedOpts = append( - untracedOpts, - grpc.WithStatsHandler( - otelgrpc.NewClientHandler( - otelgrpc.WithTracerProvider(untracedTelset.TracerProvider)))) - untracedRemoteConn, err := newClient(tracedTelset, untracedOpts...) - if err != nil { - return nil, fmt.Errorf("error creating untraced remote storage client: %w", err) - } - f.untracedRemoteConn = untracedRemoteConn - grpcClient := shared.NewGRPCClient(tracedRemoteConn, untracedRemoteConn) - return &ClientPluginServices{ - PluginServices: shared.PluginServices{ - Store: grpcClient, - StreamingSpanWriter: grpcClient, - }, - Capabilities: grpcClient, - }, nil -} - -// CreateSpanReader implements storage.Factory -func (f *Factory) CreateSpanReader() (spanstore.Reader, error) { - return spanstoremetrics.NewReaderDecorator(f.services.Store.SpanReader(), f.telset.Metrics), nil -} - -// CreateSpanWriter implements storage.Factory -func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { - if f.services.Capabilities != nil && f.services.StreamingSpanWriter != nil { - if capabilities, err := f.services.Capabilities.Capabilities(); err == nil && capabilities.StreamingSpanWriter { - return f.services.StreamingSpanWriter.StreamingSpanWriter(), nil - } - } - return f.services.Store.SpanWriter(), nil -} - -// CreateDependencyReader implements storage.Factory -func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { - return f.services.Store.DependencyReader(), nil -} - -// Close closes the resources held by the factory -func (f *Factory) Close() error { - var errs []error - if f.tracedRemoteConn != nil { - errs = append(errs, f.tracedRemoteConn.Close()) - } - if f.untracedRemoteConn != nil { - errs = append(errs, f.untracedRemoteConn.Close()) - } - return errors.Join(errs...) -} - -func getTelset(logger *zap.Logger, tracerProvider trace.TracerProvider, meterProvider metric.MeterProvider) component.TelemetrySettings { - return component.TelemetrySettings{ - Logger: logger, - TracerProvider: tracerProvider, - MeterProvider: meterProvider, - } -} - -func (f *Factory) IsArchiveCapable() bool { - return f.options.namespace == archiveRemotePrefix && f.options.enabled -} diff --git a/internal/storage/v1/grpc/factory_test.go b/internal/storage/v1/grpc/factory_test.go index 18d17f2d112..ea73d215dfa 100644 --- a/internal/storage/v1/grpc/factory_test.go +++ b/internal/storage/v1/grpc/factory_test.go @@ -1,333 +1,4 @@ -// Copyright (c) 2019 The Jaeger Authors. +// Copyright (c) 2025 The Jaeger Authors. // SPDX-License-Identifier: Apache-2.0 package grpc - -import ( - "context" - "errors" - "log" - "net" - "testing" - "time" - - "github.com/spf13/viper" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/configauth" - "go.opentelemetry.io/collector/config/configgrpc" - "go.opentelemetry.io/collector/config/configoptional" - "go.opentelemetry.io/collector/exporter/exporterhelper" - "go.uber.org/zap" - "google.golang.org/grpc" - - "github.com/jaegertracing/jaeger/internal/config" - "github.com/jaegertracing/jaeger/internal/metrics" - "github.com/jaegertracing/jaeger/internal/storage/v1/api/dependencystore" - dependencystoremocks "github.com/jaegertracing/jaeger/internal/storage/v1/api/dependencystore/mocks" - "github.com/jaegertracing/jaeger/internal/storage/v1/api/spanstore" - spanstoremocks "github.com/jaegertracing/jaeger/internal/storage/v1/api/spanstore/mocks" - "github.com/jaegertracing/jaeger/internal/storage/v1/grpc/shared" - "github.com/jaegertracing/jaeger/internal/storage/v1/grpc/shared/mocks" - "github.com/jaegertracing/jaeger/internal/telemetry" - "github.com/jaegertracing/jaeger/internal/tenancy" -) - -type store struct { - reader spanstore.Reader - writer spanstore.Writer - deps dependencystore.Reader -} - -func (s *store) SpanReader() spanstore.Reader { - return s.reader -} - -func (s *store) SpanWriter() spanstore.Writer { - return s.writer -} - -func (s *store) ArchiveSpanReader() spanstore.Reader { - return s.reader -} - -func (s *store) ArchiveSpanWriter() spanstore.Writer { - return s.writer -} - -func (s *store) DependencyReader() dependencystore.Reader { - return s.deps -} - -func (s *store) StreamingSpanWriter() spanstore.Writer { - return s.writer -} - -func makeMockServices() *ClientPluginServices { - return &ClientPluginServices{ - PluginServices: shared.PluginServices{ - Store: &store{ - writer: new(spanstoremocks.Writer), - reader: new(spanstoremocks.Reader), - deps: new(dependencystoremocks.Reader), - }, - StreamingSpanWriter: &store{ - writer: new(spanstoremocks.Writer), - }, - }, - Capabilities: new(mocks.PluginCapabilities), - } -} - -func makeFactory(t *testing.T) *Factory { - f := NewFactory() - f.InitFromViper(viper.New(), zap.NewNop()) - f.options.ClientConfig.Endpoint = ":0" - require.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) - - t.Cleanup(func() { - f.Close() - }) - - f.services = makeMockServices() - return f -} - -func TestNewFactoryError(t *testing.T) { - cfg := &Config{ - ClientConfig: configgrpc.ClientConfig{ - // non-empty Auth is currently not supported - Auth: configoptional.Some(configauth.Config{}), - }, - } - telset := telemetry.NoopSettings() - t.Run("with_config", func(t *testing.T) { - _, err := NewFactoryWithConfig(*cfg, telset) - assert.ErrorContains(t, err, "authenticator") - }) - - t.Run("viper", func(t *testing.T) { - f := NewFactory() - f.InitFromViper(viper.New(), zap.NewNop()) - f.options.Config = *cfg - err := f.Initialize(metrics.NullFactory, zap.NewNop()) - assert.ErrorContains(t, err, "authenticator") - }) - - t.Run("client", func(t *testing.T) { - // this is a silly test to verify handling of error from grpc.NewClient, which cannot be induced via params. - f, err := NewFactoryWithConfig(Config{ - ClientConfig: configgrpc.ClientConfig{ - Endpoint: ":0", - }, - }, telset) - require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, f.Close()) }) - newClientFn := func(_ component.TelemetrySettings, _ ...grpc.DialOption) (conn *grpc.ClientConn, err error) { - return nil, errors.New("test error") - } - _, err = f.newRemoteStorage(component.TelemetrySettings{}, component.TelemetrySettings{}, newClientFn) - assert.ErrorContains(t, err, "error creating traced remote storage client") - }) -} - -func TestInitFactory(t *testing.T) { - f := makeFactory(t) - f.services.Capabilities = nil - - reader, err := f.CreateSpanReader() - require.NoError(t, err) - assert.NotNil(t, reader) - - writer, err := f.CreateSpanWriter() - require.NoError(t, err) - assert.Equal(t, f.services.Store.SpanWriter(), writer) - - depReader, err := f.CreateDependencyReader() - require.NoError(t, err) - assert.Equal(t, f.services.Store.DependencyReader(), depReader) -} - -func TestGRPCStorageFactoryWithConfig(t *testing.T) { - lis, err := net.Listen("tcp", ":0") - require.NoError(t, err, "failed to listen") - - s := grpc.NewServer() - go func() { - if err := s.Serve(lis); err != nil { - log.Fatalf("Server exited with error: %v", err) - } - }() - defer s.Stop() - - cfg := Config{ - ClientConfig: configgrpc.ClientConfig{ - Endpoint: lis.Addr().String(), - }, - TimeoutConfig: exporterhelper.TimeoutConfig{ - Timeout: 1 * time.Second, - }, - Tenancy: tenancy.Options{ - Enabled: true, - }, - } - telset := telemetry.NoopSettings() - f, err := NewFactoryWithConfig(cfg, telset) - require.NoError(t, err) - require.NoError(t, f.Close()) -} - -func TestGRPCStorageFactory_Capabilities(t *testing.T) { - f := makeFactory(t) - - capabilities := f.services.Capabilities.(*mocks.PluginCapabilities) - capabilities.On("Capabilities"). - Return(&shared.Capabilities{ - StreamingSpanWriter: true, - }, nil).Times(1) - - writer, err := f.CreateSpanWriter() - require.NoError(t, err) - assert.NotNil(t, writer) -} - -func TestGRPCStorageFactory_CapabilitiesDisabled(t *testing.T) { - f := makeFactory(t) - - capabilities := f.services.Capabilities.(*mocks.PluginCapabilities) - capabilities.On("Capabilities"). - Return(&shared.Capabilities{ - StreamingSpanWriter: false, - }, nil).Times(1) - - writer, err := f.CreateSpanWriter() - require.NoError(t, err) - assert.NotNil(t, writer, "regular span writer is available") -} - -func TestGRPCStorageFactory_CapabilitiesError(t *testing.T) { - f := makeFactory(t) - - capabilities := f.services.Capabilities.(*mocks.PluginCapabilities) - customError := errors.New("made-up error") - capabilities.On("Capabilities").Return(nil, customError) - - writer, err := f.CreateSpanWriter() - require.NoError(t, err) - assert.NotNil(t, writer, "regular span writer is available") -} - -func TestGRPCStorageFactory_CapabilitiesNil(t *testing.T) { - f := makeFactory(t) - f.services.Capabilities = nil - - writer, err := f.CreateSpanWriter() - require.NoError(t, err) - assert.NotNil(t, writer, "regular span writer is available") -} - -func TestWithCLIFlags(t *testing.T) { - f := NewFactory() - v, command := config.Viperize(f.AddFlags) - err := command.ParseFlags([]string{ - "--grpc-storage.server=foo:1234", - }) - require.NoError(t, err) - f.InitFromViper(v, zap.NewNop()) - assert.Equal(t, "foo:1234", f.options.Config.ClientConfig.Endpoint) - require.NoError(t, f.Close()) -} - -func TestStreamingSpanWriterFactory_CapabilitiesNil(t *testing.T) { - f := makeFactory(t) - - f.services.Capabilities = nil - mockWriter := f.services.Store.SpanWriter().(*spanstoremocks.Writer) - mockWriter.On("WriteSpan", mock.Anything, mock.Anything).Return(errors.New("not streaming writer")) - mockWriter2 := f.services.StreamingSpanWriter.StreamingSpanWriter().(*spanstoremocks.Writer) - mockWriter2.On("WriteSpan", mock.Anything, mock.Anything).Return(errors.New("I am streaming writer")) - - writer, err := f.CreateSpanWriter() - require.NoError(t, err) - err = writer.WriteSpan(context.Background(), nil) - assert.ErrorContains(t, err, "not streaming writer") -} - -func TestStreamingSpanWriterFactory_Capabilities(t *testing.T) { - f := makeFactory(t) - - capabilities := f.services.Capabilities.(*mocks.PluginCapabilities) - customError := errors.New("made-up error") - capabilities. - // return error on the first call - On("Capabilities").Return(nil, customError).Once(). - // then return false on the second call - On("Capabilities").Return(&shared.Capabilities{}, nil).Once(). - // then return true on the second call - On("Capabilities").Return(&shared.Capabilities{StreamingSpanWriter: true}, nil).Once() - - mockWriter := f.services.Store.SpanWriter().(*spanstoremocks.Writer) - mockWriter.On("WriteSpan", mock.Anything, mock.Anything).Return(errors.New("not streaming writer")) - mockWriter2 := f.services.StreamingSpanWriter.StreamingSpanWriter().(*spanstoremocks.Writer) - mockWriter2.On("WriteSpan", mock.Anything, mock.Anything).Return(errors.New("I am streaming writer")) - - writer, err := f.CreateSpanWriter() - require.NoError(t, err) - err = writer.WriteSpan(context.Background(), nil) - require.ErrorContains(t, err, "not streaming writer", "unary writer when Capabilities return error") - - writer, err = f.CreateSpanWriter() - require.NoError(t, err) - err = writer.WriteSpan(context.Background(), nil) - require.ErrorContains(t, err, "not streaming writer", "unary writer when Capabilities return false") - - writer, err = f.CreateSpanWriter() - require.NoError(t, err) - err = writer.WriteSpan(context.Background(), nil) - assert.ErrorContains(t, err, "I am streaming writer", "streaming writer when Capabilities return true") -} - -func TestIsArchiveCapable(t *testing.T) { - tests := []struct { - name string - namespace string - enabled bool - expected bool - }{ - { - name: "archive capable", - namespace: "grpc-storage-archive", - enabled: true, - expected: true, - }, - { - name: "not capable", - namespace: "grpc-storage-archive", - enabled: false, - expected: false, - }, - { - name: "capable + wrong namespace", - namespace: "grpc-storage", - enabled: true, - expected: false, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - factory := &Factory{ - options: &options{ - namespace: test.namespace, - Config: Config{ - enabled: test.enabled, - }, - }, - } - result := factory.IsArchiveCapable() - require.Equal(t, test.expected, result) - }) - } -} diff --git a/internal/storage/v1/grpc/options_test.go b/internal/storage/v1/grpc/options_test.go index b3ca4368777..ddc63894286 100644 --- a/internal/storage/v1/grpc/options_test.go +++ b/internal/storage/v1/grpc/options_test.go @@ -9,9 +9,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" - "go.uber.org/zap/zaptest/observer" "github.com/jaegertracing/jaeger/internal/config" "github.com/jaegertracing/jaeger/internal/tenancy" @@ -66,20 +63,3 @@ func TestRemoteOptionsNoTLSWithFlags(t *testing.T) { assert.True(t, cfg.ClientConfig.TLS.Insecure) assert.Equal(t, 60*time.Second, cfg.TimeoutConfig.Timeout) } - -func TestFailedTLSFlags(t *testing.T) { - opts := newOptions("grpc-storage") - v, command := config.Viperize(opts.addFlags) - err := command.ParseFlags([]string{ - "--grpc-storage.tls.enabled=false", - "--grpc-storage.tls.cert=blah", // invalid unless tls.enabled=true - }) - require.NoError(t, err) - f := NewFactory() - core, logs := observer.New(zap.NewAtomicLevelAt(zapcore.ErrorLevel)) - logger := zap.New(core, zap.WithFatalHook(zapcore.WriteThenPanic)) - require.Panics(t, func() { f.InitFromViper(v, logger) }) - require.Len(t, logs.All(), 1) - assert.Contains(t, logs.All()[0].Message, "unable to initialize gRPC storage factory") - assert.Contains(t, logs.All()[0].ContextMap()["error"], "failed to parse gRPC storage TLS options") -} diff --git a/internal/storage/v1/memory/factory.go b/internal/storage/v1/memory/factory.go index dab3d961f15..4ccc73dcc44 100644 --- a/internal/storage/v1/memory/factory.go +++ b/internal/storage/v1/memory/factory.go @@ -1,116 +1,4 @@ -// Copyright (c) 2019 The Jaeger Authors. -// Copyright (c) 2017 Uber Technologies, Inc. +// Copyright (c) 2025 The Jaeger Authors. // SPDX-License-Identifier: Apache-2.0 package memory - -import ( - "context" - "flag" - - "github.com/spf13/viper" - "go.uber.org/zap" - - "github.com/jaegertracing/jaeger/internal/distributedlock" - "github.com/jaegertracing/jaeger/internal/metrics" - "github.com/jaegertracing/jaeger/internal/safeexpvar" - "github.com/jaegertracing/jaeger/internal/storage/v1" - "github.com/jaegertracing/jaeger/internal/storage/v1/api/dependencystore" - "github.com/jaegertracing/jaeger/internal/storage/v1/api/samplingstore" - "github.com/jaegertracing/jaeger/internal/storage/v1/api/spanstore" - "github.com/jaegertracing/jaeger/internal/storage/v1/api/spanstore/spanstoremetrics" -) - -var ( // interface comformance checks - _ storage.Factory = (*Factory)(nil) - _ storage.SamplingStoreFactory = (*Factory)(nil) - _ storage.Configurable = (*Factory)(nil) - _ storage.Purger = (*Factory)(nil) -) - -// Factory implements storage.Factory and creates storage components backed by memory store. -type Factory struct { - options Options - metricsFactory metrics.Factory - logger *zap.Logger - store *Store -} - -// NewFactory creates a new Factory. -func NewFactory() *Factory { - return &Factory{} -} - -// NewFactoryWithConfig is used from jaeger(v2). -func NewFactoryWithConfig( - cfg Configuration, - metricsFactory metrics.Factory, - logger *zap.Logger, -) *Factory { - f := NewFactory() - f.configureFromOptions(Options{Configuration: cfg}) - _ = f.Initialize(metricsFactory, logger) - return f -} - -// AddFlags implements storage.Configurable -func (*Factory) AddFlags(flagSet *flag.FlagSet) { - AddFlags(flagSet) -} - -// InitFromViper implements storage.Configurable -func (f *Factory) InitFromViper(v *viper.Viper, _ *zap.Logger) { - f.options.InitFromViper(v) -} - -// configureFromOptions initializes factory from the supplied options -func (f *Factory) configureFromOptions(opts Options) { - f.options = opts -} - -// Initialize implements storage.Factory -func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error { - f.metricsFactory, f.logger = metricsFactory, logger - f.store = WithConfiguration(f.options.Configuration) - logger.Info("Memory storage initialized", zap.Any("configuration", f.store.defaultConfig)) - f.publishOpts() - - return nil -} - -// CreateSpanReader implements storage.Factory -func (f *Factory) CreateSpanReader() (spanstore.Reader, error) { - return spanstoremetrics.NewReaderDecorator(f.store, f.metricsFactory), nil -} - -// CreateSpanWriter implements storage.Factory -func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { - return f.store, nil -} - -// CreateDependencyReader implements storage.Factory -func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { - return f.store, nil -} - -// CreateSamplingStore implements storage.SamplingStoreFactory -func (*Factory) CreateSamplingStore(maxBuckets int) (samplingstore.Store, error) { - return NewSamplingStore(maxBuckets), nil -} - -// CreateLock implements storage.SamplingStoreFactory -func (*Factory) CreateLock() (distributedlock.Lock, error) { - return &Lock{}, nil -} - -func (f *Factory) publishOpts() { - safeexpvar.SetInt("jaeger_storage_memory_max_traces", int64(f.options.Configuration.MaxTraces)) -} - -// Purge removes all data from the Factory's underlying Memory store. -// This function is intended for testing purposes only and should not be used in production environments. -func (f *Factory) Purge(ctx context.Context) error { - f.logger.Info("Purging data from memory storage") - f.store.purge(ctx) - return nil -} diff --git a/internal/storage/v1/memory/factory_test.go b/internal/storage/v1/memory/factory_test.go index deaefa3215d..4ccc73dcc44 100644 --- a/internal/storage/v1/memory/factory_test.go +++ b/internal/storage/v1/memory/factory_test.go @@ -1,67 +1,4 @@ -// Copyright (c) 2019 The Jaeger Authors. -// Copyright (c) 2017 Uber Technologies, Inc. +// Copyright (c) 2025 The Jaeger Authors. // SPDX-License-Identifier: Apache-2.0 package memory - -import ( - "expvar" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.uber.org/zap" - - "github.com/jaegertracing/jaeger/internal/config" - "github.com/jaegertracing/jaeger/internal/metrics" - "github.com/jaegertracing/jaeger/internal/storage/v1" -) - -var _ storage.Factory = new(Factory) - -func TestMemoryStorageFactory(t *testing.T) { - f := NewFactory() - require.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) - assert.NotNil(t, f.store) - reader, err := f.CreateSpanReader() - require.NoError(t, err) - require.NotNil(t, reader) - writer, err := f.CreateSpanWriter() - require.NoError(t, err) - assert.Equal(t, f.store, writer) - depReader, err := f.CreateDependencyReader() - require.NoError(t, err) - assert.Equal(t, f.store, depReader) - samplingStore, err := f.CreateSamplingStore(2) - require.NoError(t, err) - assert.Equal(t, 2, samplingStore.(*SamplingStore).maxBuckets) - lock, err := f.CreateLock() - require.NoError(t, err) - assert.NotNil(t, lock) -} - -func TestWithConfiguration(t *testing.T) { - f := NewFactory() - v, command := config.Viperize(f.AddFlags) - command.ParseFlags([]string{"--memory.max-traces=100"}) - f.InitFromViper(v, zap.NewNop()) - assert.Equal(t, 100, f.options.Configuration.MaxTraces) -} - -func TestNewFactoryWithConfig(t *testing.T) { - cfg := Configuration{ - MaxTraces: 42, - } - f := NewFactoryWithConfig(cfg, metrics.NullFactory, zap.NewNop()) - assert.Equal(t, cfg, f.options.Configuration) -} - -func TestPublishOpts(t *testing.T) { - f := NewFactory() - v, command := config.Viperize(f.AddFlags) - command.ParseFlags([]string{"--memory.max-traces=100"}) - f.InitFromViper(v, zap.NewNop()) - - require.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) - assert.EqualValues(t, 100, expvar.Get("jaeger_storage_memory_max_traces").(*expvar.Int).Value()) -} diff --git a/internal/storage/v1/memory/memory.go b/internal/storage/v1/memory/memory.go index 7c410485c95..db7c4926f2f 100644 --- a/internal/storage/v1/memory/memory.go +++ b/internal/storage/v1/memory/memory.go @@ -323,8 +323,8 @@ func flattenTags(span *model.Span) model.KeyValues { } // purge supports Purger interface. -func (st *Store) purge(context.Context) { - st.mu.Lock() - st.perTenant = make(map[string]*Tenant) - st.mu.Unlock() -} +// func (st *Store) purge(context.Context) { +// st.mu.Lock() +// st.perTenant = make(map[string]*Tenant) +// st.mu.Unlock() +// } diff --git a/internal/storage/v2/elasticsearch/factory_test.go b/internal/storage/v2/elasticsearch/factory_test.go index 9c3a1222293..513392f5818 100644 --- a/internal/storage/v2/elasticsearch/factory_test.go +++ b/internal/storage/v2/elasticsearch/factory_test.go @@ -10,12 +10,9 @@ import ( "testing" "github.com/stretchr/testify/require" - "go.uber.org/zap/zaptest" "github.com/jaegertracing/jaeger-idl/model/v1" - "github.com/jaegertracing/jaeger/internal/metrics" escfg "github.com/jaegertracing/jaeger/internal/storage/elasticsearch/config" - "github.com/jaegertracing/jaeger/internal/storage/v1/elasticsearch" "github.com/jaegertracing/jaeger/internal/telemetry" ) @@ -27,23 +24,23 @@ var mockEsServerResponse = []byte(` } `) -func TestNewFactory(t *testing.T) { - cfg := escfg.Configuration{} - coreFactory := getTestingFactoryBase(t, &cfg) - f := &Factory{coreFactory: coreFactory, config: cfg, metricsFactory: metrics.NullFactory} - _, err := f.CreateTraceReader() - require.NoError(t, err) - _, err = f.CreateTraceWriter() - require.NoError(t, err) - _, err = f.CreateDependencyReader() - require.NoError(t, err) - _, err = f.CreateSamplingStore(1) - require.NoError(t, err) - err = f.Close() - require.NoError(t, err) - err = f.Purge(context.Background()) - require.NoError(t, err) -} +// func TestNewFactory(t *testing.T) { +// cfg := escfg.Configuration{} +// coreFactory := getTestingFactoryBase(t, &cfg) +// f := &Factory{coreFactory: coreFactory, config: cfg, metricsFactory: metrics.NullFactory} +// _, err := f.CreateTraceReader() +// require.NoError(t, err) +// _, err = f.CreateTraceWriter() +// require.NoError(t, err) +// _, err = f.CreateDependencyReader() +// require.NoError(t, err) +// _, err = f.CreateSamplingStore(1) +// require.NoError(t, err) +// err = f.Close() +// require.NoError(t, err) +// err = f.Purge(context.Background()) +// require.NoError(t, err) +// } func TestESStorageFactoryWithConfig(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { @@ -65,12 +62,12 @@ func TestESStorageFactoryErr(t *testing.T) { require.Nil(t, f) } -func getTestingFactoryBase(t *testing.T, cfg *escfg.Configuration) *elasticsearch.FactoryBase { - f := &elasticsearch.FactoryBase{} - err := elasticsearch.SetFactoryForTest(f, zaptest.NewLogger(t), metrics.NullFactory, cfg) - require.NoError(t, err) - return f -} +// func getTestingFactoryBase(t *testing.T, cfg *escfg.Configuration) *elasticsearch.FactoryBase { +// f := &elasticsearch.FactoryBase{} +// err := elasticsearch.SetFactoryForTest(f, zaptest.NewLogger(t), metrics.NullFactory, cfg) +// require.NoError(t, err) +// return f +// } func TestAlwaysIncludesRequiredTags(t *testing.T) { // Set up mock Elasticsearch server diff --git a/internal/storage/v2/v1adapter/factory.go b/internal/storage/v2/v1adapter/factory.go index 8eae6354492..b0ff1c09759 100644 --- a/internal/storage/v2/v1adapter/factory.go +++ b/internal/storage/v2/v1adapter/factory.go @@ -1,83 +1,4 @@ -// Copyright (c) 2024 The Jaeger Authors. +// Copyright (c) 2025 The Jaeger Authors. // SPDX-License-Identifier: Apache-2.0 package v1adapter - -import ( - "io" - - storagev1 "github.com/jaegertracing/jaeger/internal/storage/v1" - "github.com/jaegertracing/jaeger/internal/storage/v2/api/depstore" - "github.com/jaegertracing/jaeger/internal/storage/v2/api/tracestore" -) - -type Factory struct { - ss storagev1.Factory -} - -func NewFactory(ss storagev1.Factory) tracestore.Factory { - factory := &Factory{ - ss: ss, - } - - var ( - purger, isPurger = ss.(storagev1.Purger) - sampler, isSampler = ss.(storagev1.SamplingStoreFactory) - ) - - switch { - case isSampler && isPurger: - return struct { - *Factory - storagev1.Purger - storagev1.SamplingStoreFactory - }{factory, purger, sampler} - case isPurger: - return struct { - *Factory - storagev1.Purger - }{factory, purger} - case isSampler: - return struct { - *Factory - storagev1.SamplingStoreFactory - }{factory, sampler} - default: - return factory - } -} - -// Close implements tracestore.Factory. -func (f *Factory) Close() error { - if closer, ok := f.ss.(io.Closer); ok { - return closer.Close() - } - return nil -} - -// CreateTraceReader implements tracestore.Factory. -func (f *Factory) CreateTraceReader() (tracestore.Reader, error) { - spanReader, err := f.ss.CreateSpanReader() - if err != nil { - return nil, err - } - return NewTraceReader(spanReader), nil -} - -// CreateTraceWriter implements tracestore.Factory. -func (f *Factory) CreateTraceWriter() (tracestore.Writer, error) { - spanWriter, err := f.ss.CreateSpanWriter() - if err != nil { - return nil, err - } - return NewTraceWriter(spanWriter), nil -} - -// CreateDependencyReader implements depstore.Factory. -func (f *Factory) CreateDependencyReader() (depstore.Reader, error) { - dr, err := f.ss.CreateDependencyReader() - if err != nil { - return nil, err - } - return NewDependencyReader(dr), nil -} diff --git a/internal/storage/v2/v1adapter/factory_test.go b/internal/storage/v2/v1adapter/factory_test.go index e156f695222..b0ff1c09759 100644 --- a/internal/storage/v2/v1adapter/factory_test.go +++ b/internal/storage/v2/v1adapter/factory_test.go @@ -1,167 +1,4 @@ -// Copyright (c) 2024 The Jaeger Authors. +// Copyright (c) 2025 The Jaeger Authors. // SPDX-License-Identifier: Apache-2.0 package v1adapter - -import ( - "errors" - "io" - "testing" - - "github.com/stretchr/testify/require" - - storagev1 "github.com/jaegertracing/jaeger/internal/storage/v1" - dependencystoremocks "github.com/jaegertracing/jaeger/internal/storage/v1/api/dependencystore/mocks" - spanstoremocks "github.com/jaegertracing/jaeger/internal/storage/v1/api/spanstore/mocks" - "github.com/jaegertracing/jaeger/internal/storage/v1/grpc" - factorymocks "github.com/jaegertracing/jaeger/internal/storage/v1/mocks" - "github.com/jaegertracing/jaeger/internal/storage/v2/api/depstore" - "github.com/jaegertracing/jaeger/internal/storage/v2/api/tracestore" -) - -func TestNewFactory(t *testing.T) { - mockFactory := new(factorymocks.Factory) - mockPurger := new(factorymocks.Purger) - mockSamplingStoreFactory := new(factorymocks.SamplingStoreFactory) - - tests := []struct { - name string - factory storagev1.Factory - expectedInterfaces []any - }{ - { - name: "No extra interfaces", - factory: mockFactory, - expectedInterfaces: []any{ - (*tracestore.Factory)(nil), - (*depstore.Factory)(nil), - (*io.Closer)(nil), - }, - }, - { - name: "Implements Purger", - factory: struct { - storagev1.Factory - storagev1.Purger - }{mockFactory, mockPurger}, - expectedInterfaces: []any{ - (*tracestore.Factory)(nil), - (*depstore.Factory)(nil), - (*io.Closer)(nil), - (*storagev1.Purger)(nil), - }, - }, - { - name: "Implements SamplingStoreFactory", - factory: struct { - storagev1.Factory - storagev1.SamplingStoreFactory - }{mockFactory, mockSamplingStoreFactory}, - expectedInterfaces: []any{ - (*tracestore.Factory)(nil), - (*depstore.Factory)(nil), - (*io.Closer)(nil), - (*storagev1.SamplingStoreFactory)(nil), - }, - }, - { - name: "Implements both Purger and SamplingStoreFactory", - factory: struct { - storagev1.Factory - storagev1.Purger - storagev1.SamplingStoreFactory - }{mockFactory, mockPurger, mockSamplingStoreFactory}, - expectedInterfaces: []any{ - (*tracestore.Factory)(nil), - (*depstore.Factory)(nil), - (*io.Closer)(nil), - (*storagev1.Purger)(nil), - (*storagev1.SamplingStoreFactory)(nil), - }, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - traceReader := NewFactory(test.factory) - for _, i := range test.expectedInterfaces { - require.Implements(t, i, traceReader) - } - }) - } -} - -func TestAdapterCloseNotOk(t *testing.T) { - f := NewFactory(&factorymocks.Factory{}) - closer, ok := f.(io.Closer) - require.True(t, ok) - require.NoError(t, closer.Close()) -} - -func TestAdapterClose(t *testing.T) { - f := NewFactory(grpc.NewFactory()) - closer, ok := f.(io.Closer) - require.True(t, ok) - require.NoError(t, closer.Close()) -} - -func TestAdapterCreateTraceReader(t *testing.T) { - f1 := new(factorymocks.Factory) - f1.On("CreateSpanReader").Return(new(spanstoremocks.Reader), nil) - - f := NewFactory(f1) - _, err := f.CreateTraceReader() - require.NoError(t, err) -} - -func TestAdapterCreateTraceReaderError(t *testing.T) { - f1 := new(factorymocks.Factory) - f1.On("CreateSpanReader").Return(nil, errors.New("mock error")) - - f := NewFactory(f1) - _, err := f.CreateTraceReader() - require.ErrorContains(t, err, "mock error") -} - -func TestAdapterCreateTraceWriterError(t *testing.T) { - f1 := new(factorymocks.Factory) - f1.On("CreateSpanWriter").Return(nil, errors.New("mock error")) - - f := NewFactory(f1) - _, err := f.CreateTraceWriter() - require.ErrorContains(t, err, "mock error") -} - -func TestAdapterCreateTraceWriter(t *testing.T) { - f1 := new(factorymocks.Factory) - f1.On("CreateSpanWriter").Return(new(spanstoremocks.Writer), nil) - - f := NewFactory(f1) - _, err := f.CreateTraceWriter() - require.NoError(t, err) -} - -func TestAdapterCreateDependencyReader(t *testing.T) { - f1 := new(factorymocks.Factory) - f1.On("CreateDependencyReader").Return(new(dependencystoremocks.Reader), nil) - - f := NewFactory(f1) - depFactory, ok := f.(depstore.Factory) - require.True(t, ok) - r, err := depFactory.CreateDependencyReader() - require.NoError(t, err) - require.NotNil(t, r) -} - -func TestAdapterCreateDependencyReaderError(t *testing.T) { - f1 := new(factorymocks.Factory) - testErr := errors.New("test error") - f1.On("CreateDependencyReader").Return(nil, testErr) - - f := NewFactory(f1) - depFactory, ok := f.(depstore.Factory) - require.True(t, ok) - r, err := depFactory.CreateDependencyReader() - require.ErrorIs(t, err, testErr) - require.Nil(t, r) -}