@@ -22,6 +22,8 @@ package leveldb
2222
2323import (
2424 "fmt"
25+ "strconv"
26+ "strings"
2527 "sync"
2628 "time"
2729
@@ -75,8 +77,6 @@ type Database struct {
7577 seekCompGauge metrics.Gauge // Gauge for tracking the number of table compaction caused by read opt
7678 manualMemAllocGauge metrics.Gauge // Gauge to track the amount of memory that has been manually allocated (not a part of runtime/GC)
7779
78- levelsGauge [7 ]metrics.Gauge // Gauge for tracking the number of tables in levels
79-
8080 quitLock sync.Mutex // Mutex protecting the quit channel access
8181 quitChan chan chan error // Quit channel to stop the metrics collection before closing the database
8282
@@ -146,11 +146,6 @@ func NewCustom(file string, namespace string, customize func(options *opt.Option
146146 ldb .seekCompGauge = metrics .NewRegisteredGauge (namespace + "compact/seek" , nil )
147147 ldb .manualMemAllocGauge = metrics .NewRegisteredGauge (namespace + "memory/manualalloc" , nil )
148148
149- // leveldb has only up to 7 levels
150- for i := range ldb .levelsGauge {
151- ldb .levelsGauge [i ] = metrics .NewRegisteredGauge (namespace + fmt .Sprintf ("tables/level%v" , i ), nil )
152- }
153-
154149 // Start up the metrics gathering and return
155150 go ldb .meter (metricsGatheringInterval )
156151 return ldb , nil
@@ -271,63 +266,122 @@ func (db *Database) Path() string {
271266
272267// meter periodically retrieves internal leveldb counters and reports them to
273268// the metrics subsystem.
269+ //
270+ // This is how a LevelDB stats table looks like (currently):
271+ //
272+ // Compactions
273+ // Level | Tables | Size(MB) | Time(sec) | Read(MB) | Write(MB)
274+ // -------+------------+---------------+---------------+---------------+---------------
275+ // 0 | 0 | 0.00000 | 1.27969 | 0.00000 | 12.31098
276+ // 1 | 85 | 109.27913 | 28.09293 | 213.92493 | 214.26294
277+ // 2 | 523 | 1000.37159 | 7.26059 | 66.86342 | 66.77884
278+ // 3 | 570 | 1113.18458 | 0.00000 | 0.00000 | 0.00000
279+ //
280+ // This is how the write delay look like (currently):
281+ // DelayN:5 Delay:406.604657ms Paused: false
282+ //
283+ // This is how the iostats look like (currently):
284+ // Read(MB):3895.04860 Write(MB):3654.64712
274285func (db * Database ) meter (refresh time.Duration ) {
275286 // Create the counters to store current and previous compaction values
276- compactions := make ([][]int64 , 2 )
287+ compactions := make ([][]float64 , 2 )
277288 for i := 0 ; i < 2 ; i ++ {
278- compactions [i ] = make ([]int64 , 4 )
289+ compactions [i ] = make ([]float64 , 4 )
279290 }
280- // Create storages for states and warning log tracer.
281- var (
282- errc chan error
283- merr error
291+ // Create storage for iostats.
292+ var iostats [2 ]float64
284293
285- stats leveldb. DBStats
286- iostats [ 2 ] int64
294+ // Create storage and warning log tracer for write delay.
295+ var (
287296 delaystats [2 ]int64
288297 lastWritePaused time.Time
289298 )
299+
300+ var (
301+ errc chan error
302+ merr error
303+ )
304+
290305 timer := time .NewTimer (refresh )
291306 defer timer .Stop ()
292307
293308 // Iterate ad infinitum and collect the stats
294309 for i := 1 ; errc == nil && merr == nil ; i ++ {
295310 // Retrieve the database stats
296- // Stats method resets buffers inside therefore it's okay to just pass the struct.
297- err := db .db .Stats (& stats )
311+ stats , err := db .db .GetProperty ("leveldb.stats" )
298312 if err != nil {
299313 db .log .Error ("Failed to read database stats" , "err" , err )
300314 merr = err
301315 continue
302316 }
317+ // Find the compaction table, skip the header
318+ lines := strings .Split (stats , "\n " )
319+ for len (lines ) > 0 && strings .TrimSpace (lines [0 ]) != "Compactions" {
320+ lines = lines [1 :]
321+ }
322+ if len (lines ) <= 3 {
323+ db .log .Error ("Compaction leveldbTable not found" )
324+ merr = errors .New ("compaction leveldbTable not found" )
325+ continue
326+ }
327+ lines = lines [3 :]
328+
303329 // Iterate over all the leveldbTable rows, and accumulate the entries
304330 for j := 0 ; j < len (compactions [i % 2 ]); j ++ {
305331 compactions [i % 2 ][j ] = 0
306332 }
307- compactions [i % 2 ][0 ] = stats .LevelSizes .Sum ()
308- for _ , t := range stats .LevelDurations {
309- compactions [i % 2 ][1 ] += t .Nanoseconds ()
333+ for _ , line := range lines {
334+ parts := strings .Split (line , "|" )
335+ if len (parts ) != 6 {
336+ break
337+ }
338+ for idx , counter := range parts [2 :] {
339+ value , err := strconv .ParseFloat (strings .TrimSpace (counter ), 64 )
340+ if err != nil {
341+ db .log .Error ("Compaction entry parsing failed" , "err" , err )
342+ merr = err
343+ continue
344+ }
345+ compactions [i % 2 ][idx ] += value
346+ }
310347 }
311- compactions [i % 2 ][2 ] = stats .LevelRead .Sum ()
312- compactions [i % 2 ][3 ] = stats .LevelWrite .Sum ()
313348 // Update all the requested meters
314349 if db .diskSizeGauge != nil {
315- db .diskSizeGauge .Update (compactions [i % 2 ][0 ])
350+ db .diskSizeGauge .Update (int64 ( compactions [i % 2 ][0 ] * 1024 * 1024 ) )
316351 }
317352 if db .compTimeMeter != nil {
318- db .compTimeMeter .Mark (compactions [i % 2 ][1 ] - compactions [(i - 1 )% 2 ][1 ])
353+ db .compTimeMeter .Mark (int64 (( compactions [i % 2 ][1 ] - compactions [(i - 1 )% 2 ][1 ]) * 1000 * 1000 * 1000 ) )
319354 }
320355 if db .compReadMeter != nil {
321- db .compReadMeter .Mark (compactions [i % 2 ][2 ] - compactions [(i - 1 )% 2 ][2 ])
356+ db .compReadMeter .Mark (int64 (( compactions [i % 2 ][2 ] - compactions [(i - 1 )% 2 ][2 ]) * 1024 * 1024 ) )
322357 }
323358 if db .compWriteMeter != nil {
324- db .compWriteMeter .Mark (compactions [i % 2 ][3 ] - compactions [(i - 1 )% 2 ][3 ])
359+ db .compWriteMeter .Mark (int64 ((compactions [i % 2 ][3 ] - compactions [(i - 1 )% 2 ][3 ]) * 1024 * 1024 ))
360+ }
361+ // Retrieve the write delay statistic
362+ writedelay , err := db .db .GetProperty ("leveldb.writedelay" )
363+ if err != nil {
364+ db .log .Error ("Failed to read database write delay statistic" , "err" , err )
365+ merr = err
366+ continue
325367 }
326368 var (
327- delayN = int64 (stats .WriteDelayCount )
328- duration = stats .WriteDelayDuration
329- paused = stats .WritePaused
369+ delayN int64
370+ delayDuration string
371+ duration time.Duration
372+ paused bool
330373 )
374+ if n , err := fmt .Sscanf (writedelay , "DelayN:%d Delay:%s Paused:%t" , & delayN , & delayDuration , & paused ); n != 3 || err != nil {
375+ db .log .Error ("Write delay statistic not found" )
376+ merr = err
377+ continue
378+ }
379+ duration , err = time .ParseDuration (delayDuration )
380+ if err != nil {
381+ db .log .Error ("Failed to parse delay duration" , "err" , err )
382+ merr = err
383+ continue
384+ }
331385 if db .writeDelayNMeter != nil {
332386 db .writeDelayNMeter .Mark (delayN - delaystats [0 ])
333387 }
@@ -343,27 +397,60 @@ func (db *Database) meter(refresh time.Duration) {
343397 }
344398 delaystats [0 ], delaystats [1 ] = delayN , duration .Nanoseconds ()
345399
346- var (
347- nRead = int64 (stats .IORead )
348- nWrite = int64 (stats .IOWrite )
349- )
400+ // Retrieve the database iostats.
401+ ioStats , err := db .db .GetProperty ("leveldb.iostats" )
402+ if err != nil {
403+ db .log .Error ("Failed to read database iostats" , "err" , err )
404+ merr = err
405+ continue
406+ }
407+ var nRead , nWrite float64
408+ parts := strings .Split (ioStats , " " )
409+ if len (parts ) < 2 {
410+ db .log .Error ("Bad syntax of ioStats" , "ioStats" , ioStats )
411+ merr = fmt .Errorf ("bad syntax of ioStats %s" , ioStats )
412+ continue
413+ }
414+ if n , err := fmt .Sscanf (parts [0 ], "Read(MB):%f" , & nRead ); n != 1 || err != nil {
415+ db .log .Error ("Bad syntax of read entry" , "entry" , parts [0 ])
416+ merr = err
417+ continue
418+ }
419+ if n , err := fmt .Sscanf (parts [1 ], "Write(MB):%f" , & nWrite ); n != 1 || err != nil {
420+ db .log .Error ("Bad syntax of write entry" , "entry" , parts [1 ])
421+ merr = err
422+ continue
423+ }
350424 if db .diskReadMeter != nil {
351- db .diskReadMeter .Mark (nRead - iostats [0 ])
425+ db .diskReadMeter .Mark (int64 (( nRead - iostats [0 ]) * 1024 * 1024 ) )
352426 }
353427 if db .diskWriteMeter != nil {
354- db .diskWriteMeter .Mark (nWrite - iostats [1 ])
428+ db .diskWriteMeter .Mark (int64 (( nWrite - iostats [1 ]) * 1024 * 1024 ) )
355429 }
356430 iostats [0 ], iostats [1 ] = nRead , nWrite
357431
358- db .memCompGauge .Update (int64 (stats .MemComp ))
359- db .level0CompGauge .Update (int64 (stats .Level0Comp ))
360- db .nonlevel0CompGauge .Update (int64 (stats .NonLevel0Comp ))
361- db .seekCompGauge .Update (int64 (stats .SeekComp ))
432+ compCount , err := db .db .GetProperty ("leveldb.compcount" )
433+ if err != nil {
434+ db .log .Error ("Failed to read database iostats" , "err" , err )
435+ merr = err
436+ continue
437+ }
362438
363- // update tables amount
364- for i , tables := range stats .LevelTablesCounts {
365- db .levelsGauge [i ].Update (int64 (tables ))
439+ var (
440+ memComp uint32
441+ level0Comp uint32
442+ nonLevel0Comp uint32
443+ seekComp uint32
444+ )
445+ if n , err := fmt .Sscanf (compCount , "MemComp:%d Level0Comp:%d NonLevel0Comp:%d SeekComp:%d" , & memComp , & level0Comp , & nonLevel0Comp , & seekComp ); n != 4 || err != nil {
446+ db .log .Error ("Compaction count statistic not found" )
447+ merr = err
448+ continue
366449 }
450+ db .memCompGauge .Update (int64 (memComp ))
451+ db .level0CompGauge .Update (int64 (level0Comp ))
452+ db .nonlevel0CompGauge .Update (int64 (nonLevel0Comp ))
453+ db .seekCompGauge .Update (int64 (seekComp ))
367454
368455 // Sleep a bit, then repeat the stats collection
369456 select {
0 commit comments