Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/influxd/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,9 @@ func (s *Server) appendContinuousQueryService(c continuous_querier.Config) {
srv.QueryExecutor = s.QueryExecutor
srv.Monitor = s.Monitor
s.Services = append(s.Services, srv)
if s.Monitor != nil {
s.Monitor.RegisterDiagnosticsClient("cq", srv)
}
}

// Err returns an error channel that multiplexes all out of band errors received from all services.
Expand Down
1 change: 1 addition & 0 deletions monitor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ func (m *Monitor) Close() error {
m.DeregisterDiagnosticsClient("system")
m.DeregisterDiagnosticsClient("stats")
m.DeregisterDiagnosticsClient("config")
m.DeregisterDiagnosticsClient("cq")
return nil
}

Expand Down
22 changes: 22 additions & 0 deletions monitor/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ import (

"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/monitor"
"github.com/influxdata/influxdb/services/continuous_querier"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/toml"
"github.com/influxdata/influxdb/tsdb"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest/observer"
)
Expand Down Expand Up @@ -414,6 +416,26 @@ func TestMonitor_QuickClose(t *testing.T) {
}
}

func TestMonitor_CQStatistics(t *testing.T) {
s := monitor.New(nil, monitor.Config{}, &tsdb.Config{})
err := s.Open()
require.NoError(t, err, "monitor open")
defer s.Close()

s.RegisterDiagnosticsClient("cq", continuous_querier.NewService(continuous_querier.NewConfig()))
stats, err := s.Statistics(nil)
require.NoError(t, err, "cq statistics")

for _, stat := range stats {
if stat.Name == "cq" {
require.Equal(t, stat.Values, map[string]interface{}{
"queryOk": 0,
"queryFail": 0,
}, "statistics")
}
}
}

func TestStatistic_ValueNames(t *testing.T) {
statistic := monitor.Statistic{
Statistic: models.Statistic{
Expand Down
7 changes: 7 additions & 0 deletions services/continuous_querier/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/monitor/diagnostics"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxql"
Expand Down Expand Up @@ -169,6 +170,12 @@ func (s *Service) Statistics(tags map[string]string) []models.Statistic {
}}
}

// Required for Monitor interface. Currently does not return any
// diagnostic values.
func (s *Service) Diagnostics() (*diagnostics.Diagnostics, error) {
return &diagnostics.Diagnostics{}, nil
}

// Run runs the specified continuous query, or all CQs if none is specified.
func (s *Service) Run(database, name string, t time.Time) error {
var dbs []meta.DatabaseInfo
Expand Down
26 changes: 26 additions & 0 deletions services/httpd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2395,6 +2395,24 @@ func (h *Handler) serveExpvar(w http.ResponseWriter, r *http.Request) {
uniqueKeys := make(map[string]int)

for _, s := range stats {
if s.Name == "cq" {
jv, err := parseCQStatistics(&s.Statistic)
data, err := json.Marshal(jv)
if !first {
_, err := fmt.Fprintln(w, ",")
if err != nil {
h.httpError(w, err.Error(), http.StatusInternalServerError)
return
}
}
first = false
_, err = fmt.Fprintf(w, "\"cq\": %s", data)
if err != nil {
h.httpError(w, err.Error(), http.StatusInternalServerError)
return
}
continue
}
val, err := json.Marshal(s)
if err != nil {
continue
Expand Down Expand Up @@ -2644,6 +2662,14 @@ func parseConfigDiagnostics(d *diagnostics.Diagnostics) (map[string]interface{},
return m, nil
}

func parseCQStatistics(s *models.Statistic) (map[string]interface{}, error) {
if len(s.Values) == 0 {
return nil, fmt.Errorf("no cq statistics data available")
}

return s.Values, nil
}

// httpError writes an error to the client in a standard format.
func (h *Handler) httpError(w http.ResponseWriter, errmsg string, code int) {
if code == http.StatusUnauthorized {
Expand Down