diff --git a/pkg/data/gensyncmap/gensyncmap.go b/pkg/data/gensyncmap/gensyncmap.go index 00b02b68cb5..b49731d0833 100644 --- a/pkg/data/gensyncmap/gensyncmap.go +++ b/pkg/data/gensyncmap/gensyncmap.go @@ -45,3 +45,12 @@ func (m *Map[K, V]) Len() int { }) return n } + +func (m *Map[K, V]) IsEmpty() bool { + isEmpty := true + m.m.Range(func(_, _ any) bool { + isEmpty = false + return false + }) + return isEmpty +} diff --git a/services/httpd/config.go b/services/httpd/config.go index 387c0d44beb..667a467860a 100644 --- a/services/httpd/config.go +++ b/services/httpd/config.go @@ -65,6 +65,7 @@ type Config struct { MaxConcurrentWriteLimit int `toml:"max-concurrent-write-limit"` MaxEnqueuedWriteLimit int `toml:"max-enqueued-write-limit"` EnqueuedWriteTimeout time.Duration `toml:"enqueued-write-timeout"` + UserQueryBytesEnabled bool `toml:"user-query-bytes-enabled"` TLS *tls.Config `toml:"-"` } diff --git a/services/httpd/handler.go b/services/httpd/handler.go index 617d1905c2f..d027d6dc3a2 100644 --- a/services/httpd/handler.go +++ b/services/httpd/handler.go @@ -30,6 +30,7 @@ import ( "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/monitor" "github.com/influxdata/influxdb/monitor/diagnostics" + "github.com/influxdata/influxdb/pkg/data/gensyncmap" "github.com/influxdata/influxdb/prometheus" "github.com/influxdata/influxdb/query" "github.com/influxdata/influxdb/services/meta" @@ -143,12 +144,13 @@ type Handler struct { Controller Controller CompilerMappings flux.CompilerMappings - Config *Config - Logger *zap.Logger - CLFLogger *log.Logger - accessLog *os.File - accessLogFilters StatusFilters - stats *Statistics + Config *Config + Logger *zap.Logger + CLFLogger *log.Logger + accessLog *os.File + accessLogFilters StatusFilters + stats *Statistics + queryBytesPerUser gensyncmap.Map[string, *atomic.Int64] requestTracker *RequestTracker writeThrottler *Throttler @@ -322,7 +324,12 @@ func NewHandler(c Config) *Handler { return func(w 
http.ResponseWriter, r *http.Request, user meta.User) { // TODO: This is the only place we use AuthorizeUnrestricted. It would be better to use an explicit permission if user == nil || !user.AuthorizeUnrestricted() { - h.Logger.Info("Unauthorized request", zap.String("user", user.ID()), zap.String("path", r.URL.Path)) + // user may be nil on this path, so avoid calling user.ID() on it when logging + id := "" + if user != nil { + id = user.ID() + } + h.Logger.Info("Unauthorized request", zap.String("user", id), zap.String("path", r.URL.Path)) h.httpError(w, "error authorizing admin access", http.StatusForbidden) return } @@ -442,7 +449,7 @@ type Statistics struct { // Statistics returns statistics for periodic monitoring. func (h *Handler) Statistics(tags map[string]string) []models.Statistic { - return []models.Statistic{{ + stats := []models.Statistic{{ Name: "httpd", Tags: tags, Values: map[string]interface{}{ @@ -472,6 +479,35 @@ func (h *Handler) Statistics(tags map[string]string) []models.Statistic { statFluxQueryRequestBytesTransmitted: atomic.LoadInt64(&h.stats.FluxQueryRequestBytesTransmitted), }, }} + + // Add per-user query bytes as separate statistics (one per user) if enabled + if h.Config.UserQueryBytesEnabled && !h.queryBytesPerUser.IsEmpty() { + h.queryBytesPerUser.Range(func(user string, counter *atomic.Int64) bool { + userTag := user + if user == "" { + userTag = StatAnonymousUser + } + userTags := models.NewTags(tags).Merge(map[string]string{StatUserTagKey: userTag}).Map() + stats = append(stats, models.Statistic{ + Name: "userquerybytes", + Tags: userTags, + Values: map[string]interface{}{statUserQueryRespBytes: counter.Load()}, + }) + return true + }) + } + + return stats +} + +// addQueryBytesForUser atomically adds bytes to the per-user query bytes counter. +// This is a no-op if UserQueryBytesEnabled is false. 
+func (h *Handler) addQueryBytesForUser(user string, n int64) { + if !h.Config.UserQueryBytesEnabled { + return + } + counter, _ := h.queryBytesPerUser.LoadOrStore(user, &atomic.Int64{}) + counter.Add(n) } // AddRoutes sets the provided routes on the handler. @@ -782,6 +818,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.U Results: []*query.Result{r}, }) atomic.AddInt64(&h.stats.QueryRequestBytesTransmitted, int64(n)) + h.addQueryBytesForUser(userName, int64(n)) w.(http.Flusher).Flush() continue } @@ -873,6 +910,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.U if !chunked { n, _ := rw.WriteResponse(resp) atomic.AddInt64(&h.stats.QueryRequestBytesTransmitted, int64(n)) + h.addQueryBytesForUser(userName, int64(n)) } } @@ -2031,6 +2069,11 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met return } + var userName string + if user != nil { + userName = user.ID() + } + respond := func(resp *prompb.ReadResponse) { data, err := resp.Marshal() if err != nil { @@ -2048,6 +2091,7 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met } atomic.AddInt64(&h.stats.QueryRequestBytesTransmitted, int64(len(compressed))) + h.addQueryBytesForUser(userName, int64(len(compressed))) } ctx := context.Background() @@ -2134,6 +2178,11 @@ func (h *Handler) serveFluxQuery(w http.ResponseWriter, r *http.Request, user me atomic.AddInt64(&h.stats.FluxQueryRequestDuration, time.Since(start).Nanoseconds()) }(time.Now()) + var userName string + if user != nil { + userName = user.ID() + } + req, err := decodeQueryRequest(r) if err != nil { h.httpError(w, err.Error(), http.StatusBadRequest) @@ -2201,14 +2250,13 @@ func (h *Handler) serveFluxQuery(w http.ResponseWriter, r *http.Request, user me defer results.Release() n, err = encoder.Encode(w, results) - if err != nil { - if n == 0 { - // If the encoder did not write anything, we can write an error header. 
- h.httpError(w, err.Error(), http.StatusInternalServerError) - } else { - atomic.AddInt64(&h.stats.FluxQueryRequestBytesTransmitted, int64(n)) - } + if err != nil && n == 0 { + // If the encoder did not write anything, we can write an error header. + h.httpError(w, err.Error(), http.StatusInternalServerError) + return } + atomic.AddInt64(&h.stats.FluxQueryRequestBytesTransmitted, int64(n)) + h.addQueryBytesForUser(userName, int64(n)) } } diff --git a/services/httpd/handler_test.go b/services/httpd/handler_test.go index 2c7110f3264..ea72a04188d 100644 --- a/services/httpd/handler_test.go +++ b/services/httpd/handler_test.go @@ -6,7 +6,6 @@ import ( "encoding/json" "errors" "fmt" - "github.com/stretchr/testify/require" "io" "log" "math" @@ -18,6 +17,7 @@ import ( "reflect" "sort" "strings" + "sync" "sync/atomic" "testing" "time" @@ -43,6 +43,7 @@ import ( "github.com/influxdata/influxdb/tsdb" "github.com/influxdata/influxql" "github.com/prometheus/prometheus/prompb" + "github.com/stretchr/testify/require" ) // Ensure the handler returns results from a query (including nil results). @@ -2851,6 +2852,532 @@ func TestHandlerDebugVars(t *testing.T) { } +// TestHandler_QueryBytesPerUser tests that query response bytes are tracked per user. 
+func TestHandler_QueryBytesPerUser(t *testing.T) { + const ( + testUserAlice = "alice" + testUserBob = "bob" + ) + + // Helper to find user query bytes statistic by user tag + findUserStat := func(stats []models.Statistic, user string) (int64, bool) { + for _, stat := range stats { + if stat.Name == "userquerybytes" && stat.Tags[httpd.StatUserTagKey] == user { + if v, ok := stat.Values["userQueryRespBytes"]; ok { + return v.(int64), true + } + } + } + return 0, false + } + + // Helper to count userquerybytes statistics + countUserStats := func(stats []models.Statistic) int { + count := 0 + for _, stat := range stats { + if stat.Name == "userquerybytes" { + count++ + } + } + return count + } + + t.Run("disabled by default", func(t *testing.T) { + h := NewHandler(false) // no auth, default config + + h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { + ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})} + return nil + } + + // Make a query + w := httptest.NewRecorder() + h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar", nil)) + require.Equal(t, http.StatusOK, w.Code) + + // Check statistics - should only have httpd, no userquerybytes + stats := h.Handler.Statistics(nil) + require.Len(t, stats, 1, "expected only httpd statistic when user-query-bytes-enabled is false") + require.Equal(t, "httpd", stats[0].Name) + }) + + t.Run("tracks bytes for authenticated user", func(t *testing.T) { + h := NewHandlerWithConfig(NewHandlerConfig(WithAuthentication(), WithUserQueryBytes())) + + h.MetaClient.AdminUserExistsFn = func() bool { return true } + h.MetaClient.UserFn = func(username string) (meta.User, error) { + if username == testUserAlice || username == testUserBob { + return &meta.UserInfo{Name: username, Hash: "pass", Admin: true}, nil + } + return nil, meta.ErrUserNotFound + } + h.MetaClient.AuthenticateFn = func(u, p string) (meta.User, 
error) { + return h.MetaClient.User(u) + } + h.QueryAuthorizer.AuthorizeQueryFn = func(u meta.User, query *influxql.Query, database string) error { + return nil + } + h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { + ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})} + return nil + } + + // Query as alice + w := httptest.NewRecorder() + h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?u="+testUserAlice+"&p=pass&db=foo&q=SELECT+*+FROM+bar", nil)) + require.Equal(t, http.StatusOK, w.Code) + aliceBytes1 := w.Body.Len() + + // Query as alice again + w = httptest.NewRecorder() + h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?u="+testUserAlice+"&p=pass&db=foo&q=SELECT+*+FROM+bar", nil)) + require.Equal(t, http.StatusOK, w.Code) + + // Query as bob + w = httptest.NewRecorder() + h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?u="+testUserBob+"&p=pass&db=foo&q=SELECT+*+FROM+bar", nil)) + require.Equal(t, http.StatusOK, w.Code) + bobBytes := w.Body.Len() + + // Check statistics - should have httpd + 2 userquerybytes (one per user) + stats := h.Handler.Statistics(nil) + require.Equal(t, "httpd", stats[0].Name) + require.Equal(t, 2, countUserStats(stats), "expected 2 userquerybytes statistics (alice and bob)") + + // Alice made 2 queries + aliceBytes, found := findUserStat(stats, testUserAlice) + require.True(t, found, "expected alice's bytes to be tracked") + require.Equal(t, int64(aliceBytes1*2), aliceBytes, "alice's bytes mismatch") + + // Bob made 1 query + bobBytesActual, found := findUserStat(stats, testUserBob) + require.True(t, found, "expected bob's bytes to be tracked") + require.Equal(t, int64(bobBytes), bobBytesActual, "bob's bytes mismatch") + }) + + t.Run("tracks bytes for anonymous user", func(t *testing.T) { + h := NewHandlerWithConfig(NewHandlerConfig(WithUserQueryBytes())) // no auth required + + h.StatementExecutor.ExecuteStatementFn = func(stmt 
influxql.Statement, ctx *query.ExecutionContext) error { + ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})} + return nil + } + + // Query without authentication + w := httptest.NewRecorder() + h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar", nil)) + require.Equal(t, http.StatusOK, w.Code) + anonBytes := w.Body.Len() + + // Check statistics + stats := h.Handler.Statistics(nil) + require.Equal(t, "httpd", stats[0].Name) + require.Equal(t, 1, countUserStats(stats), "expected 1 userquerybytes statistic") + + anonBytesActual, found := findUserStat(stats, httpd.StatAnonymousUser) + require.True(t, found, "expected anonymous bytes to be tracked") + require.Equal(t, int64(anonBytes), anonBytesActual, "anonymous bytes mismatch") + }) + + t.Run("all queries without auth attributed to anonymous user", func(t *testing.T) { + h := NewHandlerWithConfig(NewHandlerConfig(WithUserQueryBytes())) // no auth required + + h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { + ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})} + return nil + } + + // Make multiple queries without authentication + const numQueries = 5 + var totalBytes int + for i := 0; i < numQueries; i++ { + w := httptest.NewRecorder() + h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar", nil)) + require.Equal(t, http.StatusOK, w.Code) + totalBytes += w.Body.Len() + } + + // Check statistics - all bytes should be under the anonymous user + stats := h.Handler.Statistics(nil) + require.Equal(t, "httpd", stats[0].Name) + + // Should only have one userquerybytes statistic - the anonymous user + require.Equal(t, 1, countUserStats(stats), "expected only anonymous user when auth is disabled") + + anonBytesActual, found := findUserStat(stats, httpd.StatAnonymousUser) + require.True(t, found, "expected anonymous bytes to 
be tracked") + require.Equal(t, int64(totalBytes), anonBytesActual, "all queries should be attributed to anonymous user") + }) + + t.Run("tracks bytes for chunked queries", func(t *testing.T) { + h := NewHandlerWithConfig(NewHandlerConfig(WithUserQueryBytes())) + + h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { + // Send multiple chunks + ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})} + ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series1"}})} + return nil + } + + w := httptest.NewRecorder() + h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar&chunked=true", nil)) + require.Equal(t, http.StatusOK, w.Code) + totalBytes := w.Body.Len() + + stats := h.Handler.Statistics(nil) + require.Equal(t, "httpd", stats[0].Name) + require.Equal(t, 1, countUserStats(stats), "expected 1 userquerybytes statistic") + + anonBytesActual, found := findUserStat(stats, httpd.StatAnonymousUser) + require.True(t, found, "expected anonymous bytes to be tracked") + require.Equal(t, int64(totalBytes), anonBytesActual, "chunked query bytes mismatch") + }) + + t.Run("concurrent queries", func(t *testing.T) { + h := NewHandlerWithConfig(NewHandlerConfig(WithAuthentication(), WithUserQueryBytes())) + + h.MetaClient.AdminUserExistsFn = func() bool { return true } + h.MetaClient.UserFn = func(username string) (meta.User, error) { + return &meta.UserInfo{Name: username, Hash: "pass", Admin: true}, nil + } + h.MetaClient.AuthenticateFn = func(u, p string) (meta.User, error) { + return h.MetaClient.User(u) + } + h.QueryAuthorizer.AuthorizeQueryFn = func(u meta.User, query *influxql.Query, database string) error { + return nil + } + h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { + ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: 
"series0"}})} + return nil + } + + const numUsers = 5 + const queriesPerUser = 10 + + // First, determine expected bytes per query by running a single query + w := httptest.NewRecorder() + h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?u=warmup&p=pass&db=foo&q=SELECT+*+FROM+bar", nil)) + require.Equal(t, http.StatusOK, w.Code) + expectedBytesPerQuery := int64(w.Body.Len()) + + // Create test data upfront (before spawning goroutines) + type queryRequest struct { + user string + req *http.Request + } + requests := make([]queryRequest, 0, numUsers*queriesPerUser) + for i := 0; i < numUsers; i++ { + user := fmt.Sprintf("user%d", i) + for j := 0; j < queriesPerUser; j++ { + req := MustNewJSONRequest("GET", fmt.Sprintf("/query?u=%s&p=pass&db=foo&q=SELECT+*+FROM+bar", user), nil) + requests = append(requests, queryRequest{user: user, req: req}) + } + } + + var mu sync.RWMutex + var concurrency, maxConcurrency atomic.Int64 + + var wg sync.WaitGroup + mu.Lock() + for _, qr := range requests { + wg.Add(1) + go func(qr queryRequest) { + mu.RLock() + defer mu.RUnlock() + defer wg.Done() + + c := concurrency.Add(1) + if old := maxConcurrency.Load(); c > old { + maxConcurrency.CompareAndSwap(old, c) + } + + w := httptest.NewRecorder() + h.ServeHTTP(w, qr.req) + + concurrency.Add(-1) + }(qr) + } + mu.Unlock() // Release to start all goroutines simultaneously + wg.Wait() + + t.Logf("max concurrency: %d", maxConcurrency.Load()) + + stats := h.Handler.Statistics(nil) + require.Equal(t, "httpd", stats[0].Name) + // Should have numUsers userquerybytes statistics (+ warmup user) + require.Equal(t, numUsers+1, countUserStats(stats), "expected %d userquerybytes statistics", numUsers+1) + + // Verify all users have exact expected byte counts + expectedBytesPerUser := expectedBytesPerQuery * queriesPerUser + for i := 0; i < numUsers; i++ { + user := fmt.Sprintf("user%d", i) + bytes, found := findUserStat(stats, user) + require.True(t, found, "expected user%d's bytes to be tracked", i) + 
require.Equal(t, expectedBytesPerUser, bytes, "user%d's bytes mismatch", i) + } + }) + + t.Run("statistics use user tag for per-user bytes", func(t *testing.T) { + h := NewHandlerWithConfig(NewHandlerConfig(WithAuthentication(), WithUserQueryBytes())) + + h.MetaClient.AdminUserExistsFn = func() bool { return true } + h.MetaClient.UserFn = func(username string) (meta.User, error) { + return &meta.UserInfo{Name: username, Hash: "pass", Admin: true}, nil + } + h.MetaClient.AuthenticateFn = func(u, p string) (meta.User, error) { + return h.MetaClient.User(u) + } + h.QueryAuthorizer.AuthorizeQueryFn = func(u meta.User, query *influxql.Query, database string) error { + return nil + } + h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { + ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})} + return nil + } + + // Make queries as different users and track expected bytes + var expectedAliceBytes, expectedBobBytes int + w := httptest.NewRecorder() + h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?u="+testUserAlice+"&p=pass&db=foo&q=SELECT+*+FROM+bar", nil)) + require.Equal(t, http.StatusOK, w.Code) + expectedAliceBytes = w.Body.Len() + + w = httptest.NewRecorder() + h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?u="+testUserBob+"&p=pass&db=foo&q=SELECT+*+FROM+bar", nil)) + require.Equal(t, http.StatusOK, w.Code) + expectedBobBytes = w.Body.Len() + + // Get statistics + stats := h.Handler.Statistics(nil) + require.Equal(t, "httpd", stats[0].Name) + require.Equal(t, 2, countUserStats(stats), "expected 2 userquerybytes statistics") + + // Verify each user has their own statistic with exact byte counts + aliceBytes, found := findUserStat(stats, testUserAlice) + require.True(t, found, "expected alice in statistics") + require.Equal(t, int64(expectedAliceBytes), aliceBytes, "alice bytes mismatch") + + bobBytes, found := findUserStat(stats, testUserBob) + require.True(t, 
found, "expected bob in statistics") + require.Equal(t, int64(expectedBobBytes), bobBytes, "bob bytes mismatch") + + // Verify tag key is correct + for _, stat := range stats { + if stat.Name == "userquerybytes" { + require.Contains(t, stat.Tags, httpd.StatUserTagKey, "expected user tag key") + require.Contains(t, stat.Values, "userQueryRespBytes", "expected queryRespBytes value") + } + } + }) + + // The following subtests verify the 2x2 matrix of + // (UserQueryBytesEnabled) x (admin status) for visibility of the + // userquerybytes statistic via /debug/vars. + // + // When UserQueryBytesEnabled is true, ALL users' query bytes are tracked + // internally, but the statistics are only accessible to admin users + // (via admin-only endpoints like /debug/vars). + + // hasUserQueryBytesKey returns true if any key in the parsed /debug/vars + // JSON response contains "userquerybytes". + hasUserQueryBytesKey := func(m map[string]interface{}) bool { + for k := range m { + if strings.Contains(k, "userquerybytes") { + return true + } + } + return false + } + + t.Run("enabled=true admin=true: userquerybytes visible in /debug/vars", func(t *testing.T) { + h := NewHandlerWithConfig(NewHandlerConfig(WithAuthentication(), WithPprofAuthEnabled(), WithUserQueryBytes())) + + h.MetaClient.AdminUserExistsFn = func() bool { return true } + h.MetaClient.UserFn = func(username string) (meta.User, error) { + isAdmin := username == testUserAlice // alice is admin, bob is not + return &meta.UserInfo{Name: username, Hash: "pass", Admin: isAdmin}, nil + } + h.MetaClient.AuthenticateFn = func(u, p string) (meta.User, error) { + return h.MetaClient.User(u) + } + h.QueryAuthorizer.AuthorizeQueryFn = func(u meta.User, query *influxql.Query, database string) error { + return nil + } + h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { + ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})} + 
return nil + } + // Wire mock monitor to return the handler's real statistics. + h.Monitor.StatisticsFn = func(tags map[string]string) ([]*monitor.Statistic, error) { + handlerStats := h.Handler.Statistics(tags) + out := make([]*monitor.Statistic, len(handlerStats)) + for i, s := range handlerStats { + out[i] = &monitor.Statistic{Statistic: s} + } + return out, nil + } + + // Generate traffic from both admin (alice) and non-admin (bob). + w := httptest.NewRecorder() + h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?u="+testUserAlice+"&p=pass&db=foo&q=SELECT+*+FROM+bar", nil)) + require.Equal(t, http.StatusOK, w.Code) + + w = httptest.NewRecorder() + h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?u="+testUserBob+"&p=pass&db=foo&q=SELECT+*+FROM+bar", nil)) + require.Equal(t, http.StatusOK, w.Code) + + // Admin (alice) accesses /debug/vars — should see userquerybytes. + r := MustNewJSONRequest("GET", "/debug/vars?u="+testUserAlice+"&p=pass", nil) + w = httptest.NewRecorder() + h.ServeHTTP(w, r) + require.Equal(t, http.StatusOK, w.Code) + + var result map[string]interface{} + require.NoError(t, json.Unmarshal(w.Body.Bytes(), &result)) + require.True(t, hasUserQueryBytesKey(result), + "admin should see userquerybytes in /debug/vars when enabled") + }) + + t.Run("enabled=true admin=false: /debug/vars returns 403", func(t *testing.T) { + h := NewHandlerWithConfig(NewHandlerConfig(WithAuthentication(), WithPprofAuthEnabled(), WithUserQueryBytes())) + + h.MetaClient.AdminUserExistsFn = func() bool { return true } + h.MetaClient.UserFn = func(username string) (meta.User, error) { + isAdmin := username == testUserAlice + return &meta.UserInfo{Name: username, Hash: "pass", Admin: isAdmin}, nil + } + h.MetaClient.AuthenticateFn = func(u, p string) (meta.User, error) { + return h.MetaClient.User(u) + } + h.QueryAuthorizer.AuthorizeQueryFn = func(u meta.User, query *influxql.Query, database string) error { + return nil + } + h.StatementExecutor.ExecuteStatementFn = func(stmt 
influxql.Statement, ctx *query.ExecutionContext) error { + ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})} + return nil + } + + // Generate traffic from non-admin bob. + w := httptest.NewRecorder() + h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?u="+testUserBob+"&p=pass&db=foo&q=SELECT+*+FROM+bar", nil)) + require.Equal(t, http.StatusOK, w.Code) + + // Non-admin (bob) accesses /debug/vars — should be rejected. + r := MustNewJSONRequest("GET", "/debug/vars?u="+testUserBob+"&p=pass", nil) + w = httptest.NewRecorder() + h.ServeHTTP(w, r) + require.Equal(t, http.StatusForbidden, w.Code, + "non-admin should not be able to access /debug/vars") + }) + + t.Run("enabled=false admin=true: no userquerybytes in /debug/vars", func(t *testing.T) { + h := NewHandlerWithConfig(NewHandlerConfig(WithAuthentication(), WithPprofAuthEnabled())) // no WithUserQueryBytes + + h.MetaClient.AdminUserExistsFn = func() bool { return true } + h.MetaClient.UserFn = func(username string) (meta.User, error) { + return &meta.UserInfo{Name: username, Hash: "pass", Admin: true}, nil + } + h.MetaClient.AuthenticateFn = func(u, p string) (meta.User, error) { + return h.MetaClient.User(u) + } + h.QueryAuthorizer.AuthorizeQueryFn = func(u meta.User, query *influxql.Query, database string) error { + return nil + } + h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { + ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})} + return nil + } + h.Monitor.StatisticsFn = func(tags map[string]string) ([]*monitor.Statistic, error) { + handlerStats := h.Handler.Statistics(tags) + out := make([]*monitor.Statistic, len(handlerStats)) + for i, s := range handlerStats { + out[i] = &monitor.Statistic{Statistic: s} + } + return out, nil + } + + // Generate traffic. 
+ w := httptest.NewRecorder() + h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?u="+testUserAlice+"&p=pass&db=foo&q=SELECT+*+FROM+bar", nil)) + require.Equal(t, http.StatusOK, w.Code) + + // Admin accesses /debug/vars — should NOT see userquerybytes. + r := MustNewJSONRequest("GET", "/debug/vars?u="+testUserAlice+"&p=pass", nil) + w = httptest.NewRecorder() + h.ServeHTTP(w, r) + require.Equal(t, http.StatusOK, w.Code) + + var result map[string]interface{} + require.NoError(t, json.Unmarshal(w.Body.Bytes(), &result)) + require.False(t, hasUserQueryBytesKey(result), + "should not see userquerybytes in /debug/vars when disabled") + }) + + t.Run("enabled=false admin=false: /debug/vars returns 403", func(t *testing.T) { + h := NewHandlerWithConfig(NewHandlerConfig(WithAuthentication(), WithPprofAuthEnabled())) // no WithUserQueryBytes + + h.MetaClient.AdminUserExistsFn = func() bool { return true } + h.MetaClient.UserFn = func(username string) (meta.User, error) { + return &meta.UserInfo{Name: username, Hash: "pass", Admin: false}, nil + } + h.MetaClient.AuthenticateFn = func(u, p string) (meta.User, error) { + return h.MetaClient.User(u) + } + + // Non-admin accesses /debug/vars — should be rejected. + r := MustNewJSONRequest("GET", "/debug/vars?u="+testUserBob+"&p=pass", nil) + w := httptest.NewRecorder() + h.ServeHTTP(w, r) + require.Equal(t, http.StatusForbidden, w.Code, + "non-admin should not be able to access /debug/vars") + }) + + // Verify that when enabled, bytes are tracked for ALL users (admin and + // non-admin alike) even though only admins can view the statistics. 
+ t.Run("enabled=true: all users bytes tracked including non-admin", func(t *testing.T) { + h := NewHandlerWithConfig(NewHandlerConfig(WithAuthentication(), WithPprofAuthEnabled(), WithUserQueryBytes())) + + h.MetaClient.AdminUserExistsFn = func() bool { return true } + h.MetaClient.UserFn = func(username string) (meta.User, error) { + isAdmin := username == testUserAlice + return &meta.UserInfo{Name: username, Hash: "pass", Admin: isAdmin}, nil + } + h.MetaClient.AuthenticateFn = func(u, p string) (meta.User, error) { + return h.MetaClient.User(u) + } + h.QueryAuthorizer.AuthorizeQueryFn = func(u meta.User, query *influxql.Query, database string) error { + return nil + } + h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *query.ExecutionContext) error { + ctx.Results <- &query.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})} + return nil + } + + // Generate traffic from admin alice and non-admin bob. + w := httptest.NewRecorder() + h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?u="+testUserAlice+"&p=pass&db=foo&q=SELECT+*+FROM+bar", nil)) + require.Equal(t, http.StatusOK, w.Code) + aliceBytes := w.Body.Len() + + w = httptest.NewRecorder() + h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?u="+testUserBob+"&p=pass&db=foo&q=SELECT+*+FROM+bar", nil)) + require.Equal(t, http.StatusOK, w.Code) + bobBytes := w.Body.Len() + + // Verify via Statistics() that both users are tracked internally. 
+ stats := h.Handler.Statistics(nil) + require.Equal(t, 2, countUserStats(stats), "expected both admin and non-admin tracked") + + aliceBytesActual, found := findUserStat(stats, testUserAlice) + require.True(t, found, "admin alice's bytes should be tracked") + require.Equal(t, int64(aliceBytes), aliceBytesActual) + + bobBytesActual, found := findUserStat(stats, testUserBob) + require.True(t, found, "non-admin bob's bytes should also be tracked") + require.Equal(t, int64(bobBytes), bobBytesActual) + }) +} + // NewHandler represents a test wrapper for httpd.Handler. type Handler struct { *httpd.Handler @@ -2903,6 +3430,12 @@ func WithHeaders(h map[string]string) configOption { } } +func WithUserQueryBytes() configOption { + return func(c *httpd.Config) { + c.UserQueryBytesEnabled = true + } +} + // NewHandlerConfig returns a new instance of httpd.Config with // authentication configured. func NewHandlerConfig(opts ...configOption) httpd.Config { diff --git a/services/httpd/service.go b/services/httpd/service.go index 143f9bdabea..5fb97b43f99 100644 --- a/services/httpd/service.go +++ b/services/httpd/service.go @@ -48,7 +48,12 @@ const ( statFluxQueryRequests = "fluxQueryReq" // Number of flux query requests served. statFluxQueryRequestDuration = "fluxQueryReqDurationNs" // Number of (wall-time) nanoseconds spent executing Flux query requests. statFluxQueryRequestBytesTransmitted = "fluxQueryRespBytes" // Sum of all bytes returned in Flux query responses. + statUserQueryRespBytes = "userQueryRespBytes" // Value field for per-user query response bytes. + // StatUserTagKey is the tag key used to identify users in per-user statistics. + StatUserTagKey = "user" + // StatAnonymousUser is the tag value for unauthenticated users in statistics. + StatAnonymousUser = "(anonymous)" ) // Service manages the listener and handler for an HTTP endpoint. 
diff --git a/services/meta/query_authorizer_test.go b/services/meta/query_authorizer_test.go new file mode 100644 index 00000000000..93bed53ccdc --- /dev/null +++ b/services/meta/query_authorizer_test.go @@ -0,0 +1,60 @@ +package meta + +import ( + "testing" + + "github.com/influxdata/influxql" + "github.com/stretchr/testify/require" +) + +// newTestClient returns a *Client with the given users pre-populated, +// bypassing bcrypt and disk I/O. +func newTestClient(users []UserInfo) *Client { + c := NewClient(NewConfig()) + c.cacheData.Users = users + return c +} + +func TestQueryAuthorizer_AuthorizeQuery_ShowStats(t *testing.T) { + adminUser := UserInfo{Name: "admin", Admin: true} + regularUser := UserInfo{Name: "user1", Admin: false} + + q, err := influxql.ParseQuery("SHOW STATS") + require.NoError(t, err) + + t.Run("admin user is authorized", func(t *testing.T) { + a := NewQueryAuthorizer(newTestClient([]UserInfo{adminUser, regularUser})) + fa, err := a.AuthorizeQuery(&adminUser, q, "") + require.NoError(t, err) + require.NotNil(t, fa) + }) + + t.Run("non-admin user is denied", func(t *testing.T) { + a := NewQueryAuthorizer(newTestClient([]UserInfo{adminUser, regularUser})) + _, err := a.AuthorizeQuery(&regularUser, q, "") + require.Error(t, err) + require.Contains(t, err.Error(), "requires admin privilege") + }) + + t.Run("nil user is denied", func(t *testing.T) { + a := NewQueryAuthorizer(newTestClient([]UserInfo{adminUser})) + _, err := a.AuthorizeQuery(nil, q, "") + require.Error(t, err) + require.Contains(t, err.Error(), "no user provided") + }) + + t.Run("SHOW STATS FOR module also requires admin", func(t *testing.T) { + qm, err := influxql.ParseQuery("SHOW STATS FOR 'userquerybytes'") + require.NoError(t, err) + + a := NewQueryAuthorizer(newTestClient([]UserInfo{adminUser, regularUser})) + + fa, err := a.AuthorizeQuery(&adminUser, qm, "") + require.NoError(t, err) + require.NotNil(t, fa) + + _, err = a.AuthorizeQuery(&regularUser, qm, "") + require.Error(t, err) 
+ require.Contains(t, err.Error(), "requires admin privilege") + }) +}