Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/influxd/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,9 @@ func (s *Server) appendContinuousQueryService(c continuous_querier.Config) {
srv.QueryExecutor = s.QueryExecutor
srv.Monitor = s.Monitor
s.Services = append(s.Services, srv)
if s.Monitor != nil {
s.Monitor.RegisterDiagnosticsClient("cq", srv)
}
}

// Err returns an error channel that multiplexes all out of band errors received from all services.
Expand Down
1 change: 1 addition & 0 deletions monitor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ func (m *Monitor) Close() error {
m.DeregisterDiagnosticsClient("system")
m.DeregisterDiagnosticsClient("stats")
m.DeregisterDiagnosticsClient("config")
m.DeregisterDiagnosticsClient("cq")
return nil
}

Expand Down
22 changes: 22 additions & 0 deletions monitor/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ import (

"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/monitor"
"github.com/influxdata/influxdb/services/continuous_querier"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/toml"
"github.com/influxdata/influxdb/tsdb"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest/observer"
)
Expand Down Expand Up @@ -414,6 +416,26 @@ func TestMonitor_QuickClose(t *testing.T) {
}
}

func TestMonitor_CQStatistics(t *testing.T) {
s := monitor.New(nil, monitor.Config{}, &tsdb.Config{})
err := s.Open()
require.NoError(t, err, "monitor open")
defer s.Close()

s.RegisterDiagnosticsClient("cq", continuous_querier.NewService(continuous_querier.NewConfig()))
stats, err := s.Statistics(nil)
require.NoError(t, err, "cq statistics")

for _, stat := range stats {
if stat.Name == "cq" {
require.Equal(t, stat.Values, map[string]interface{}{
"queryOk": 0,
"queryFail": 0,
}, "statistics")
}
}
}

func TestStatistic_ValueNames(t *testing.T) {
statistic := monitor.Statistic{
Statistic: models.Statistic{
Expand Down
7 changes: 7 additions & 0 deletions services/continuous_querier/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/monitor/diagnostics"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxql"
Expand Down Expand Up @@ -169,6 +170,12 @@ func (s *Service) Statistics(tags map[string]string) []models.Statistic {
}}
}

// Required for Monitor interface. Currently does not return any
// diagnostic values.
func (s *Service) Diagnostics() (*diagnostics.Diagnostics, error) {
return &diagnostics.Diagnostics{}, nil
}

// Run runs the specified continuous query, or all CQs if none is specified.
func (s *Service) Run(database, name string, t time.Time) error {
var dbs []meta.DatabaseInfo
Expand Down
35 changes: 35 additions & 0 deletions services/httpd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2395,6 +2395,33 @@ func (h *Handler) serveExpvar(w http.ResponseWriter, r *http.Request) {
uniqueKeys := make(map[string]int)

for _, s := range stats {
if s.Name == "cq" {
jv, err := parseCQStatistics(&s.Statistic)
if err != nil {
h.httpError(w, err.Error(), http.StatusInternalServerError)
return
}
data, err := json.Marshal(jv)
if err != nil {
h.httpError(w, err.Error(), http.StatusInternalServerError)
return
}

if !first {
_, err := fmt.Fprintln(w, ",")
if err != nil {
h.httpError(w, err.Error(), http.StatusInternalServerError)
return
}
}
first = false
_, err = fmt.Fprintf(w, "\"cq\": %s", data)
if err != nil {
h.httpError(w, err.Error(), http.StatusInternalServerError)
return
}
continue
}
val, err := json.Marshal(s)
if err != nil {
continue
Expand Down Expand Up @@ -2644,6 +2671,14 @@ func parseConfigDiagnostics(d *diagnostics.Diagnostics) (map[string]interface{},
return m, nil
}

func parseCQStatistics(s *models.Statistic) (map[string]interface{}, error) {
if len(s.Values) == 0 {
return nil, fmt.Errorf("no cq statistics data available")
}

return s.Values, nil
}

// httpError writes an error to the client in a standard format.
func (h *Handler) httpError(w http.ResponseWriter, errmsg string, code int) {
if code == http.StatusUnauthorized {
Expand Down