Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.8.0
2.8.0
189 changes: 189 additions & 0 deletions collector/remote_info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
// Copyright 2021 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collector

import (
"encoding/json"
"fmt"
"net/http"
"net/url"
"path"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
)

// Labels for remote info metrics
var defaulRemoteInfoLabels = []string{"remote_cluster"}
var defaultRemoteInfoLabelValues = func(remote_cluster string) []string {
return []string{
remote_cluster,
}
}

type remoteInfoMetric struct {
Type prometheus.ValueType
Desc *prometheus.Desc
Value func(remoteStats RemoteCluster) float64
Labels func(remote_cluster string) []string
}

// RemoteInfo information struct
type RemoteInfo struct {
logger log.Logger
client *http.Client
url *url.URL

up prometheus.Gauge
totalScrapes, jsonParseFailures prometheus.Counter

remoteInfoMetrics []*remoteInfoMetric
}

// NewClusterSettings defines Cluster Settings Prometheus metrics
func NewRemoteInfo(logger log.Logger, client *http.Client, url *url.URL) *RemoteInfo {

return &RemoteInfo{
logger: logger,
client: client,
url: url,

up: prometheus.NewGauge(prometheus.GaugeOpts{
Name: prometheus.BuildFQName(namespace, "remote_info_stats", "up"),
Help: "Was the last scrape of the ElasticSearch remote info endpoint successful.",
}),
totalScrapes: prometheus.NewCounter(prometheus.CounterOpts{
Name: prometheus.BuildFQName(namespace, "remote_info_stats", "total_scrapes"),
Help: "Current total ElasticSearch remote info scrapes.",
}),
jsonParseFailures: prometheus.NewCounter(prometheus.CounterOpts{
Name: prometheus.BuildFQName(namespace, "remote_info_stats", "json_parse_failures"),
Help: "Number of errors while parsing JSON.",
}),
// Send all of the remote metrics
remoteInfoMetrics: []*remoteInfoMetric{
{
Type: prometheus.GaugeValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "remote_info", "num_nodes_connected"),
"Number of nodes connected", defaulRemoteInfoLabels, nil,
),
Value: func(remoteStats RemoteCluster) float64 {
return float64(remoteStats.NumNodesConnected)
},
Labels: defaultRemoteInfoLabelValues,
},
{
Type: prometheus.GaugeValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "remote_info", "num_proxy_sockets_connected"),
"Number of proxy sockets connected", defaulRemoteInfoLabels, nil,
),
Value: func(remoteStats RemoteCluster) float64 {
return float64(remoteStats.NumProxySocketsConnected)
},
Labels: defaultRemoteInfoLabelValues,
},
{
Type: prometheus.GaugeValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "remote_info", "max_connections_per_cluster"),
"Max connections per cluster", defaulRemoteInfoLabels, nil,
),
Value: func(remoteStats RemoteCluster) float64 {
return float64(remoteStats.MaxConnectionsPerCluster)
},
Labels: defaultRemoteInfoLabelValues,
},
},
}
}

func (c *RemoteInfo) fetchAndDecodeRemoteInfoStats() (RemoteInfoResponse, error) {
var rir RemoteInfoResponse

u := *c.url
u.Path = path.Join(u.Path, "/_remote/info")

res, err := c.client.Get(u.String())
if err != nil {
return rir, fmt.Errorf("failed to get remote info from %s://%s:%s%s: %s",
u.Scheme, u.Hostname(), u.Port(), u.Path, err)
}

defer func() {
err = res.Body.Close()
if err != nil {
_ = level.Warn(c.logger).Log(
"msg", "failed to close http.Client",
"err", err,
)
}
}()

if res.StatusCode != http.StatusOK {
return rir, fmt.Errorf("HTTP Request failed with code %d", res.StatusCode)
}

if err := json.NewDecoder(res.Body).Decode(&rir); err != nil {
c.jsonParseFailures.Inc()
return rir, err
}
return rir, nil
}

// Collect gets remote info values
func (ri *RemoteInfo) Collect(ch chan<- prometheus.Metric) {
ri.totalScrapes.Inc()
defer func() {
ch <- ri.up
ch <- ri.totalScrapes
ch <- ri.jsonParseFailures
}()

remoteInfoResp, err := ri.fetchAndDecodeRemoteInfoStats()
if err != nil {
ri.up.Set(0)
_ = level.Warn(ri.logger).Log(
"msg", "failed to fetch and decode remote info",
"err", err,
)
return
}
ri.totalScrapes.Inc()
ri.up.Set(1)

// Remote Info
for remote_cluster, remoteInfo := range remoteInfoResp {
for _, metric := range ri.remoteInfoMetrics {
ch <- prometheus.MustNewConstMetric(
metric.Desc,
metric.Type,
metric.Value(remoteInfo),
metric.Labels(remote_cluster)...,
)
}
}
}

// Describe add Indices metrics descriptions
func (ri *RemoteInfo) Describe(ch chan<- *prometheus.Desc) {
for _, metric := range ri.remoteInfoMetrics {
ch <- metric.Desc
}
ch <- ri.up.Desc()
ch <- ri.totalScrapes.Desc()
ch <- ri.jsonParseFailures.Desc()
}
28 changes: 28 additions & 0 deletions collector/remote_info_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2021 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collector

// RemoteInfoResponse is a representation of a Elasticsearch _remote/info
type RemoteInfoResponse map[string]RemoteCluster

// RemoteClsuter defines the struct of the tree for the Remote Cluster
type RemoteCluster struct {
Seeds []string `json:"seeds"`
Connected bool `json:"connected"`
NumNodesConnected int64 `json:"num_nodes_connected"`
NumProxySocketsConnected int64 `json:"num_proxy_sockets_connected"`
MaxConnectionsPerCluster int64 `json:"max_connections_per_cluster"`
InitialConnectTimeout string `json:"initial_connect_timeout"`
SkipUnavailable bool `json:"skip_unavailable"`
}
56 changes: 56 additions & 0 deletions collector/remote_info_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2021 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collector

import (
"io"
"net/http"
"net/http/httptest"
"net/url"
"os"
"testing"

"github.com/go-kit/log"
)

func TestRemoteInfoStats(t *testing.T) {
// Testcases created using:
// docker run -d -p 9200:9200 elasticsearch:VERSION-alpine
// curl http://localhost:9200/_cluster/settings/?include_defaults=true
files := []string{"../fixtures/settings-5.4.2.json", "../fixtures/settings-merge-5.4.2.json"}
for _, filename := range files {
f, _ := os.Open(filename)
defer f.Close()
for hn, handler := range map[string]http.Handler{
"plain": http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
io.Copy(w, f)
}),
} {
ts := httptest.NewServer(handler)
defer ts.Close()

u, err := url.Parse(ts.URL)
if err != nil {
t.Fatalf("Failed to parse URL: %s", err)
}
c := NewRemoteInfo(log.NewNopLogger(), http.DefaultClient, u)
nsr, err := c.fetchAndDecodeRemoteInfoStats()
if err != nil {
t.Fatalf("Failed to fetch or decode remote info stats: %s", err)
}
t.Logf("[%s/%s] Remote Info Stats Response: %+v", hn, filename, nsr)

}
}
}
8 changes: 8 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ func main() {
esInsecureSkipVerify = kingpin.Flag("es.ssl-skip-verify",
"Skip SSL verification when connecting to Elasticsearch.").
Default("false").Bool()
esExportRemoteInfo = kingpin.Flag("es.remote_info",
"Export stats associated with configured remote clusters.").
Default("false").Envar("ES_REMOTE_INFO").Bool()
logLevel = kingpin.Flag("log.level",
"Sets the loglevel. Valid levels are debug, info, warn, error").
Default("info").String()
Expand Down Expand Up @@ -234,6 +237,11 @@ func main() {
prometheus.MustRegister(collector.NewIlmIndicies(logger, httpClient, esURL))
}

if *esExportRemoteInfo {
// Create Remote info Collector
prometheus.MustRegister(collector.NewRemoteInfo(logger, httpClient, esURL))
}

// Create a context that is cancelled on SIGKILL or SIGINT.
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
defer cancel()
Expand Down