Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions pkg/meta/boltdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package meta
import (
"os"
"path"
"sync"
"time"

boltdb "github.com/boltdb/bolt"
Expand All @@ -15,6 +16,7 @@ func init() {

type bolt struct {
db *boltdb.DB
sync.Mutex
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering if we should add this sync.Mutex, since I found that in the definition of bolt.DB, there are lock as well. Does it conflict? Please check for this.

type DB struct {
	// When enabled, the database will perform a Check() after every commit.
	// A panic is issued if the database is in an inconsistent state. This
	// flag has a large performance impact so it should only be used for
	// debugging purposes.
	StrictMode bool

	// Setting the NoSync flag will cause the database to skip fsync()
	// calls after each commit. This can be useful when bulk loading data
	// into a database and you can restart the bulk load in the event of
	// a system failure or database corruption. Do not set this flag for
	// normal use.
	//
	// If the package global IgnoreNoSync constant is true, this value is
	// ignored.  See the comment on that constant for more details.
	//
	// THIS IS UNSAFE. PLEASE USE WITH CAUTION.
	NoSync bool

	// When true, skips the truncate call when growing the database.
	// Setting this to true is only safe on non-ext3/ext4 systems.
	// Skipping truncation avoids preallocation of hard drive space and
	// bypasses a truncate() and fsync() syscall on remapping.
	//
	// https://github.com/boltdb/bolt/issues/284
	NoGrowSync bool

	// If you want to read the entire database fast, you can set MmapFlag to
	// syscall.MAP_POPULATE on Linux 2.6.23+ for sequential read-ahead.
	MmapFlags int

	// MaxBatchSize is the maximum size of a batch. Default value is
	// copied from DefaultMaxBatchSize in Open.
	//
	// If <=0, disables batching.
	//
	// Do not change concurrently with calls to Batch.
	MaxBatchSize int

	// MaxBatchDelay is the maximum delay before a batch starts.
	// Default value is copied from DefaultMaxBatchDelay in Open.
	//
	// If <=0, effectively disables batching.
	//
	// Do not change concurrently with calls to Batch.
	MaxBatchDelay time.Duration

	// AllocSize is the amount of space allocated when the database
	// needs to create new pages. This is done to amortize the cost
	// of truncate() and fsync() when growing the data file.
	AllocSize int

	path     string
	file     *os.File
	lockfile *os.File // windows only
	dataref  []byte   // mmap'ed readonly, write throws SEGV
	data     *[maxMapSize]byte
	datasz   int
	filesz   int // current on disk file size
	meta0    *meta
	meta1    *meta
	pageSize int
	opened   bool
	rwtx     *Tx
	txs      []*Tx
	freelist *freelist
	stats    Stats

	pagePool sync.Pool

	batchMu sync.Mutex
	batch   *batch

	rwlock   sync.Mutex   // Allows only one writer at a time.
	metalock sync.Mutex   // Protects meta page access.
	mmaplock sync.RWMutex // Protects mmap access during remapping.
	statlock sync.RWMutex // Protects stats access.

	ops struct {
		writeAt func(b []byte, off int64) (n int, err error)
	}

	// Read only mode.
	// When true, Update() and Begin(true) return ErrDatabaseReadOnly immediately.
	readOnly bool
}

}

// NewBolt is used to make bolt metadata store instance.
Expand Down Expand Up @@ -46,6 +48,9 @@ func NewBolt(cfg Config) (Backend, error) {
}

func (b *bolt) prepare(db *boltdb.DB, bucket []byte) error {
b.Lock()
defer b.Unlock()

return db.Update(func(tx *boltdb.Tx) error {
_, err := tx.CreateBucketIfNotExists(bucket)
if err != nil {
Expand All @@ -64,6 +69,9 @@ func (b *bolt) Path(key string) string {
func (b *bolt) Keys(bucket string) ([]string, error) {
keys := make([]string, 0)

b.Lock()
defer b.Unlock()

err := b.db.View(func(tx *boltdb.Tx) error {
bkt := tx.Bucket([]byte(bucket))
if bkt == nil {
Expand All @@ -81,6 +89,9 @@ func (b *bolt) Keys(bucket string) ([]string, error) {

// Put is used to put metadate into boltdb.
func (b *bolt) Put(bucket, key string, value []byte) error {
b.Lock()
defer b.Unlock()

return b.db.Update(func(tx *boltdb.Tx) error {
bkt := tx.Bucket([]byte(bucket))
if bkt == nil {
Expand All @@ -95,6 +106,9 @@ func (b *bolt) Put(bucket, key string, value []byte) error {

// Del is used to delete metadate from boltdb.
func (b *bolt) Remove(bucket string, key string) error {
b.Lock()
defer b.Unlock()

return b.db.Update(func(tx *boltdb.Tx) error {
bkt := tx.Bucket([]byte(bucket))
if bkt == nil {
Expand All @@ -108,6 +122,9 @@ func (b *bolt) Remove(bucket string, key string) error {
func (b *bolt) Get(bucket string, key string) ([]byte, error) {
var value []byte

b.Lock()
defer b.Unlock()

err := b.db.View(func(tx *boltdb.Tx) error {
bkt := tx.Bucket([]byte(bucket))
if bkt == nil {
Expand All @@ -126,6 +143,9 @@ func (b *bolt) Get(bucket string, key string) ([]byte, error) {
func (b *bolt) List(bucket string) ([][]byte, error) {
values := make([][]byte, 0, 20)

b.Lock()
defer b.Unlock()

err := b.db.View(func(tx *boltdb.Tx) error {
bkt := tx.Bucket([]byte(bucket))
if bkt == nil {
Expand Down