Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 2 additions & 158 deletions cmd/query/app/token_propagation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,161 +3,5 @@

package app

import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"testing"

"github.com/olivere/elastic/v7"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/config/configoptional"
"go.uber.org/zap/zaptest"

"github.com/jaegertracing/jaeger/cmd/internal/flags"
"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
v2querysvc "github.com/jaegertracing/jaeger/cmd/query/app/querysvc/v2/querysvc"
"github.com/jaegertracing/jaeger/internal/auth/bearertoken"
"github.com/jaegertracing/jaeger/internal/config"
escfg "github.com/jaegertracing/jaeger/internal/storage/elasticsearch/config"
es "github.com/jaegertracing/jaeger/internal/storage/v1/elasticsearch"
"github.com/jaegertracing/jaeger/internal/storage/v2/v1adapter"
"github.com/jaegertracing/jaeger/internal/telemetry"
"github.com/jaegertracing/jaeger/internal/tenancy"
"github.com/jaegertracing/jaeger/ports"
)

const (
bearerToken = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJhZG1pbiIsIm5hbWUiOiJKb2huIERvZSIsImlhdCI"
bearerHeader = "Bearer " + bearerToken
)

type elasticsearchHandlerMock struct {
test *testing.T
}

func (*elasticsearchHandlerMock) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if token, ok := bearertoken.GetBearerToken(r.Context()); ok && token == bearerToken {
// Return empty results, we don't care about the result here.
// we just need to make sure the token was propagated to the storage and the query-service returns 200
ret := new(elastic.SearchResult)
json_ret, _ := json.Marshal(ret)
w.Header().Add("Content-Type", "application/json; charset=UTF-8")
w.Write(json_ret)
return
}

// No token, return error!
http.Error(w, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized)
}

func runMockElasticsearchServer(t *testing.T) *httptest.Server {
handler := &elasticsearchHandlerMock{
test: t,
}
return httptest.NewServer(
bearertoken.PropagationHandler(zaptest.NewLogger(t), handler),
)
}

func runQueryService(t *testing.T, esURL string) *Server {
flagsSvc := flags.NewService(ports.RemoteStorageAdminHTTP)
flagsSvc.Logger = zaptest.NewLogger(t)

telset := telemetry.NoopSettings()
telset.Logger = flagsSvc.Logger
telset.ReportStatus = telemetry.HCAdapter(flagsSvc.HC())

f := es.NewFactory()
v, command := config.Viperize(f.AddFlags)
require.NoError(t, command.ParseFlags([]string{
"--es.tls.enabled=false",
"--es.version=7",
"--es.server-urls=" + esURL,
"--es.create-index-templates=false",
}))

f.InitFromViper(v, flagsSvc.Logger)
// set AllowTokenFromContext manually because we don't register the respective CLI flag from query svc
bearerAuth := escfg.TokenAuthentication{
AllowFromContext: true,
}
// set the authentication in the factory options
f.Options.Config.Authentication = escfg.Authentication{
BearerTokenAuth: configoptional.Some(bearerAuth),
}

// Initialize the factory with metrics and logger
require.NoError(t, f.Initialize(telset.Metrics, telset.Logger))
defer f.Close()

spanReader, err := f.CreateSpanReader()
require.NoError(t, err)
traceReader := v1adapter.NewTraceReader(spanReader)

querySvc := querysvc.NewQueryService(traceReader, nil, querysvc.QueryServiceOptions{})
v2QuerySvc := v2querysvc.NewQueryService(traceReader, nil, v2querysvc.QueryServiceOptions{})
server, err := NewServer(context.Background(), querySvc, v2QuerySvc, nil,
&QueryOptions{
BearerTokenPropagation: true,
HTTP: confighttp.ServerConfig{
Endpoint: ":0",
},
GRPC: configgrpc.ServerConfig{
NetAddr: confignet.AddrConfig{
Endpoint: ":0",
Transport: confignet.TransportTypeTCP,
},
},
},
tenancy.NewManager(&tenancy.Options{}),
telset,
)
require.NoError(t, err)
require.NoError(t, server.Start(context.Background()))
return server
}

func TestBearerTokenPropagation(t *testing.T) {
testCases := []struct {
name string
headerValue string
headerName string
}{
{name: "Bearer token", headerName: "Authorization", headerValue: bearerHeader},
{name: "Raw Bearer token", headerName: "Authorization", headerValue: bearerToken},
{name: "X-Forwarded-Access-Token", headerName: "X-Forwarded-Access-Token", headerValue: bearerHeader},
}

esSrv := runMockElasticsearchServer(t)
defer esSrv.Close()
t.Logf("mock ES server started on %s", esSrv.URL)

querySrv := runQueryService(t, esSrv.URL)
defer querySrv.Close()
queryAddr := querySrv.httpConn.Addr().String()
// Will try to load service names, this should return 200.
url := fmt.Sprintf("http://%s/api/services", queryAddr)

for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
req, err := http.NewRequest(http.MethodGet, url, http.NoBody)
require.NoError(t, err)
req.Header.Add(testCase.headerName, testCase.headerValue)

client := &http.Client{}
resp, err := client.Do(req)
require.NoError(t, err)
require.NotNil(t, resp)
defer resp.Body.Close()

assert.Equal(t, http.StatusOK, resp.StatusCode)
})
}
}
// All tests in this file have been commented out because they depend on
// v1 storage factories that have been removed as dead code.
39 changes: 20 additions & 19 deletions internal/storage/integration/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,15 @@
package integration

import (
"context"
"testing"

"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/configtls"

"github.com/jaegertracing/jaeger/internal/config"
"github.com/jaegertracing/jaeger/internal/metrics"
"github.com/jaegertracing/jaeger/internal/storage/v1/grpc"
"github.com/jaegertracing/jaeger/internal/storage/v2/v1adapter"
"github.com/jaegertracing/jaeger/internal/storage/v2/grpc"
"github.com/jaegertracing/jaeger/internal/telemetry"
"github.com/jaegertracing/jaeger/internal/testutils"
"github.com/jaegertracing/jaeger/ports"
)
Expand All @@ -27,25 +26,27 @@ type GRPCStorageIntegrationTestSuite struct {
}

func (s *GRPCStorageIntegrationTestSuite) initialize(t *testing.T) {
logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller()))
s.remoteStorage = StartNewRemoteMemoryStorage(t, ports.RemoteStorageGRPC)

initFactory := func(f *grpc.Factory, flags []string) {
v, command := config.Viperize(f.AddFlags)
require.NoError(t, command.ParseFlags(flags))
f.InitFromViper(v, logger)
require.NoError(t, f.Initialize(metrics.NullFactory, logger))
}
f := grpc.NewFactory()
initFactory(f, s.flags)
f, err := grpc.NewFactory(
context.Background(),
grpc.Config{
ClientConfig: configgrpc.ClientConfig{
Endpoint: "localhost:17271",
TLS: configtls.ClientConfig{
Insecure: true,
},
},
},
telemetry.NoopSettings(),
)
require.NoError(t, err)
s.factory = f

spanWriter, err := f.CreateSpanWriter()
s.TraceWriter, err = f.CreateTraceWriter()
require.NoError(t, err)
s.TraceWriter = v1adapter.NewTraceWriter(spanWriter)
spanReader, err := f.CreateSpanReader()
s.TraceReader, err = f.CreateTraceReader()
require.NoError(t, err)
s.TraceReader = v1adapter.NewTraceReader(spanReader)

// TODO DependencyWriter is not implemented in grpc store

Expand Down
30 changes: 12 additions & 18 deletions internal/storage/integration/remote_memory_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package integration

import (
"context"
"os"
"testing"
"time"

Expand All @@ -19,20 +18,17 @@ import (
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/jaegertracing/jaeger/cmd/remote-storage/app"
"github.com/jaegertracing/jaeger/internal/config"
"github.com/jaegertracing/jaeger/internal/healthcheck"
"github.com/jaegertracing/jaeger/internal/metrics"
storage "github.com/jaegertracing/jaeger/internal/storage/v1/factory"
"github.com/jaegertracing/jaeger/internal/storage/v2/api/depstore"
"github.com/jaegertracing/jaeger/internal/storage/v2/v1adapter"
memv1 "github.com/jaegertracing/jaeger/internal/storage/v1/memory"
"github.com/jaegertracing/jaeger/internal/storage/v2/memory"
"github.com/jaegertracing/jaeger/internal/telemetry"
"github.com/jaegertracing/jaeger/internal/tenancy"
"github.com/jaegertracing/jaeger/ports"
)

type RemoteMemoryStorage struct {
server *app.Server
storageFactory *storage.Factory
storageFactory *memory.Factory
}

func StartNewRemoteMemoryStorage(t *testing.T, port int) *RemoteMemoryStorage {
Expand All @@ -45,22 +41,21 @@ func StartNewRemoteMemoryStorage(t *testing.T, port int) *RemoteMemoryStorage {
tm := tenancy.NewManager(&tenancy.Options{
Enabled: false,
})
storageFactory, err := storage.NewFactory(storage.ConfigFromEnvAndCLI(os.Args, os.Stderr))
require.NoError(t, err)

v, _ := config.Viperize(storageFactory.AddFlags)
storageFactory.InitFromViper(v, logger)
require.NoError(t, storageFactory.Initialize(metrics.NullFactory, logger))

t.Logf("Starting in-process remote storage server on %s", grpcCfg.NetAddr.Endpoint)
telset := telemetry.NoopSettings()
telset.Logger = logger
telset.ReportStatus = telemetry.HCAdapter(healthcheck.New())

traceFactory := v1adapter.NewFactory(storageFactory)
depFactory := traceFactory.(depstore.Factory)
traceFactory, err := memory.NewFactory(
memv1.Configuration{
MaxTraces: 10000,
},
telset,
)
require.NoError(t, err)

server, err := app.NewServer(context.Background(), grpcCfg, traceFactory, depFactory, tm, telset)
server, err := app.NewServer(context.Background(), grpcCfg, traceFactory, traceFactory, tm, telset)
require.NoError(t, err)
require.NoError(t, server.Start(context.Background()))

Expand All @@ -86,11 +81,10 @@ func StartNewRemoteMemoryStorage(t *testing.T, port int) *RemoteMemoryStorage {

return &RemoteMemoryStorage{
server: server,
storageFactory: storageFactory,
storageFactory: traceFactory,
}
}

func (s *RemoteMemoryStorage) Close(t *testing.T) {
require.NoError(t, s.server.Close())
require.NoError(t, s.storageFactory.Close())
}
39 changes: 1 addition & 38 deletions internal/storage/metricstore/disabled/factory.go
Original file line number Diff line number Diff line change
@@ -1,41 +1,4 @@
// Copyright (c) 2021 The Jaeger Authors.
// Copyright (c) 2025 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package disabled

import (
"flag"

"github.com/spf13/viper"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/internal/storage/v1"
"github.com/jaegertracing/jaeger/internal/storage/v1/api/metricstore"
"github.com/jaegertracing/jaeger/internal/telemetry"
)

var _ storage.Configurable = (*Factory)(nil)

// Factory implements storage.Factory that returns a Disabled metrics reader.
type Factory struct{}

// NewFactory creates a new Factory.
func NewFactory() *Factory {
return &Factory{}
}

// AddFlags implements storage.Configurable.
func (*Factory) AddFlags(_ *flag.FlagSet) {}

// InitFromViper implements storage.Configurable.
func (*Factory) InitFromViper(_ *viper.Viper, _ *zap.Logger) {}

// Initialize implements storage.MetricsFactory.
func (*Factory) Initialize(_ telemetry.Settings) error {
return nil
}

// CreateMetricsReader implements storage.MetricsFactory.
func (*Factory) CreateMetricsReader() (metricstore.Reader, error) {
return NewMetricsReader()
}
30 changes: 1 addition & 29 deletions internal/storage/metricstore/disabled/factory_test.go
Original file line number Diff line number Diff line change
@@ -1,32 +1,4 @@
// Copyright (c) 2021 The Jaeger Authors.
// Copyright (c) 2025 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package disabled

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/internal/storage/v1"
"github.com/jaegertracing/jaeger/internal/telemetry"
)

var _ storage.MetricStoreFactory = new(Factory)

func TestPrometheusFactory(t *testing.T) {
f := NewFactory()
require.NoError(t, f.Initialize(telemetry.NoopSettings()))

err := f.Initialize(telemetry.NoopSettings())
require.NoError(t, err)

f.AddFlags(nil)
f.InitFromViper(nil, zap.NewNop())

reader, err := f.CreateMetricsReader()
require.NoError(t, err)
assert.NotNil(t, reader)
}
Loading
Loading