Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions pkg/data/gensyncmap/gensyncmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,12 @@ func (m *Map[K, V]) Len() int {
})
return n
}

func (m *Map[K, V]) IsEmpty() bool {
isEmpty := true
m.m.Range(func(_, _ any) bool {
isEmpty = false
return false
})
return isEmpty
}
114 changes: 73 additions & 41 deletions services/httpd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/monitor"
"github.com/influxdata/influxdb/monitor/diagnostics"
"github.com/influxdata/influxdb/pkg/data/gensyncmap"
"github.com/influxdata/influxdb/prometheus"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxdb/services/meta"
Expand Down Expand Up @@ -143,12 +144,13 @@ type Handler struct {
Controller Controller
CompilerMappings flux.CompilerMappings

Config *Config
Logger *zap.Logger
CLFLogger *log.Logger
accessLog *os.File
accessLogFilters StatusFilters
stats *Statistics
Config *Config
Logger *zap.Logger
CLFLogger *log.Logger
accessLog *os.File
accessLogFilters StatusFilters
stats *Statistics
queryBytesPerUser gensyncmap.Map[string, *atomic.Int64]

requestTracker *RequestTracker
writeThrottler *Throttler
Expand Down Expand Up @@ -442,38 +444,56 @@ type Statistics struct {

// Statistics returns statistics for periodic monitoring.
func (h *Handler) Statistics(tags map[string]string) []models.Statistic {
values := map[string]interface{}{
statRequest: atomic.LoadInt64(&h.stats.Requests),
statQueryRequest: atomic.LoadInt64(&h.stats.QueryRequests),
statWriteRequest: atomic.LoadInt64(&h.stats.WriteRequests),
statPingRequest: atomic.LoadInt64(&h.stats.PingRequests),
statStatusRequest: atomic.LoadInt64(&h.stats.StatusRequests),
statWriteRequestBytesReceived: atomic.LoadInt64(&h.stats.WriteRequestBytesReceived),
statQueryRequestBytesTransmitted: atomic.LoadInt64(&h.stats.QueryRequestBytesTransmitted),
statPointsWrittenOK: atomic.LoadInt64(&h.stats.PointsWrittenOK),
statPointsWrittenDropped: atomic.LoadInt64(&h.stats.PointsWrittenDropped),
statPointsWrittenFail: atomic.LoadInt64(&h.stats.PointsWrittenFail),
statAuthFail: atomic.LoadInt64(&h.stats.AuthenticationFailures),
statRequestDuration: atomic.LoadInt64(&h.stats.RequestDuration),
statQueryRequestDuration: atomic.LoadInt64(&h.stats.QueryRequestDuration),
statWriteRequestDuration: atomic.LoadInt64(&h.stats.WriteRequestDuration),
statRequestsActive: atomic.LoadInt64(&h.stats.ActiveRequests),
statWriteRequestsActive: atomic.LoadInt64(&h.stats.ActiveWriteRequests),
statClientError: atomic.LoadInt64(&h.stats.ClientErrors),
statServerError: atomic.LoadInt64(&h.stats.ServerErrors),
statRecoveredPanics: atomic.LoadInt64(&h.stats.RecoveredPanics),
statPromWriteRequest: atomic.LoadInt64(&h.stats.PromWriteRequests),
statPromReadRequest: atomic.LoadInt64(&h.stats.PromReadRequests),
statFluxQueryRequests: atomic.LoadInt64(&h.stats.FluxQueryRequests),
statFluxQueryRequestDuration: atomic.LoadInt64(&h.stats.FluxQueryRequestDuration),
statFluxQueryRequestBytesTransmitted: atomic.LoadInt64(&h.stats.FluxQueryRequestBytesTransmitted),
}

// Add per-user query bytes with flattened keys
h.queryBytesPerUser.Range(func(user string, counter *atomic.Int64) bool {
key := "queryRespBytesUser:" + user
if user == "" {
key = "queryRespBytesUser:(anonymous)"
}
values[key] = counter.Load()
return true
})

return []models.Statistic{{
Name: "httpd",
Tags: tags,
Values: map[string]interface{}{
statRequest: atomic.LoadInt64(&h.stats.Requests),
statQueryRequest: atomic.LoadInt64(&h.stats.QueryRequests),
statWriteRequest: atomic.LoadInt64(&h.stats.WriteRequests),
statPingRequest: atomic.LoadInt64(&h.stats.PingRequests),
statStatusRequest: atomic.LoadInt64(&h.stats.StatusRequests),
statWriteRequestBytesReceived: atomic.LoadInt64(&h.stats.WriteRequestBytesReceived),
statQueryRequestBytesTransmitted: atomic.LoadInt64(&h.stats.QueryRequestBytesTransmitted),
statPointsWrittenOK: atomic.LoadInt64(&h.stats.PointsWrittenOK),
statPointsWrittenDropped: atomic.LoadInt64(&h.stats.PointsWrittenDropped),
statPointsWrittenFail: atomic.LoadInt64(&h.stats.PointsWrittenFail),
statAuthFail: atomic.LoadInt64(&h.stats.AuthenticationFailures),
statRequestDuration: atomic.LoadInt64(&h.stats.RequestDuration),
statQueryRequestDuration: atomic.LoadInt64(&h.stats.QueryRequestDuration),
statWriteRequestDuration: atomic.LoadInt64(&h.stats.WriteRequestDuration),
statRequestsActive: atomic.LoadInt64(&h.stats.ActiveRequests),
statWriteRequestsActive: atomic.LoadInt64(&h.stats.ActiveWriteRequests),
statClientError: atomic.LoadInt64(&h.stats.ClientErrors),
statServerError: atomic.LoadInt64(&h.stats.ServerErrors),
statRecoveredPanics: atomic.LoadInt64(&h.stats.RecoveredPanics),
statPromWriteRequest: atomic.LoadInt64(&h.stats.PromWriteRequests),
statPromReadRequest: atomic.LoadInt64(&h.stats.PromReadRequests),
statFluxQueryRequests: atomic.LoadInt64(&h.stats.FluxQueryRequests),
statFluxQueryRequestDuration: atomic.LoadInt64(&h.stats.FluxQueryRequestDuration),
statFluxQueryRequestBytesTransmitted: atomic.LoadInt64(&h.stats.FluxQueryRequestBytesTransmitted),
},
Name: "httpd",
Tags: tags,
Values: values,
}}
}

// addQueryBytesForUser atomically adds bytes to the per-user query bytes counter.
func (h *Handler) addQueryBytesForUser(user string, n int64) {
counter, _ := h.queryBytesPerUser.LoadOrStore(user, &atomic.Int64{})
counter.Add(n)
}

// AddRoutes sets the provided routes on the handler.
func (h *Handler) AddRoutes(routes ...Route) {
for _, r := range routes {
Expand Down Expand Up @@ -782,6 +802,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.U
Results: []*query.Result{r},
})
atomic.AddInt64(&h.stats.QueryRequestBytesTransmitted, int64(n))
h.addQueryBytesForUser(userName, int64(n))
w.(http.Flusher).Flush()
continue
}
Expand Down Expand Up @@ -873,6 +894,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.U
if !chunked {
n, _ := rw.WriteResponse(resp)
atomic.AddInt64(&h.stats.QueryRequestBytesTransmitted, int64(n))
h.addQueryBytesForUser(userName, int64(n))
}
}

Expand Down Expand Up @@ -2031,6 +2053,11 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met
return
}

var userName string
if user != nil {
userName = user.ID()
}

respond := func(resp *prompb.ReadResponse) {
data, err := resp.Marshal()
if err != nil {
Expand All @@ -2048,6 +2075,7 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met
}

atomic.AddInt64(&h.stats.QueryRequestBytesTransmitted, int64(len(compressed)))
h.addQueryBytesForUser(userName, int64(len(compressed)))
}

ctx := context.Background()
Expand Down Expand Up @@ -2134,6 +2162,11 @@ func (h *Handler) serveFluxQuery(w http.ResponseWriter, r *http.Request, user me
atomic.AddInt64(&h.stats.FluxQueryRequestDuration, time.Since(start).Nanoseconds())
}(time.Now())

var userName string
if user != nil {
userName = user.ID()
}

req, err := decodeQueryRequest(r)
if err != nil {
h.httpError(w, err.Error(), http.StatusBadRequest)
Expand Down Expand Up @@ -2201,14 +2234,13 @@ func (h *Handler) serveFluxQuery(w http.ResponseWriter, r *http.Request, user me
defer results.Release()

n, err = encoder.Encode(w, results)
if err != nil {
if n == 0 {
// If the encoder did not write anything, we can write an error header.
h.httpError(w, err.Error(), http.StatusInternalServerError)
} else {
atomic.AddInt64(&h.stats.FluxQueryRequestBytesTransmitted, int64(n))
}
if err != nil && n == 0 {
// If the encoder did not write anything, we can write an error header.
h.httpError(w, err.Error(), http.StatusInternalServerError)
return
}
atomic.AddInt64(&h.stats.FluxQueryRequestBytesTransmitted, int64(n))
h.addQueryBytesForUser(userName, int64(n))
}
}

Expand Down
Loading
Loading