diff --git a/docs/en/administration/monitoring.md b/docs/en/administration/monitoring.md index cfc0813c4ec1..bd3e1cd58249 100644 --- a/docs/en/administration/monitoring.md +++ b/docs/en/administration/monitoring.md @@ -143,9 +143,9 @@ For more information about Prometheus Operator, please check [official document] ### Hadoop -The [JuiceFS Hadoop Java SDK](../deployment/hadoop_java_sdk.md) supports reporting monitoring metrics to [Pushgateway](https://github.com/prometheus/pushgateway) and then letting Prometheus scrape the metrics from Pushgateway. +The [JuiceFS Hadoop Java SDK](../deployment/hadoop_java_sdk.md) supports reporting monitoring metrics to [Pushgateway](https://github.com/prometheus/pushgateway) and [Graphite](http://graphiteapp.org/) -Please enable metrics reporting with the following configuration: +Report metrics to Pushgateway: ```xml @@ -154,7 +154,7 @@ Please enable metrics reporting with the following configuration: ``` -At the same time, the frequency of reporting metrics can be modified through the `juicefs.push-interval` configuration. The default is to report once every 10 seconds. For all configurations supported by JuiceFS Hadoop Java SDK, please refer to [documentation](../deployment/hadoop_java_sdk.md#client-configurations). +At the same time, the frequency of reporting metrics can be modified through the `juicefs.push-interval` configuration. The default is to report once every 10 seconds. :::info According to the suggestion of [Pushgateway official document](https://github.com/prometheus/pushgateway/blob/master/README.md#configure-the-pushgateway-as-a-target-to-scrape), Prometheus's [scrape configuration](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config) needs to set `honor_labels: true`. @@ -176,6 +176,19 @@ $ curl -X PUT http://host:9091/api/v1/admin/wipe For more information about Pushgateway, please check [official document](https://github.com/prometheus/pushgateway/blob/master/README.md). +Report metrics to Graphite: + +```xml + + juicefs.push-graphite + host:port + +``` + +At the same time, the frequency of reporting metrics can be modified through the `juicefs.push-interval` configuration. The default is to report once every 10 seconds. + +For all configurations supported by JuiceFS Hadoop Java SDK, please refer to [documentation](../deployment/hadoop_java_sdk.md#client-configurations). + ### Use Consul as registration center :::note diff --git a/docs/en/deployment/hadoop_java_sdk.md b/docs/en/deployment/hadoop_java_sdk.md index 34029be5dabc..df13b0139258 100644 --- a/docs/en/deployment/hadoop_java_sdk.md +++ b/docs/en/deployment/hadoop_java_sdk.md @@ -158,20 +158,21 @@ Please refer to the following table to set the relevant parameters of the JuiceF #### Other Configurations -| Configuration | Default Value | Description | -| ------------------------- | ------------- | ------------------------------------------------------------ | -| `juicefs.bucket` | | Specify a different endpoint for object storage | -| `juicefs.debug` | `false` | Whether enable debug log | -| `juicefs.access-log` | | Access log path. Ensure Hadoop application has write permission, e.g. `/tmp/juicefs.access.log`. The log file will rotate automatically to keep at most 7 files. | -| `juicefs.superuser` | `hdfs` | The super user | -| `juicefs.users` | `null` | The path of username and UID list file, e.g. `jfs://name/etc/users`. The file format is `:`, one user per line. | +| Configuration | Default Value | Description | +|---------------------------| ------------- |-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `juicefs.bucket` | | Specify a different endpoint for object storage | +| `juicefs.debug` | `false` | Whether enable debug log | +| `juicefs.access-log` | | Access log path. Ensure Hadoop application has write permission, e.g. `/tmp/juicefs.access.log`. The log file will rotate automatically to keep at most 7 files. | +| `juicefs.superuser` | `hdfs` | The super user | +| `juicefs.users` | `null` | The path of username and UID list file, e.g. `jfs://name/etc/users`. The file format is `:`, one user per line. | | `juicefs.groups` | `null` | The path of group name, GID and group members list file, e.g. `jfs://name/etc/groups`. The file format is `::,`, one group per line. | -| `juicefs.umask` | `null` | The umask used when creating files and directories (e.g. `0022`), default value is `fs.permissions.umask-mode`. | -| `juicefs.push-gateway` | | [Prometheus Pushgateway](https://github.com/prometheus/pushgateway) address, format is `:`. | -| `juicefs.push-interval` | 10 | Prometheus push interval in seconds | -| `juicefs.push-auth` | | [Prometheus basic auth](https://prometheus.io/docs/guides/basic-auth) information, format is `:`. | -| `juicefs.fast-resolve` | `true` | Whether enable faster metadata lookup using Redis Lua script | -| `juicefs.no-usage-report` | `false` | Whether disable usage reporting. JuiceFS only collects anonymous usage data (e.g. version number), no user or any sensitive data will be collected. | +| `juicefs.umask` | `null` | The umask used when creating files and directories (e.g. `0022`), default value is `fs.permissions.umask-mode`. | +| `juicefs.push-gateway` | | [Prometheus Pushgateway](https://github.com/prometheus/pushgateway) address, format is `:`. | +| `juicefs.push-auth` | | [Prometheus basic auth](https://prometheus.io/docs/guides/basic-auth) information, format is `:`. | +| `juicefs.push-graphite` | | [Graphite](http://graphiteapp.org/) address, format is `:`. | +| `juicefs.push-interval` | 10 | Metric push interval (in seconds) | +| `juicefs.fast-resolve` | `true` | Whether enable faster metadata lookup using Redis Lua script | +| `juicefs.no-usage-report` | `false` | Whether disable usage reporting. JuiceFS only collects anonymous usage data (e.g. version number), no user or any sensitive data will be collected. | #### Multiple file systems configuration diff --git a/docs/zh_cn/administration/monitoring.md b/docs/zh_cn/administration/monitoring.md index b181e24131ea..e7198c278021 100644 --- a/docs/zh_cn/administration/monitoring.md +++ b/docs/zh_cn/administration/monitoring.md @@ -143,9 +143,9 @@ spec: ### Hadoop -[JuiceFS Hadoop Java SDK](../deployment/hadoop_java_sdk.md) 支持把监控指标上报到 [Pushgateway](https://github.com/prometheus/pushgateway),然后让 Prometheus 从 Pushgateway 抓取指标。 +[JuiceFS Hadoop Java SDK](../deployment/hadoop_java_sdk.md) 支持把监控指标上报到 [Pushgateway](https://github.com/prometheus/pushgateway) 或者 [Graphite](http://graphiteapp.org/) -请用如下配置启用指标上报: +启用指标上报到 Pushgateway : ```xml @@ -154,7 +154,7 @@ spec: ``` -同时可以通过 `juicefs.push-interval` 配置修改上报指标的频率,默认为 10 秒上报一次。JuiceFS Hadoop Java SDK 支持的所有配置参数请参考[文档](../deployment/hadoop_java_sdk.md#客户端配置参数)。 +同时可以通过 `juicefs.push-interval` 配置修改上报指标的频率,默认为 10 秒上报一次。 :::info 说明 根据 [Pushgateway 官方文档](https://github.com/prometheus/pushgateway/blob/master/README.md#configure-the-pushgateway-as-a-target-to-scrape)的建议,Prometheus 的[抓取配置](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config)中需要设置 `honor_labels: true`。 @@ -176,6 +176,19 @@ $ curl -X PUT http://host:9091/api/v1/admin/wipe 有关 Pushgateway 的更多信息,请查看[官方文档](https://github.com/prometheus/pushgateway/blob/master/README.md)。 +启用指标上报到 Graphite : + +```xml + + juicefs.push-graphite + host:port + +``` + +同时可以通过 `juicefs.push-interval` 配置修改上报指标的频率,默认为 10 秒上报一次。 + +JuiceFS Hadoop Java SDK 支持的所有配置参数请参考[文档](../deployment/hadoop_java_sdk.md#客户端配置参数)。 + ### 使用 Consul 作为注册中心 :::note 注意 diff --git a/docs/zh_cn/deployment/hadoop_java_sdk.md b/docs/zh_cn/deployment/hadoop_java_sdk.md index 8d9f82fa218b..d46bfd21d196 100644 --- a/docs/zh_cn/deployment/hadoop_java_sdk.md +++ b/docs/zh_cn/deployment/hadoop_java_sdk.md @@ -158,20 +158,21 @@ $ make win #### 其他配置 -| 配置项 | 默认值 | 描述 | -| ------------------------- | ------- | ------------------------------------------------------------ | -| `juicefs.bucket` | | 为对象存储指定跟格式化时不同的访问地址 | -| `juicefs.debug` | `false` | 是否开启 debug 日志 | -| `juicefs.access-log` | | 访问日志的路径。需要所有应用都有写权限,可以配置为 `/tmp/juicefs.access.log`。该文件会自动轮转,保留最近 7 个文件。 | -| `juicefs.superuser` | `hdfs` | 超级用户 | -| `juicefs.users` | `null` | 用户名以及 UID 列表文件的地址,比如 `jfs://name/etc/users`。文件格式为 `:`,一行一个用户。 | -| `juicefs.groups` | `null` | 用户组、GID 以及组成员列表文件的地址,比如 `jfs://name/etc/groups`。文件格式为 `::,`,一行一个用户组。 | -| `juicefs.umask` | `null` | 创建文件和目录的 umask 值(如 `0022`),如果没有此配置,默认值是 `fs.permissions.umask-mode`。 | -| `juicefs.push-gateway` | | [Prometheus Pushgateway](https://github.com/prometheus/pushgateway) 地址,格式为 `:`。 | -| `juicefs.push-interval` | 10 | 推送数据到 Prometheus 的时间间隔,单位为秒。 | -| `juicefs.push-auth` | | [Prometheus 基本认证](https://prometheus.io/docs/guides/basic-auth)信息,格式为 `:`。 | -| `juicefs.fast-resolve` | `true` | 是否开启快速元数据查找(通过 Redis Lua 脚本实现) | -| `juicefs.no-usage-report` | `false` | 是否上报数据。仅上版本号等使用量数据,不包含任何用户信息。 | +| 配置项 | 默认值 | 描述 | +|-----------------------------| ------- |-------------------------------------------------------------------------------------------------------------| +| `juicefs.bucket` | | 为对象存储指定跟格式化时不同的访问地址 | +| `juicefs.debug` | `false` | 是否开启 debug 日志 | +| `juicefs.access-log` | | 访问日志的路径。需要所有应用都有写权限,可以配置为 `/tmp/juicefs.access.log`。该文件会自动轮转,保留最近 7 个文件。 | +| `juicefs.superuser` | `hdfs` | 超级用户 | +| `juicefs.users` | `null` | 用户名以及 UID 列表文件的地址,比如 `jfs://name/etc/users`。文件格式为 `:`,一行一个用户。 | +| `juicefs.groups` | `null` | 用户组、GID 以及组成员列表文件的地址,比如 `jfs://name/etc/groups`。文件格式为 `::,`,一行一个用户组。 | +| `juicefs.umask` | `null` | 创建文件和目录的 umask 值(如 `0022`),如果没有此配置,默认值是 `fs.permissions.umask-mode`。 | +| `juicefs.push-gateway` | | [Prometheus Pushgateway](https://github.com/prometheus/pushgateway) 地址,格式为 `:`。 | +| `juicefs.push-auth` | | [Prometheus 基本认证](https://prometheus.io/docs/guides/basic-auth)信息,格式为 `:`。 | +| `juicefs.push-graphite` | | [Graphite](http://graphiteapp.org/) 地址,格式为 `:`。 | +| `juicefs.push-interval` | 10 | 指标推送的时间间隔,单位为秒。 | +| `juicefs.fast-resolve` | `true` | 是否开启快速元数据查找(通过 Redis Lua 脚本实现) | +| `juicefs.no-usage-report` | `false` | 是否上报数据。仅上版本号等使用量数据,不包含任何用户信息。 | #### 多文件系统配置 diff --git a/go.mod b/go.mod index 33ee88c48ebe..3f5dd34177de 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ require ( github.com/go-redis/redis/v8 v8.4.0 github.com/go-sql-driver/mysql v1.6.0 github.com/gofrs/flock v0.8.1 + github.com/golang/protobuf v1.4.3 github.com/google/btree v1.0.1 github.com/google/gops v0.3.13 github.com/google/uuid v1.1.2 @@ -47,6 +48,7 @@ require ( github.com/pkg/xattr v0.4.4 github.com/prometheus/client_golang v1.9.0 github.com/prometheus/client_model v0.2.0 + github.com/prometheus/common v0.15.0 github.com/qingstor/qingstor-sdk-go/v4 v4.4.0 github.com/qiniu/api.v7/v7 v7.8.0 github.com/satori/go.uuid v1.2.0 @@ -120,7 +122,6 @@ require ( github.com/golang-jwt/jwt v3.2.2+incompatible // indirect github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b // indirect github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect - github.com/golang/protobuf v1.4.3 // indirect github.com/golang/snappy v0.0.3 // indirect github.com/gomodule/redigo v1.8.3 // indirect github.com/google/flatbuffers v1.12.1 // indirect @@ -193,7 +194,6 @@ require ( github.com/pingcap/failpoint v0.0.0-20210316064728-7acb0f0a3dfd // indirect github.com/pingcap/kvproto v0.0.0-20211122024046-03abd340988f // indirect github.com/pquerna/ffjson v0.0.0-20190930134022-aa0246cd15f7 // indirect - github.com/prometheus/common v0.15.0 // indirect github.com/prometheus/procfs v0.2.0 // indirect github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect github.com/rivo/uniseg v0.2.0 // indirect diff --git a/sdk/java/libjfs/bridge.go b/sdk/java/libjfs/bridge.go new file mode 100644 index 000000000000..c8672b2b0326 --- /dev/null +++ b/sdk/java/libjfs/bridge.go @@ -0,0 +1,334 @@ +// Copyright 2016 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package graphite provides a bridge to push Prometheus metrics to a Graphite +// server. + +//nolint +package main + +import ( + "bufio" + "context" + "errors" + "fmt" + "github.com/golang/protobuf/proto" + "io" + "net" + "sort" + "time" + + "github.com/prometheus/common/expfmt" + "github.com/prometheus/common/model" + + dto "github.com/prometheus/client_model/go" + + "github.com/prometheus/client_golang/prometheus" +) + +const ( + defaultInterval = 15 * time.Second + millisecondsPerSecond = 1000 +) + +// HandlerErrorHandling defines how a Handler serving metrics will handle +// errors. +type HandlerErrorHandling int + +// These constants cause handlers serving metrics to behave as described if +// errors are encountered. +const ( + // Ignore errors and try to push as many metrics to Graphite as possible. + ContinueOnError HandlerErrorHandling = iota + + // Abort the push to Graphite upon the first error encountered. + AbortOnError +) + +// Config defines the Graphite bridge config. +type Config struct { + // Whether to use Graphite tags or not. Defaults to false. + UseTags bool + + // The url to push data to. Required. + URL string + + // The prefix for the pushed Graphite metrics. Defaults to empty string. + Prefix string + + // The interval to use for pushing data to Graphite. Defaults to 15 seconds. + Interval time.Duration + + // The timeout for pushing metrics to Graphite. Defaults to 15 seconds. + Timeout time.Duration + + // The Gatherer to use for metrics. Defaults to prometheus.DefaultGatherer. + Gatherer prometheus.Gatherer + + // The logger that messages are written to. Defaults to no logging. + Logger Logger + + // ErrorHandling defines how errors are handled. Note that errors are + // logged regardless of the configured ErrorHandling provided Logger + // is not nil. + ErrorHandling HandlerErrorHandling + + CommonLabels map[string]string +} + +// Bridge pushes metrics to the configured Graphite server. +type Bridge struct { + useTags bool + url string + prefix string + interval time.Duration + timeout time.Duration + + errorHandling HandlerErrorHandling + logger Logger + + g prometheus.Gatherer + commonLabels map[string]string +} + +// Logger is the minimal interface Bridge needs for logging. Note that +// log.Logger from the standard library implements this interface, and it is +// easy to implement by custom loggers, if they don't do so already anyway. +type Logger interface { + Println(v ...interface{}) +} + +// NewBridge returns a pointer to a new Bridge struct. +func NewBridge(c *Config) (*Bridge, error) { + b := &Bridge{} + + b.useTags = c.UseTags + + if c.URL == "" { + return nil, errors.New("missing URL") + } + b.url = c.URL + + if c.Gatherer == nil { + b.g = prometheus.DefaultGatherer + } else { + b.g = c.Gatherer + } + + if c.Logger != nil { + b.logger = c.Logger + } + + if c.Prefix != "" { + b.prefix = c.Prefix + } + + var z time.Duration + if c.Interval == z { + b.interval = defaultInterval + } else { + b.interval = c.Interval + } + + if c.Timeout == z { + b.timeout = defaultInterval + } else { + b.timeout = c.Timeout + } + + b.errorHandling = c.ErrorHandling + + b.commonLabels = c.CommonLabels + return b, nil +} + +// Run starts the event loop that pushes Prometheus metrics to Graphite at the +// configured interval. +func (b *Bridge) Run(ctx context.Context) { + ticker := time.NewTicker(b.interval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + if err := b.Push(); err != nil && b.logger != nil { + b.logger.Println("error pushing to Graphite:", err) + } + case <-ctx.Done(): + return + } + } +} + +// Push pushes Prometheus metrics to the configured Graphite server. +func (b *Bridge) Push() error { + mfs, err := b.g.Gather() + if b.commonLabels != nil { + for _, mf := range mfs { + for _, metric := range mf.Metric { + for k, v := range b.commonLabels { + metric.Label = append(metric.Label, &dto.LabelPair{ + Name: proto.String(k), + Value: proto.String(v), + }) + } + } + } + } + if err != nil || len(mfs) == 0 { + switch b.errorHandling { + case AbortOnError: + return err + case ContinueOnError: + if b.logger != nil { + b.logger.Println("continue on error:", err) + } + default: + panic("unrecognized error handling value") + } + } + + conn, err := net.DialTimeout("tcp", b.url, b.timeout) + if err != nil { + return err + } + defer conn.Close() + + return writeMetrics(conn, mfs, b.useTags, b.prefix, model.Now()) +} + +func writeMetrics(w io.Writer, mfs []*dto.MetricFamily, useTags bool, prefix string, now model.Time) error { + vec, err := expfmt.ExtractSamples(&expfmt.DecodeOptions{ + Timestamp: now, + }, mfs...) + if err != nil { + return err + } + + buf := bufio.NewWriter(w) + for _, s := range vec { + if prefix != "" { + for _, c := range prefix { + if _, err := buf.WriteRune(c); err != nil { + return err + } + } + if err := buf.WriteByte('.'); err != nil { + return err + } + } + if err := writeMetric(buf, s.Metric, useTags); err != nil { + return err + } + if _, err := fmt.Fprintf(buf, " %g %d\n", s.Value, int64(s.Timestamp)/millisecondsPerSecond); err != nil { + return err + } + if err := buf.Flush(); err != nil { + return err + } + } + + return nil +} + +func writeMetric(buf *bufio.Writer, m model.Metric, useTags bool) error { + metricName, hasName := m[model.MetricNameLabel] + numLabels := len(m) - 1 + if !hasName { + numLabels = len(m) + } + + var err error + switch numLabels { + case 0: + if hasName { + return writeSanitized(buf, string(metricName)) + } + default: + if err = writeSanitized(buf, string(metricName)); err != nil { + return err + } + if useTags { + return writeTags(buf, m) + } else { + return writeLabels(buf, m, numLabels) + } + } + return nil +} + +func writeTags(buf *bufio.Writer, m model.Metric) error { + for label, value := range m { + if label != model.MetricNameLabel { + _, _ = buf.WriteRune(';') + if _, err := buf.WriteString(string(label)); err != nil { + return err + } + _, _ = buf.WriteRune('=') + if _, err := buf.WriteString(string(value)); err != nil { + return err + } + } + } + return nil +} + +func writeLabels(buf *bufio.Writer, m model.Metric, numLabels int) error { + labelStrings := make([]string, 0, numLabels) + for label, value := range m { + if label != model.MetricNameLabel { + labelString := string(label) + " " + string(value) + labelStrings = append(labelStrings, labelString) + } + } + sort.Strings(labelStrings) + for _, s := range labelStrings { + if err := buf.WriteByte('.'); err != nil { + return err + } + if err := writeSanitized(buf, s); err != nil { + return err + } + } + return nil +} + +func writeSanitized(buf *bufio.Writer, s string) error { + prevUnderscore := false + + for _, c := range s { + c = replaceInvalidRune(c) + if c == '_' { + if prevUnderscore { + continue + } + prevUnderscore = true + } else { + prevUnderscore = false + } + if _, err := buf.WriteRune(c); err != nil { + return err + } + } + + return nil +} + +func replaceInvalidRune(c rune) rune { + if c == ' ' { + return '.' + } + if !((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || c == '_' || c == ':' || c == '-' || (c >= '0' && c <= '9')) { + return '_' + } + return c +} diff --git a/sdk/java/libjfs/bridge_test.go b/sdk/java/libjfs/bridge_test.go new file mode 100644 index 000000000000..06278643d30d --- /dev/null +++ b/sdk/java/libjfs/bridge_test.go @@ -0,0 +1,471 @@ +// Copyright 2018 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "bufio" + "bytes" + "context" + "fmt" + "io" + "log" + "net" + "os" + "reflect" + "regexp" + "sort" + "strings" + "testing" + "time" + + "github.com/prometheus/common/model" + + "github.com/prometheus/client_golang/prometheus" +) + +func TestSanitize(t *testing.T) { + testCases := []struct { + in, out string + }{ + {in: "hello", out: "hello"}, + {in: "hE/l1o", out: "hE_l1o"}, + {in: "he,*ll(.o", out: "he_ll_o"}, + {in: "hello_there%^&", out: "hello_there_"}, + {in: "hell-.o", out: "hell-_o"}, + } + + var buf bytes.Buffer + w := bufio.NewWriter(&buf) + + for i, tc := range testCases { + if err := writeSanitized(w, tc.in); err != nil { + t.Fatalf("write failed: %v", err) + } + if err := w.Flush(); err != nil { + t.Fatalf("flush failed: %v", err) + } + + if want, got := tc.out, buf.String(); want != got { + t.Fatalf("test case index %d: got sanitized string %s, want %s", i, got, want) + } + + buf.Reset() + } +} + +func TestWriteSummary(t *testing.T) { + testWriteSummary(t, false) + testWriteSummary(t, true) +} + +func testWriteSummary(t *testing.T, useTags bool) { + sumVec := prometheus.NewSummaryVec( + prometheus.SummaryOpts{ + Name: "name", + Help: "docstring", + ConstLabels: prometheus.Labels{"constname": "constvalue"}, + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, + }, + []string{"labelname"}, + ) + + sumVec.WithLabelValues("val1").Observe(float64(10)) + sumVec.WithLabelValues("val1").Observe(float64(20)) + sumVec.WithLabelValues("val1").Observe(float64(30)) + sumVec.WithLabelValues("val2").Observe(float64(20)) + sumVec.WithLabelValues("val2").Observe(float64(30)) + sumVec.WithLabelValues("val2").Observe(float64(40)) + + reg := prometheus.NewRegistry() + reg.MustRegister(sumVec) + + mfs, err := reg.Gather() + if err != nil { + t.Fatalf("error: %v", err) + } + + testCases := []struct { + prefix string + }{ + {prefix: "prefix"}, + {prefix: "pre/fix"}, + {prefix: "pre.fix"}, + {prefix: ""}, + } + + var ( + want = `%s.name.constname.constvalue.labelname.val1.quantile.0_5 20 1477043 +%s.name.constname.constvalue.labelname.val1.quantile.0_9 30 1477043 +%s.name.constname.constvalue.labelname.val1.quantile.0_99 30 1477043 +%s.name_sum.constname.constvalue.labelname.val1 60 1477043 +%s.name_count.constname.constvalue.labelname.val1 3 1477043 +%s.name.constname.constvalue.labelname.val2.quantile.0_5 30 1477043 +%s.name.constname.constvalue.labelname.val2.quantile.0_9 40 1477043 +%s.name.constname.constvalue.labelname.val2.quantile.0_99 40 1477043 +%s.name_sum.constname.constvalue.labelname.val2 90 1477043 +%s.name_count.constname.constvalue.labelname.val2 3 1477043 +` + wantTagged = `%s.name;constname=constvalue;labelname=val1;quantile=0.5 20 1477043 +%s.name;constname=constvalue;labelname=val1;quantile=0.9 30 1477043 +%s.name;constname=constvalue;labelname=val1;quantile=0.99 30 1477043 +%s.name_sum;constname=constvalue;labelname=val1 60 1477043 +%s.name_count;constname=constvalue;labelname=val1 3 1477043 +%s.name;constname=constvalue;labelname=val2;quantile=0.5 30 1477043 +%s.name;constname=constvalue;labelname=val2;quantile=0.9 40 1477043 +%s.name;constname=constvalue;labelname=val2;quantile=0.99 40 1477043 +%s.name_sum;constname=constvalue;labelname=val2 90 1477043 +%s.name_count;constname=constvalue;labelname=val2 3 1477043 +` + ) + + if useTags { + want = wantTagged + } + + for i, tc := range testCases { + + now := model.Time(1477043083) + var buf bytes.Buffer + err = writeMetrics(&buf, mfs, useTags, tc.prefix, now) + if err != nil { + t.Fatalf("error: %v", err) + } + + var wantWithPrefix string + if tc.prefix == "" { + wantWithPrefix = strings.ReplaceAll(want, "%s.", "") + } else { + wantWithPrefix = fmt.Sprintf(want, + tc.prefix, tc.prefix, tc.prefix, tc.prefix, tc.prefix, + tc.prefix, tc.prefix, tc.prefix, tc.prefix, tc.prefix, + ) + } + + got := buf.String() + + if err := checkLinesAreEqual(wantWithPrefix, got, useTags); err != nil { + t.Fatalf("test case index %d:\n%s", i, err.Error()) + } + } +} + +func TestWriteHistogram(t *testing.T) { + testWriteHistogram(t, false) + testWriteHistogram(t, true) +} + +func testWriteHistogram(t *testing.T, useTags bool) { + histVec := prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "name", + Help: "docstring", + ConstLabels: prometheus.Labels{"constname": "constvalue"}, + Buckets: []float64{0.01, 0.02, 0.05, 0.1}, + }, + []string{"labelname"}, + ) + + histVec.WithLabelValues("val1").Observe(float64(10)) + histVec.WithLabelValues("val1").Observe(float64(20)) + histVec.WithLabelValues("val1").Observe(float64(30)) + histVec.WithLabelValues("val2").Observe(float64(20)) + histVec.WithLabelValues("val2").Observe(float64(30)) + histVec.WithLabelValues("val2").Observe(float64(40)) + + reg := prometheus.NewRegistry() + reg.MustRegister(histVec) + + mfs, err := reg.Gather() + if err != nil { + t.Fatalf("error: %v", err) + } + + now := model.Time(1477043083) + var buf bytes.Buffer + err = writeMetrics(&buf, mfs, useTags, "prefix", now) + if err != nil { + t.Fatalf("error: %v", err) + } + + var ( + want = `prefix.name_bucket.constname.constvalue.labelname.val1.le.0_01 0 1477043 +prefix.name_bucket.constname.constvalue.labelname.val1.le.0_02 0 1477043 +prefix.name_bucket.constname.constvalue.labelname.val1.le.0_05 0 1477043 +prefix.name_bucket.constname.constvalue.labelname.val1.le.0_1 0 1477043 +prefix.name_sum.constname.constvalue.labelname.val1 60 1477043 +prefix.name_count.constname.constvalue.labelname.val1 3 1477043 +prefix.name_bucket.constname.constvalue.labelname.val1.le._Inf 3 1477043 +prefix.name_bucket.constname.constvalue.labelname.val2.le.0_01 0 1477043 +prefix.name_bucket.constname.constvalue.labelname.val2.le.0_02 0 1477043 +prefix.name_bucket.constname.constvalue.labelname.val2.le.0_05 0 1477043 +prefix.name_bucket.constname.constvalue.labelname.val2.le.0_1 0 1477043 +prefix.name_sum.constname.constvalue.labelname.val2 90 1477043 +prefix.name_count.constname.constvalue.labelname.val2 3 1477043 +prefix.name_bucket.constname.constvalue.labelname.val2.le._Inf 3 1477043 +` + wantTagged = `prefix.name_bucket;constname=constvalue;labelname=val1;le=0.01 0 1477043 +prefix.name_bucket;constname=constvalue;labelname=val1;le=0.02 0 1477043 +prefix.name_bucket;constname=constvalue;labelname=val1;le=0.05 0 1477043 +prefix.name_bucket;constname=constvalue;labelname=val1;le=0.1 0 1477043 +prefix.name_sum;constname=constvalue;labelname=val1 60 1477043 +prefix.name_count;constname=constvalue;labelname=val1 3 1477043 +prefix.name_bucket;constname=constvalue;labelname=val1;le=+Inf 3 1477043 +prefix.name_bucket;constname=constvalue;labelname=val2;le=0.01 0 1477043 +prefix.name_bucket;constname=constvalue;labelname=val2;le=0.02 0 1477043 +prefix.name_bucket;constname=constvalue;labelname=val2;le=0.05 0 1477043 +prefix.name_bucket;constname=constvalue;labelname=val2;le=0.1 0 1477043 +prefix.name_sum;constname=constvalue;labelname=val2 90 1477043 +prefix.name_count;constname=constvalue;labelname=val2 3 1477043 +prefix.name_bucket;constname=constvalue;labelname=val2;le=+Inf 3 1477043 +` + ) + + if useTags { + want = wantTagged + } + + got := buf.String() + + if err := checkLinesAreEqual(want, got, useTags); err != nil { + t.Fatalf(err.Error()) + } +} + +func TestToReader(t *testing.T) { + testToReader(t, false) + testToReader(t, true) +} + +func testToReader(t *testing.T, useTags bool) { + cntVec := prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "name", + Help: "docstring", + ConstLabels: prometheus.Labels{"constname": "constvalue"}, + }, + []string{"labelname"}, + ) + cntVec.WithLabelValues("val1").Inc() + cntVec.WithLabelValues("val2").Inc() + + reg := prometheus.NewRegistry() + reg.MustRegister(cntVec) + + var ( + want = `prefix.name.constname.constvalue.labelname.val1 1 1477043 +prefix.name.constname.constvalue.labelname.val2 1 1477043 +` + wantTagged = `prefix.name;constname=constvalue;labelname=val1 1 1477043 +prefix.name;constname=constvalue;labelname=val2 1 1477043 +` + ) + + if useTags { + want = wantTagged + } + + mfs, err := reg.Gather() + if err != nil { + t.Fatalf("error: %v", err) + } + + now := model.Time(1477043083) + var buf bytes.Buffer + err = writeMetrics(&buf, mfs, useTags, "prefix", now) + if err != nil { + t.Fatalf("error: %v", err) + } + + got := buf.String() + + if err := checkLinesAreEqual(want, got, useTags); err != nil { + t.Fatalf(err.Error()) + } +} + +func checkLinesAreEqual(w, g string, useTags bool) error { + if useTags { + taggedLineRegexp := regexp.MustCompile(`;| `) + + wantLines, err := stringToLines(w) + if err != nil { + return err + } + + gotLines, err := stringToLines(g) + if err != nil { + return err + } + + for lineInd := range gotLines { + var log string + // Tagged metric, order of tags doesn't matter + // m1 := "prefix.name;tag1=val1;tag2=val2 3 1477043" + // m2 := "prefix.name;tag2=val2;tag1=val1 3 1477043" + // m1 should be equal to m2 + wantSplit := taggedLineRegexp.Split(wantLines[lineInd], -1) + gotSplit := taggedLineRegexp.Split(gotLines[lineInd], -1) + sort.Strings(wantSplit) + sort.Strings(gotSplit) + + log += fmt.Sprintf("want: %v\ngot: %v\n\n", wantSplit, gotSplit) + + if !reflect.DeepEqual(wantSplit, gotSplit) { + return fmt.Errorf(log) + } + } + return nil + } + + if w != g { + return fmt.Errorf("wanted:\n\n%s\ngot:\n\n%s", w, g) + } + + return nil +} + +func stringToLines(s string) (lines []string, err error) { + scanner := bufio.NewScanner(strings.NewReader(s)) + for scanner.Scan() { + lines = append(lines, scanner.Text()) + } + err = scanner.Err() + return +} + +func TestPush(t *testing.T) { + reg := prometheus.NewRegistry() + cntVec := prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "name", + Help: "docstring", + ConstLabels: prometheus.Labels{"constname": "constvalue"}, + }, + []string{"labelname"}, + ) + cntVec.WithLabelValues("val1").Inc() + cntVec.WithLabelValues("val2").Inc() + reg.MustRegister(cntVec) + + host := "localhost" + port := ":56789" + b, err := NewBridge(&Config{ + URL: host + port, + Gatherer: reg, + Prefix: "prefix", + UseTags: true, + CommonLabels: map[string]string{"a": "b"}, + }) + if err != nil { + t.Fatalf("error creating bridge: %v", err) + } + + nmg, err := newMockGraphite(port) + if err != nil { + t.Fatalf("error creating mock graphite: %v", err) + } + defer nmg.Close() + + err = b.Push() + if err != nil { + t.Fatalf("error pushing: %v", err) + } + + wants := []string{ + "prefix.name.constname.constvalue.labelname.val1 1", + "prefix.name.constname.constvalue.labelname.val2 1", + } + + select { + case got := <-nmg.readc: + for _, want := range wants { + matched, err := regexp.MatchString(want, got) + if err != nil { + t.Fatalf("error pushing: %v", err) + } + if !matched { + t.Fatalf("missing metric:\nno match for %s received by server:\n%s", want, got) + } + } + return + case err := <-nmg.errc: + t.Fatalf("error reading push: %v", err) + case <-time.After(50 * time.Millisecond): + t.Fatalf("no result from graphite server") + } +} + +func newMockGraphite(port string) (*mockGraphite, error) { + readc := make(chan string) + errc := make(chan error) + ln, err := net.Listen("tcp", port) + if err != nil { + return nil, err + } + + go func() { + conn, err := ln.Accept() + if err != nil { + errc <- err + } + var b bytes.Buffer + io.Copy(&b, conn) + readc <- b.String() + }() + + return &mockGraphite{ + readc: readc, + errc: errc, + Listener: ln, + }, nil +} + +type mockGraphite struct { + readc chan string + errc chan error + + net.Listener +} + +func ExampleBridge() { + b, err := NewBridge(&Config{ + URL: "graphite.example.org:3099", + Gatherer: prometheus.DefaultGatherer, + Prefix: "prefix", + Interval: 15 * time.Second, + Timeout: 10 * time.Second, + ErrorHandling: AbortOnError, + Logger: log.New(os.Stdout, "graphite bridge: ", log.Lshortfile), + }) + if err != nil { + panic(err) + } + + go func() { + // Start something in a goroutine that uses metrics. + }() + + // Push initial metrics to Graphite. Fail fast if the push fails. + if err := b.Push(); err != nil { + panic(err) + } + + // Create a Context to control stopping the Run() loop that pushes + // metrics to Graphite. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Start pushing metrics to Graphite in the Run() loop. + b.Run(ctx) +} diff --git a/sdk/java/libjfs/main.go b/sdk/java/libjfs/main.go index 81a396682b27..34abef3597ea 100644 --- a/sdk/java/libjfs/main.go +++ b/sdk/java/libjfs/main.go @@ -29,6 +29,7 @@ import ( "bytes" "encoding/json" "fmt" + "github.com/prometheus/client_golang/prometheus/push" "io" "net/http" _ "net/http/pprof" @@ -52,8 +53,6 @@ import ( "github.com/juicedata/juicefs/pkg/version" "github.com/juicedata/juicefs/pkg/vfs" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/push" - "github.com/sirupsen/logrus" ) @@ -66,8 +65,10 @@ var ( handlers = make(map[uintptr]*wrapper) activefs = make(map[string][]*wrapper) logger = utils.GetLogger("juicefs") + bOnce sync.Once + bridges []*Bridge + pOnce sync.Once pushers []*push.Pusher - once sync.Once ) const ( @@ -258,6 +259,7 @@ type javaConf struct { PushGateway string `json:"pushGateway"` PushInterval int `json:"pushInterval"` PushAuth string `json:"pushAuth"` + PushGraphite string `json:"pushGraphite"` } func getOrCreate(name, user, group, superuser, supergroup string, f func() *fs.FileSystem) uintptr { @@ -306,6 +308,60 @@ func createStorage(format meta.Format) (object.ObjectStorage, error) { return object.WithPrefix(blob, format.Name+"/"), nil } +func push2Gateway(pushGatewayAddr, pushAuth string, pushInterVal time.Duration, registry *prometheus.Registry, commonLabels map[string]string) { + pusher := push.New(pushGatewayAddr, "juicefs").Gatherer(registry) + for k, v := range commonLabels { + pusher.Grouping(k, v) + } + if pushAuth != "" { + if strings.Contains(pushAuth, ":") { + parts := strings.Split(pushAuth, ":") + pusher.BasicAuth(parts[0], parts[1]) + } + } + pushers = append(pushers, pusher) + + pOnce.Do(func() { + go func() { + for range time.NewTicker(pushInterVal).C { + for _, pusher := range pushers { + if err := pusher.Push(); err != nil { + logger.Warnf("error pushing to PushGateway: %s", err) + } + } + } + }() + }) +} + +func push2Graphite(graphite string, pushInterVal time.Duration, registry *prometheus.Registry, commonLabels map[string]string) { + if bridge, err := NewBridge(&Config{ + URL: graphite, + Gatherer: registry, + UseTags: true, + Timeout: 2 * time.Second, + ErrorHandling: ContinueOnError, + Logger: logger, + CommonLabels: commonLabels, + }); err != nil { + logger.Warnf("NewBridge error:%s", err) + } else { + bridges = append(bridges, bridge) + } + + bOnce.Do(func() { + go func() { + for range time.NewTicker(pushInterVal).C { + for _, brg := range bridges { + if err := brg.Push(); err != nil { + logger.Warnf("error pushing to Graphite: %s", err) + } + } + } + }() + }) +} + //export jfs_init func jfs_init(cname, jsonConf, user, group, superuser, supergroup *C.char) uintptr { name := C.GoString(cname) @@ -350,48 +406,32 @@ func jfs_init(cname, jsonConf, user, group, superuser, supergroup *C.char) uintp if err != nil { logger.Fatalf("load setting: %s", err) } - registry := prometheus.NewRegistry() // replace default so only JuiceFS metrics are exposed - registerer := prometheus.WrapRegistererWithPrefix("juicefs_", registry) - if jConf.PushGateway != "" { - registerer.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{})) - registerer.MustRegister(prometheus.NewGoCollector()) - pusher := push.New(jConf.PushGateway, "juicefs"). - Gatherer(registry). - Grouping("vol_name", name). - Grouping("mp", "sdk-"+strconv.Itoa(os.Getpid())) + var registerer prometheus.Registerer + if jConf.PushGateway != "" || jConf.PushGraphite != "" { + commonLabels := prometheus.Labels{"vol_name": name, "mp": "sdk-" + strconv.Itoa(os.Getpid())} if h, err := os.Hostname(); err == nil { - pusher.Grouping("instance", h) + commonLabels["instance"] = h } else { logger.Warnf("cannot get hostname: %s", err) } - if strings.Contains(jConf.PushAuth, ":") { - parts := strings.Split(jConf.PushAuth, ":") - pusher.BasicAuth(parts[0], parts[1]) - } - pushers = append(pushers, pusher) + registry := prometheus.NewRegistry() + registerer = prometheus.WrapRegistererWithPrefix("juicefs_", registry) + registerer.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{})) + registerer.MustRegister(prometheus.NewGoCollector()) - interval := time.Second * 10 + var interval time.Duration if jConf.PushInterval > 0 { interval = time.Second * time.Duration(jConf.PushInterval) } - once.Do(func() { - go func() { - for { - time.Sleep(interval) - for _, p := range pushers { - if err := p.Push(); err != nil { - logger.Warnf("push metrics to %s: %s", jConf.PushGateway, err) - } - } - } - }() - }) - + if jConf.PushGraphite != "" { + push2Graphite(jConf.PushGraphite, interval, registry, commonLabels) + } + if jConf.PushGateway != "" { + push2Gateway(jConf.PushGateway, jConf.PushAuth, interval, registry, commonLabels) + } meta.InitMetrics(registerer) vfs.InitMetrics(registerer) go metric.UpdateMetrics(m, registerer) - } else { - registerer = nil } if jConf.Bucket != "" { @@ -582,9 +622,14 @@ func jfs_term(pid int, h uintptr) int { } } } + for _, bridge := range bridges { + if err := bridge.Push(); err != nil { + logger.Warnf("error pushing to Graphite: %s", err) + } + } for _, pusher := range pushers { if err := pusher.Push(); err != nil { - logger.Warnf("push metrics: %s", err) + logger.Warnf("error pushing to PushGatway: %s", err) } } return 0 diff --git a/sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java b/sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java index f53dd5c536ca..3f215ee79edf 100644 --- a/sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java +++ b/sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java @@ -4,9 +4,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -323,6 +323,7 @@ public void initialize(URI uri, Configuration conf) throws IOException { obj.put("pushGateway", getConf(conf, "push-gateway", "")); obj.put("pushInterval", Integer.valueOf(getConf(conf, "push-interval", "10"))); obj.put("pushAuth", getConf(conf, "push-auth", "")); + obj.put("pushGraphite", getConf(conf, "push-graphite", "")); obj.put("fastResolve", Boolean.valueOf(getConf(conf, "fast-resolve", "true"))); obj.put("noUsageReport", Boolean.valueOf(getConf(conf, "no-usage-report", "false"))); obj.put("freeSpace", getConf(conf, "free-space", "0.1")); @@ -500,16 +501,16 @@ public static Libjfs loadLibrary() throws IOException { LibraryLoader libjfsLibraryLoader = LibraryLoader.create(Libjfs.class); libjfsLibraryLoader.failImmediately(); String resource = "libjfs.so.gz"; - String name = "libjfs.4.so"; + String name = "libjfs.5.so"; File dir = new File("/tmp"); String os = System.getProperty("os.name"); if (os.toLowerCase().contains("windows")) { resource = "libjfs.dll.gz"; - name = "libjfs3.dll"; + name = "libjfs5.dll"; dir = new File(System.getProperty("java.io.tmpdir")); } else if (os.toLowerCase().contains("mac")) { resource = "libjfs.dylib.gz"; - name = "libjfs.dylib"; + name = "libjfs5.dylib"; } File libFile = new File(dir, name);