2323package shed
2424
2525import (
26+ "errors"
27+ "fmt"
28+ "strconv"
29+ "strings"
30+ "time"
31+
2632 "github.com/ethereum/go-ethereum/metrics"
33+ "github.com/ethereum/go-ethereum/swarm/log"
2734 "github.com/syndtr/goleveldb/leveldb"
2835 "github.com/syndtr/goleveldb/leveldb/iterator"
2936 "github.com/syndtr/goleveldb/leveldb/opt"
3037)
3138
const (
	// openFileLimit is the limit for LevelDB OpenFilesCacheCapacity.
	openFileLimit = 128
	// writePauseWarningThrottler is the minimum interval between repeated
	// "database compacting" warnings emitted while writes remain paused.
	writePauseWarningThrottler = 1 * time.Minute
)
3443
// DB provides abstractions over LevelDB in order to
// implement complex structures using fields and ordered indexes.
// It provides a schema functionality to store fields and indexes
// information about naming and types.
type DB struct {
	ldb *leveldb.DB // underlying LevelDB handle

	compTimeMeter    metrics.Meter // Meter for measuring the total time spent in database compaction
	compReadMeter    metrics.Meter // Meter for measuring the data read during compaction
	compWriteMeter   metrics.Meter // Meter for measuring the data written during compaction
	writeDelayNMeter metrics.Meter // Meter for measuring the write delay number due to database compaction
	writeDelayMeter  metrics.Meter // Meter for measuring the write delay duration due to database compaction
	diskReadMeter    metrics.Meter // Meter for measuring the effective amount of data read
	diskWriteMeter   metrics.Meter // Meter for measuring the effective amount of data written

	// quitChan is the quit channel used to stop the metrics collection
	// goroutine before closing the database: the closer sends a chan error
	// and receives the final collection error on it (see meter).
	quitChan chan chan error
}
4261
4362// NewDB constructs a new DB and validates the schema
4463// if it exists in database on the given path.
45- func NewDB(path string) (db *DB, err error) {
64+ // metricsPrefix is used for metrics collection for the given DB.
65+ func NewDB(path string, metricsPrefix string) (db *DB, err error) {
4666 ldb, err := leveldb.OpenFile(path, &opt.Options{
4767 OpenFilesCacheCapacity: openFileLimit,
4868 })
@@ -66,6 +86,15 @@ func NewDB(path string) (db *DB, err error) {
6686 return nil, err
6787 }
6888 }
89+
90+ // Configure meters for DB
91+ db.configure(metricsPrefix)
92+
93+ // Create a quit channel for the periodic metrics collector and run it
94+ db.quitChan = make(chan chan error)
95+
96+ go db.meter(10 * time.Second)
97+
6998 return db, nil
7099}
71100
@@ -126,5 +155,175 @@ func (db *DB) WriteBatch(batch *leveldb.Batch) (err error) {
126155
127156// Close closes LevelDB database.
128157func (db *DB) Close() (err error) {
158+ close(db.quitChan)
129159 return db.ldb.Close()
130160}
161+
// configure initializes the database metrics collectors, registering
// each meter in the global metrics registry under the given name prefix.
func (db *DB) configure(prefix string) {
	// Initialize all the metrics collector at the requested prefix
	db.compTimeMeter = metrics.NewRegisteredMeter(prefix+"compact/time", nil)
	db.compReadMeter = metrics.NewRegisteredMeter(prefix+"compact/input", nil)
	db.compWriteMeter = metrics.NewRegisteredMeter(prefix+"compact/output", nil)
	db.diskReadMeter = metrics.NewRegisteredMeter(prefix+"disk/read", nil)
	db.diskWriteMeter = metrics.NewRegisteredMeter(prefix+"disk/write", nil)
	db.writeDelayMeter = metrics.NewRegisteredMeter(prefix+"compact/writedelay/duration", nil)
	db.writeDelayNMeter = metrics.NewRegisteredMeter(prefix+"compact/writedelay/counter", nil)
}
173+
// meter periodically polls internal leveldb counters ("leveldb.stats",
// "leveldb.writedelay" and "leveldb.iostats" properties) and reports the
// per-interval deltas to the registered meters. It runs until either a
// stats read/parse error occurs or a quit request (a chan error) arrives
// on db.quitChan; it then replies on that channel with the last error.
// refresh is the sleep interval between two collections.
func (db *DB) meter(refresh time.Duration) {
	// Create the counters to store current and previous compaction values.
	// compactions[i%2] holds the current sample, compactions[(i-1)%2] the
	// previous one, so deltas can be computed without extra copies.
	compactions := make([][]float64, 2)
	for i := 0; i < 2; i++ {
		compactions[i] = make([]float64, 3)
	}
	// Create storage for iostats (previous cumulative read/write MB).
	var iostats [2]float64

	// Create storage and warning log tracer for write delay.
	var (
		delaystats      [2]int64 // previous cumulative delay count and duration (ns)
		lastWritePaused time.Time
	)

	var (
		errc chan error // reply channel received from quitChan, nil until quit is requested
		merr error      // first collection error; non-nil terminates the loop
	)

	// Iterate ad infinitum and collect the stats
	for i := 1; errc == nil && merr == nil; i++ {
		// Retrieve the database stats
		stats, err := db.ldb.GetProperty("leveldb.stats")
		if err != nil {
			log.Error("Failed to read database stats", "err", err)
			merr = err
			continue
		}
		// Find the compaction table, skip the header
		lines := strings.Split(stats, "\n")
		for len(lines) > 0 && strings.TrimSpace(lines[0]) != "Compactions" {
			lines = lines[1:]
		}
		if len(lines) <= 3 {
			log.Error("Compaction table not found")
			merr = errors.New("compaction table not found")
			continue
		}
		lines = lines[3:]

		// Iterate over all the table rows, and accumulate the entries.
		// Zero the current sample first since it still holds the values
		// from two iterations ago.
		for j := 0; j < len(compactions[i%2]); j++ {
			compactions[i%2][j] = 0
		}
		for _, line := range lines {
			parts := strings.Split(line, "|")
			if len(parts) != 6 {
				break
			}
			// Columns 3..5 are the duration, read and write totals.
			for idx, counter := range parts[3:] {
				value, err := strconv.ParseFloat(strings.TrimSpace(counter), 64)
				if err != nil {
					log.Error("Compaction entry parsing failed", "err", err)
					merr = err
					// NOTE(review): this `continue` targets the inner loop, so
					// the remaining counters are still parsed and the meters
					// below still get marked; the outer loop exits afterwards
					// because merr is non-nil.
					continue
				}
				compactions[i%2][idx] += value
			}
		}
		// Update all the requested meters with the deltas against the
		// previous sample (seconds -> ns, MB -> bytes).
		if db.compTimeMeter != nil {
			db.compTimeMeter.Mark(int64((compactions[i%2][0] - compactions[(i-1)%2][0]) * 1000 * 1000 * 1000))
		}
		if db.compReadMeter != nil {
			db.compReadMeter.Mark(int64((compactions[i%2][1] - compactions[(i-1)%2][1]) * 1024 * 1024))
		}
		if db.compWriteMeter != nil {
			db.compWriteMeter.Mark(int64((compactions[i%2][2] - compactions[(i-1)%2][2]) * 1024 * 1024))
		}

		// Retrieve the write delay statistic
		writedelay, err := db.ldb.GetProperty("leveldb.writedelay")
		if err != nil {
			log.Error("Failed to read database write delay statistic", "err", err)
			merr = err
			continue
		}
		var (
			delayN        int64
			delayDuration string
			duration      time.Duration
			paused        bool
		)
		// NOTE(review): if Sscanf returns n != 3 with a nil err, merr stays
		// nil here and the loop retries immediately (the `continue` skips the
		// sleep below) — confirm Sscanf always reports an error on a short
		// count, or set merr to a non-nil sentinel in this branch.
		if n, err := fmt.Sscanf(writedelay, "DelayN:%d Delay:%s Paused:%t", &delayN, &delayDuration, &paused); n != 3 || err != nil {
			log.Error("Write delay statistic not found")
			merr = err
			continue
		}
		duration, err = time.ParseDuration(delayDuration)
		if err != nil {
			log.Error("Failed to parse delay duration", "err", err)
			merr = err
			continue
		}
		if db.writeDelayNMeter != nil {
			db.writeDelayNMeter.Mark(delayN - delaystats[0])
		}
		if db.writeDelayMeter != nil {
			db.writeDelayMeter.Mark(duration.Nanoseconds() - delaystats[1])
		}
		// If a warning that db is performing compaction has been displayed, any subsequent
		// warnings will be withheld for one minute not to overwhelm the user.
		if paused && delayN-delaystats[0] == 0 && duration.Nanoseconds()-delaystats[1] == 0 &&
			time.Now().After(lastWritePaused.Add(writePauseWarningThrottler)) {
			log.Warn("Database compacting, degraded performance")
			lastWritePaused = time.Now()
		}
		delaystats[0], delaystats[1] = delayN, duration.Nanoseconds()

		// Retrieve the database iostats.
		ioStats, err := db.ldb.GetProperty("leveldb.iostats")
		if err != nil {
			log.Error("Failed to read database iostats", "err", err)
			merr = err
			continue
		}
		var nRead, nWrite float64
		parts := strings.Split(ioStats, " ")
		if len(parts) < 2 {
			log.Error("Bad syntax of ioStats", "ioStats", ioStats)
			merr = fmt.Errorf("bad syntax of ioStats %s", ioStats)
			continue
		}
		// NOTE(review): same nil-err concern as the write-delay Sscanf above
		// applies to both parses below.
		if n, err := fmt.Sscanf(parts[0], "Read(MB):%f", &nRead); n != 1 || err != nil {
			log.Error("Bad syntax of read entry", "entry", parts[0])
			merr = err
			continue
		}
		if n, err := fmt.Sscanf(parts[1], "Write(MB):%f", &nWrite); n != 1 || err != nil {
			log.Error("Bad syntax of write entry", "entry", parts[1])
			merr = err
			continue
		}
		if db.diskReadMeter != nil {
			db.diskReadMeter.Mark(int64((nRead - iostats[0]) * 1024 * 1024))
		}
		if db.diskWriteMeter != nil {
			db.diskWriteMeter.Mark(int64((nWrite - iostats[1]) * 1024 * 1024))
		}
		iostats[0], iostats[1] = nRead, nWrite

		// Sleep a bit, then repeat the stats collection
		select {
		case errc = <-db.quitChan:
			// Quit requesting, stop hammering the database
		case <-time.After(refresh):
			// Timeout, gather a new set of stats
		}
	}

	// If the loop ended due to an error rather than a quit request, wait for
	// the quit request so the error can be delivered to the closer.
	if errc == nil {
		errc = <-db.quitChan
	}
	errc <- merr
}
0 commit comments