From c2b1baa7339edb7a131d86e0eb8ccce23fd50f9d Mon Sep 17 00:00:00 2001 From: devanbenz Date: Wed, 27 Aug 2025 17:27:10 -0500 Subject: [PATCH 01/10] feat: Adds statistics measurement for compact-throughput This PR is a work in progress that will calculate and return the percentage of compact-throughput limiter usage in the form of a percentage --- cmd/influxd/run/server.go | 2 ++ monitor/service.go | 22 +++++++++++++++++++++- monitor/stats.go | 33 +++++++++++++++++++++++++++++++++ pkg/limiter/writer.go | 1 + services/httpd/handler.go | 24 ++++++++++++++++++++++++ 5 files changed, 81 insertions(+), 1 deletion(-) create mode 100644 monitor/stats.go diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go index 8d4af811e2e..7d7b7139cf6 100644 --- a/cmd/influxd/run/server.go +++ b/cmd/influxd/run/server.go @@ -513,6 +513,8 @@ func (s *Server) Open() error { return fmt.Errorf("open points writer: %s", err) } + s.Monitor.WithLimiter(s.TSDBStore.EngineOptions.CompactionThroughputLimiter) + for _, service := range s.Services { if err := service.Open(); err != nil { return fmt.Errorf("open service: %s", err) diff --git a/monitor/service.go b/monitor/service.go index 56aa821a235..4d361f2384c 100644 --- a/monitor/service.go +++ b/monitor/service.go @@ -6,6 +6,7 @@ import ( "errors" "expvar" "fmt" + "github.com/influxdata/influxdb/pkg/limiter" "os" "runtime" "sort" @@ -100,7 +101,8 @@ type Monitor struct { // TSDB configuration for diagnostics TSDBConfig *tsdb.Config - Logger *zap.Logger + Logger *zap.Logger + Limiter limiter.Rate } // PointsWriter is a simplified interface for writing the points the monitor gathers. 
@@ -150,6 +152,16 @@ func (m *Monitor) Open() error { m.RegisterDiagnosticsClient("runtime", &goRuntime{}) m.RegisterDiagnosticsClient("network", &network{}) m.RegisterDiagnosticsClient("system", &system{}) + + if m.Limiter != nil { + m.RegisterDiagnosticsClient("stats", &stats{ + comp: compactThroughputStats{ + limiter: m.Limiter, + burst: m.Limiter.Burst(), + }, + }) + } + if m.TSDBConfig != nil { m.RegisterDiagnosticsClient("config", m.TSDBConfig) } @@ -200,6 +212,12 @@ func (m *Monitor) writePoints(p models.Points) error { return nil } +func (m *Monitor) WithLimiter(limiter limiter.Rate) { + m.mu.Lock() + defer m.mu.Unlock() + m.Limiter = limiter +} + // Close closes the monitor system. func (m *Monitor) Close() error { if !m.open() { @@ -222,6 +240,8 @@ func (m *Monitor) Close() error { m.DeregisterDiagnosticsClient("runtime") m.DeregisterDiagnosticsClient("network") m.DeregisterDiagnosticsClient("system") + m.DeregisterDiagnosticsClient("stats") + m.DeregisterDiagnosticsClient("config") return nil } diff --git a/monitor/stats.go b/monitor/stats.go new file mode 100644 index 00000000000..bfd5e651417 --- /dev/null +++ b/monitor/stats.go @@ -0,0 +1,33 @@ +package monitor + +import ( + "github.com/influxdata/influxdb/monitor/diagnostics" + "github.com/influxdata/influxdb/pkg/limiter" +) + +// stats captures statistics +type stats struct { + comp compactThroughputStats +} + +type compactThroughputStats struct { + limiter limiter.Rate + burst int +} + +func (s *stats) CompactThroughputUsage() float64 { + if s.comp.burst == 0 { + return 0.0 + } + available := s.comp.limiter.Tokens() + return ((float64(s.comp.burst) - available) / float64(s.comp.burst)) * 100 +} + +func (s *stats) Diagnostics() (*diagnostics.Diagnostics, error) { + compactThroughputUsage := s.CompactThroughputUsage() + d := map[string]interface{}{ + "compact-throughput-usage": compactThroughputUsage, + } + + return diagnostics.RowFromMap(d), nil +} diff --git a/pkg/limiter/writer.go 
b/pkg/limiter/writer.go index 4beb0dfe370..bdc00b268a4 100644 --- a/pkg/limiter/writer.go +++ b/pkg/limiter/writer.go @@ -18,6 +18,7 @@ type Writer struct { type Rate interface { WaitN(ctx context.Context, n int) error Burst() int + Tokens() float64 } func NewRate(bytesPerSec, burstLimit int) Rate { diff --git a/services/httpd/handler.go b/services/httpd/handler.go index d12e9dab857..1195216c99b 100644 --- a/services/httpd/handler.go +++ b/services/httpd/handler.go @@ -2327,6 +2327,30 @@ func (h *Handler) serveExpvar(w http.ResponseWriter, r *http.Request) { } } + if val := diags["stats"]; val != nil { + if len(val.Rows) > 0 && len(val.Columns) > 0 { + // Create a map of column names to values + statsMap := make(map[string]interface{}) + for i, col := range val.Columns { + if i < len(val.Rows[0]) { + statsMap[col] = val.Rows[0][i] + } + } + + data, err := json.Marshal(statsMap) + if err != nil { + h.httpError(w, err.Error(), http.StatusInternalServerError) + return + } + + if !first { + fmt.Fprintln(w, ",") + } + first = false + fmt.Fprintf(w, "\"stats\": %s", data) + } + } + // We're going to print some kind of crypto data, we just // need to find the proper source for it. 
{ From 385f35d2f25c6a53d43734ba3644a312ec915050 Mon Sep 17 00:00:00 2001 From: devanbenz Date: Thu, 28 Aug 2025 11:58:10 -0500 Subject: [PATCH 02/10] feat: fmt'ing --- monitor/service.go | 3 ++- services/httpd/handler.go | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/monitor/service.go b/monitor/service.go index 4d361f2384c..dc23d425134 100644 --- a/monitor/service.go +++ b/monitor/service.go @@ -6,7 +6,6 @@ import ( "errors" "expvar" "fmt" - "github.com/influxdata/influxdb/pkg/limiter" "os" "runtime" "sort" @@ -17,6 +16,7 @@ import ( "github.com/influxdata/influxdb/logger" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/monitor/diagnostics" + "github.com/influxdata/influxdb/pkg/limiter" "github.com/influxdata/influxdb/services/meta" "github.com/influxdata/influxdb/tsdb" "go.uber.org/zap" @@ -161,6 +161,7 @@ func (m *Monitor) Open() error { }, }) } + m.Logger.Info("stats burst", zap.Int("burst", m.Limiter.Burst())) if m.TSDBConfig != nil { m.RegisterDiagnosticsClient("config", m.TSDBConfig) diff --git a/services/httpd/handler.go b/services/httpd/handler.go index 1195216c99b..59cc9fce870 100644 --- a/services/httpd/handler.go +++ b/services/httpd/handler.go @@ -2336,7 +2336,7 @@ func (h *Handler) serveExpvar(w http.ResponseWriter, r *http.Request) { statsMap[col] = val.Rows[0][i] } } - + data, err := json.Marshal(statsMap) if err != nil { h.httpError(w, err.Error(), http.StatusInternalServerError) return From f937361e967f7e673e2b81a2d4d76e3813db906d Mon Sep 17 00:00:00 2001 From: devanbenz Date: Thu, 28 Aug 2025 12:18:12 -0500 Subject: [PATCH 03/10] feat: Round usage percentage to 1 decimal place --- monitor/stats.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/monitor/stats.go b/monitor/stats.go index bfd5e651417..a018fb7b67f 100644 --- a/monitor/stats.go +++ b/monitor/stats.go @@ -1,6 +1,8 @@ package monitor import ( + "math" + "github.com/influxdata/influxdb/monitor/diagnostics" 
"github.com/influxdata/influxdb/pkg/limiter" ) @@ -20,7 +22,8 @@ func (s *stats) CompactThroughputUsage() float64 { return 0.0 } available := s.comp.limiter.Tokens() - return ((float64(s.comp.burst) - available) / float64(s.comp.burst)) * 100 + percentage := ((float64(s.comp.burst) - available) / float64(s.comp.burst)) * 100 + return math.Round(percentage*10) / 10 } func (s *stats) Diagnostics() (*diagnostics.Diagnostics, error) { From 29919257b9e661de9cc3446e7e1e6e8a730c85ba Mon Sep 17 00:00:00 2001 From: devanbenz Date: Thu, 28 Aug 2025 13:23:21 -0500 Subject: [PATCH 04/10] feat: Functionality is currently working, added some comments regarding the algorithm to be used MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit the expected output is the following ❯ curl -s -XPOST 'http://localhost:8086/debug/vars' | grep "compact-throughput-usage" "stats": {"compact-throughput-usage":42.1}, where 42.1 is a number in percentage use --- monitor/stats.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/monitor/stats.go b/monitor/stats.go index a018fb7b67f..e94d5a451a9 100644 --- a/monitor/stats.go +++ b/monitor/stats.go @@ -17,11 +17,24 @@ type compactThroughputStats struct { burst int } +// CompactThroughputUsage calculates the percentage of burst capacity currently consumed by compaction. 
+// +// available = current tokens in the rate limiter bucket (can be negative when in debt) +// burst = maximum tokens the bucket can hold +// usage percentage = ((burst - available) / burst) * 100 func (s *stats) CompactThroughputUsage() float64 { if s.comp.burst == 0 { return 0.0 } available := s.comp.limiter.Tokens() + + // Clamp available tokens + if available < 0 { + available = 0 + } else if available > float64(s.comp.burst) { + available = float64(s.comp.burst) + } + percentage := ((float64(s.comp.burst) - available) / float64(s.comp.burst)) * 100 return math.Round(percentage*10) / 10 } From da85112e77f452ddacfb899495a72031859cf671 Mon Sep 17 00:00:00 2001 From: devanbenz Date: Thu, 28 Aug 2025 13:34:14 -0500 Subject: [PATCH 05/10] feat: remove hanging log message --- monitor/service.go | 1 - 1 file changed, 1 deletion(-) diff --git a/monitor/service.go b/monitor/service.go index dc23d425134..bee0846873e 100644 --- a/monitor/service.go +++ b/monitor/service.go @@ -161,7 +161,6 @@ func (m *Monitor) Open() error { }, }) } - m.Logger.Info("stats burst", zap.Int("burst", m.Limiter.Burst())) if m.TSDBConfig != nil { m.RegisterDiagnosticsClient("config", m.TSDBConfig) From cdf4c13f8cf4588eeabaf303ded4843590ffcbed Mon Sep 17 00:00:00 2001 From: devanbenz Date: Thu, 28 Aug 2025 14:01:43 -0500 Subject: [PATCH 06/10] feat: Add test for stats monitor --- monitor/stats_test.go | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 monitor/stats_test.go diff --git a/monitor/stats_test.go b/monitor/stats_test.go new file mode 100644 index 00000000000..b23f0fd486d --- /dev/null +++ b/monitor/stats_test.go @@ -0,0 +1,31 @@ +package monitor_test + +import ( + "testing" + + "github.com/influxdata/influxdb/monitor" + "github.com/influxdata/influxdb/pkg/limiter" + "github.com/influxdata/influxdb/tsdb" + "github.com/stretchr/testify/require" +) + +func TestDiagnostics_Stats(t *testing.T) { + s := monitor.New(nil, monitor.Config{}, 
&tsdb.Config{}) + compactLimiter := limiter.NewRate(100, 100) + + s.WithLimiter(compactLimiter) + + require.NoError(t, s.Open(), "opening monitor") + defer func() { + require.NoError(t, s.Close(), "closing monitor") + }() + + d, err := s.Diagnostics() + require.NoError(t, err, "getting diagnostics") + + diags, ok := d["stats"] + require.True(t, ok, "expected stats diagnostic client to be registered") + + got, exp := diags.Columns, []string{"compact-throughput-usage"} + require.Equal(t, exp, got) +} From 899948b7fe554fd4618f7b6140db622c177739c6 Mon Sep 17 00:00:00 2001 From: Devan Date: Thu, 18 Sep 2025 16:00:45 -0500 Subject: [PATCH 07/10] feat: Modify compaction throughput usage calculation --- cmd/influxd/run/server.go | 2 +- monitor/service.go | 3 +-- monitor/stats.go | 34 ++++++++++++---------------------- monitor/stats_test.go | 2 +- pkg/limiter/writer.go | 1 + 5 files changed, 16 insertions(+), 26 deletions(-) diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go index 7d7b7139cf6..e6e38b0c3b1 100644 --- a/cmd/influxd/run/server.go +++ b/cmd/influxd/run/server.go @@ -513,7 +513,7 @@ func (s *Server) Open() error { return fmt.Errorf("open points writer: %s", err) } - s.Monitor.WithLimiter(s.TSDBStore.EngineOptions.CompactionThroughputLimiter) + s.Monitor.WithCompactThroughputLimiter(s.TSDBStore.EngineOptions.CompactionThroughputLimiter) for _, service := range s.Services { if err := service.Open(); err != nil { diff --git a/monitor/service.go b/monitor/service.go index bee0846873e..0283e796a72 100644 --- a/monitor/service.go +++ b/monitor/service.go @@ -157,7 +157,6 @@ func (m *Monitor) Open() error { m.RegisterDiagnosticsClient("stats", &stats{ comp: compactThroughputStats{ limiter: m.Limiter, - burst: m.Limiter.Burst(), }, }) } @@ -212,7 +211,7 @@ func (m *Monitor) writePoints(p models.Points) error { return nil } -func (m *Monitor) WithLimiter(limiter limiter.Rate) { +func (m *Monitor) WithCompactThroughputLimiter(limiter limiter.Rate) { 
m.mu.Lock() defer m.mu.Unlock() m.Limiter = limiter diff --git a/monitor/stats.go b/monitor/stats.go index e94d5a451a9..601e9aa1abd 100644 --- a/monitor/stats.go +++ b/monitor/stats.go @@ -1,10 +1,11 @@ package monitor import ( - "math" - + "fmt" "github.com/influxdata/influxdb/monitor/diagnostics" "github.com/influxdata/influxdb/pkg/limiter" + "golang.org/x/time/rate" + "strconv" ) // stats captures statistics @@ -14,35 +15,24 @@ type stats struct { type compactThroughputStats struct { limiter limiter.Rate - burst int } // CompactThroughputUsage calculates the percentage of burst capacity currently consumed by compaction. -// -// available = current tokens in the rate limiter bucket (can be negative when in debt) -// burst = maximum tokens the bucket can hold -// usage percentage = ((burst - available) / burst) * 100 func (s *stats) CompactThroughputUsage() float64 { - if s.comp.burst == 0 { - return 0.0 - } - available := s.comp.limiter.Tokens() - - // Clamp available tokens - if available < 0 { - available = 0 - } else if available > float64(s.comp.burst) { - available = float64(s.comp.burst) - } - - percentage := ((float64(s.comp.burst) - available) / float64(s.comp.burst)) * 100 - return math.Round(percentage*10) / 10 + percentage := 100 * (1 - rate.Limit(s.comp.limiter.Tokens())/s.comp.limiter.Limit()) + return float64(percentage) } func (s *stats) Diagnostics() (*diagnostics.Diagnostics, error) { compactThroughputUsage := s.CompactThroughputUsage() + i := fmt.Sprintf("%.2f", compactThroughputUsage) + compactThroughputUsageTrunc, err := strconv.ParseFloat(i, 2) + if err != nil { + return nil, err + } + d := map[string]interface{}{ - "compact-throughput-usage": compactThroughputUsage, + "compact-throughput-usage-percentage": compactThroughputUsageTrunc, } return diagnostics.RowFromMap(d), nil diff --git a/monitor/stats_test.go b/monitor/stats_test.go index b23f0fd486d..d1b293de56d 100644 --- a/monitor/stats_test.go +++ b/monitor/stats_test.go @@ -13,7 +13,7 @@ 
func TestDiagnostics_Stats(t *testing.T) { s := monitor.New(nil, monitor.Config{}, &tsdb.Config{}) compactLimiter := limiter.NewRate(100, 100) - s.WithLimiter(compactLimiter) + s.WithCompactThroughputLimiter(compactLimiter) require.NoError(t, s.Open(), "opening monitor") defer func() { diff --git a/pkg/limiter/writer.go b/pkg/limiter/writer.go index bdc00b268a4..39816550aaf 100644 --- a/pkg/limiter/writer.go +++ b/pkg/limiter/writer.go @@ -19,6 +19,7 @@ type Rate interface { WaitN(ctx context.Context, n int) error Burst() int Tokens() float64 + Limit() rate.Limit } func NewRate(bytesPerSec, burstLimit int) Rate { From d9d3a12afbb8e3ea956e196855c0189892773597 Mon Sep 17 00:00:00 2001 From: Devan Date: Thu, 18 Sep 2025 16:05:07 -0500 Subject: [PATCH 08/10] fix: checkfmt --- monitor/stats.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/monitor/stats.go b/monitor/stats.go index 601e9aa1abd..c507b17b4c6 100644 --- a/monitor/stats.go +++ b/monitor/stats.go @@ -2,10 +2,11 @@ package monitor import ( "fmt" + "strconv" + "github.com/influxdata/influxdb/monitor/diagnostics" "github.com/influxdata/influxdb/pkg/limiter" "golang.org/x/time/rate" - "strconv" ) // stats captures statistics From 538fed55aa399dbf1b16972061b9b0a68ec831fd Mon Sep 17 00:00:00 2001 From: Devan Date: Thu, 18 Sep 2025 16:13:34 -0500 Subject: [PATCH 09/10] fix: update test --- monitor/stats_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/monitor/stats_test.go b/monitor/stats_test.go index d1b293de56d..1971fa3f42b 100644 --- a/monitor/stats_test.go +++ b/monitor/stats_test.go @@ -26,6 +26,6 @@ func TestDiagnostics_Stats(t *testing.T) { diags, ok := d["stats"] require.True(t, ok, "expected stats diagnostic client to be registered") - got, exp := diags.Columns, []string{"compact-throughput-usage"} + got, exp := diags.Columns, []string{"compact-throughput-usage-percentage"} require.Equal(t, exp, got) } From c68dd6aa4221ba3f2d368c14f0396523719b5ea2 Mon Sep 17 
00:00:00 2001 From: Devan Date: Wed, 24 Sep 2025 13:35:55 -0500 Subject: [PATCH 10/10] feat: Use math.Round for precision --- monitor/stats.go | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/monitor/stats.go b/monitor/stats.go index c507b17b4c6..b49eb87078f 100644 --- a/monitor/stats.go +++ b/monitor/stats.go @@ -1,8 +1,7 @@ package monitor import ( - "fmt" - "strconv" + "math" "github.com/influxdata/influxdb/monitor/diagnostics" "github.com/influxdata/influxdb/pkg/limiter" @@ -26,12 +25,7 @@ func (s *stats) CompactThroughputUsage() float64 { func (s *stats) Diagnostics() (*diagnostics.Diagnostics, error) { compactThroughputUsage := s.CompactThroughputUsage() - i := fmt.Sprintf("%.2f", compactThroughputUsage) - compactThroughputUsageTrunc, err := strconv.ParseFloat(i, 2) - if err != nil { - return nil, err - } - + compactThroughputUsageTrunc := math.Round(compactThroughputUsage*100.0) / 100.0 d := map[string]interface{}{ "compact-throughput-usage-percentage": compactThroughputUsageTrunc, }