From 409d10da00e41d0b7818c575dd3ceaf571a603d5 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Sat, 24 Apr 2021 11:02:46 +0200 Subject: [PATCH 1/5] core/rawdb/freezer: add testcase for append/truncate datarace --- core/rawdb/freezer_table_test.go | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/core/rawdb/freezer_table_test.go b/core/rawdb/freezer_table_test.go index b8d3170c6235..82ec84942191 100644 --- a/core/rawdb/freezer_table_test.go +++ b/core/rawdb/freezer_table_test.go @@ -18,6 +18,7 @@ package rawdb import ( "bytes" + "encoding/binary" "fmt" "math/rand" "os" @@ -640,3 +641,31 @@ func TestOffset(t *testing.T) { // However, all 'normal' failure modes arising due to failing to sync() or save a file should be // handled already, and the case described above can only (?) happen if an external process/user // deletes files from the filesystem. + +func TestAppendTruncateParallel(t *testing.T) { + f, err := newCustomTable("./freezer", "tmp", metrics.NilMeter{}, + metrics.NilMeter{}, metrics.NilGauge{}, 8, true) + + if err != nil { + t.Fatal(err) + } + fill := func(mark uint64) []byte { + data := make([]byte, 8) + binary.LittleEndian.PutUint64(data, mark) + return data + } + + for { + f.truncate(0) + data0 := fill(0) + f.Append(0, data0) + data1 := fill(1) + go f.truncate(0) + go f.Append(1, data1) + if have, err := f.Retrieve(0); err == nil { + if !bytes.Equal(have, data0) { + t.Fatalf("have %x want %x", have, data0) + } + } + } +} From dd1f2d7fab33bfa1a3ce7e9c86cc03ddbc692f92 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Sat, 24 Apr 2021 11:09:15 +0200 Subject: [PATCH 2/5] core/rawdb: fix datarace in freezertable --- core/rawdb/freezer_table.go | 56 +++++++++++++++++++++++-------------- 1 file changed, 35 insertions(+), 21 deletions(-) diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index b614c10d3726..3c871539c84d 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -465,35 +465,53 @@ func (t *freezerTable) releaseFilesAfter(num uint32, remove bool) { // Note, this method will *not* flush any data to disk so be sure to explicitly // fsync before irreversibly deleting data from the database. func (t *freezerTable) Append(item uint64, blob []byte) error { + // Encode the blob before the lock portion + if !t.noCompression { + blob = snappy.Encode(nil, blob) + } // Read lock prevents competition with truncate - t.lock.RLock() + retry, err := t.append(item, blob, false) + if err != nil { + return err + } + if retry { + // Read lock was insufficient, retry with a writelock + _, err = t.append(item, blob, true) + } + return err +} +func (t *freezerTable) append(item uint64, encodedBlob []byte, wlock bool) (bool, error) { + if wlock { + t.lock.Lock() + defer t.lock.Unlock() + } else { + t.lock.RLock() + defer t.lock.RUnlock() + } // Ensure the table is still accessible if t.index == nil || t.head == nil { - t.lock.RUnlock() - return errClosed + return false, errClosed } // Ensure only the next item can be written, nothing else if atomic.LoadUint64(&t.items) != item { - t.lock.RUnlock() - return fmt.Errorf("appending unexpected item: want %d, have %d", t.items, item) + return false, fmt.Errorf("appending unexpected item: want %d, have %d", t.items, item) } - // Encode the blob and write it into the data file - if !t.noCompression { - blob = snappy.Encode(nil, blob) - } - bLen := uint32(len(blob)) + bLen := uint32(len(encodedBlob)) if t.headBytes+bLen < bLen || t.headBytes+bLen > t.maxFileSize { // we need a new file, writing would overflow - t.lock.RUnlock() - t.lock.Lock() + // try again with a writelock. We need a writelock here, but if we + // release the readlock, there's a chance that a truncation happens, so + // we need to redo all the checks above + if !wlock { + return true, nil + } nextID := atomic.LoadUint32(&t.headId) + 1 // We open the next file in truncated mode -- if this file already // exists, we need to start over from scratch on it newHead, err := t.openFile(nextID, openFreezerFileTruncated) if err != nil { - t.lock.Unlock() - return err + return false, err } // Close old file, and reopen in RDONLY mode t.releaseFile(t.headId) @@ -503,13 +521,9 @@ func (t *freezerTable) Append(item uint64, blob []byte) error { t.head = newHead atomic.StoreUint32(&t.headBytes, 0) atomic.StoreUint32(&t.headId, nextID) - t.lock.Unlock() - t.lock.RLock() } - - defer t.lock.RUnlock() - if _, err := t.head.Write(blob); err != nil { - return err + if _, err := t.head.Write(encodedBlob); err != nil { + return false, err } newOffset := atomic.AddUint32(&t.headBytes, bLen) idx := indexEntry{ @@ -523,7 +537,7 @@ func (t *freezerTable) Append(item uint64, blob []byte) error { t.sizeGauge.Inc(int64(bLen + indexEntrySize)) atomic.AddUint64(&t.items, 1) - return nil + return false, nil } // getBounds returns the indexes for the item From 430537ed60b01393bdd64845cc68dc33f53fa413 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Sat, 24 Apr 2021 11:14:15 +0200 Subject: [PATCH 3/5] core/rawdb: disable inifinite-running test --- core/rawdb/freezer_table_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/core/rawdb/freezer_table_test.go b/core/rawdb/freezer_table_test.go index 82ec84942191..d0c60ec60e73 100644 --- a/core/rawdb/freezer_table_test.go +++ b/core/rawdb/freezer_table_test.go @@ -642,7 +642,13 @@ func TestOffset(t *testing.T) { // handled already, and the case described above can only (?) happen if an external process/user // deletes files from the filesystem. +// TestAppendTruncateParallel is a test to check if the Append/truncate operations +// are racy. It's disabled, since it's a long-running test which does not fit well +// in CI. +// The reason why it's not a regular fuzzer, within tests/fuzzers, is that it is +// dependent on timing rather than 'clever' input -- there's no determinism. func TestAppendTruncateParallel(t *testing.T) { + t.Skip() f, err := newCustomTable("./freezer", "tmp", metrics.NilMeter{}, metrics.NilMeter{}, metrics.NilGauge{}, 8, true) From ea3b2b1a55fa1f874e748145cd36fc4a2cb88d85 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Sat, 24 Apr 2021 19:38:42 +0200 Subject: [PATCH 4/5] core/rawdb: simplify freezer mutex use --- core/rawdb/freezer_table.go | 42 +++++++++++++++++++++++-------------- 1 file changed, 26 insertions(+), 16 deletions(-) diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index 3c871539c84d..d7bfe18e020f 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -480,6 +480,13 @@ func (t *freezerTable) Append(item uint64, blob []byte) error { } return err } + +// append injects a binary blob at the end of the freezer table. +// Normally, inserts do not require holding the write-lock, so it should be invoked with 'wlock' set to +// false. +// However, if the data will grown the current file out of bounds, then this +// method will return 'true, nil', indicating that the caller should retry, this time +// with 'wlock' set to true. func (t *freezerTable) append(item uint64, encodedBlob []byte, wlock bool) (bool, error) { if wlock { t.lock.Lock() @@ -499,10 +506,9 @@ func (t *freezerTable) append(item uint64, encodedBlob []byte, wlock bool) (bool bLen := uint32(len(encodedBlob)) if t.headBytes+bLen < bLen || t.headBytes+bLen > t.maxFileSize { - // we need a new file, writing would overflow - // try again with a writelock. We need a writelock here, but if we - // release the readlock, there's a chance that a truncation happens, so - // we need to redo all the checks above + // Writing would overflow, so we need to open a new data file. + // If we don't already hold the writelock, abort and let the caller + // invoke this method a second time. if !wlock { return true, nil } @@ -576,44 +582,48 @@ func (t *freezerTable) getBounds(item uint64) (uint32, uint32, uint32, error) { // Retrieve looks up the data offset of an item with the given number and retrieves // the raw binary blob from the data file. func (t *freezerTable) Retrieve(item uint64) ([]byte, error) { + blob, err := t.retrieve(item) + if err != nil { + return nil, err + } + if t.noCompression { + return blob, nil + } + return snappy.Decode(nil, blob) +} + +// retrieve looks up the data offset of an item with the given number and retrieves +// the raw binary blob from the data file. OBS! This method does not decode +// compressed data. +func (t *freezerTable) retrieve(item uint64) ([]byte, error) { t.lock.RLock() + defer t.lock.RUnlock() // Ensure the table and the item is accessible if t.index == nil || t.head == nil { - t.lock.RUnlock() return nil, errClosed } if atomic.LoadUint64(&t.items) <= item { - t.lock.RUnlock() return nil, errOutOfBounds } // Ensure the item was not deleted from the tail either if uint64(t.itemOffset) > item { - t.lock.RUnlock() return nil, errOutOfBounds } startOffset, endOffset, filenum, err := t.getBounds(item - uint64(t.itemOffset)) if err != nil { - t.lock.RUnlock() return nil, err } dataFile, exist := t.files[filenum] if !exist { - t.lock.RUnlock() return nil, fmt.Errorf("missing data file %d", filenum) } // Retrieve the data itself, decompress and return blob := make([]byte, endOffset-startOffset) if _, err := dataFile.ReadAt(blob, int64(startOffset)); err != nil { - t.lock.RUnlock() return nil, err } - t.lock.RUnlock() t.readMeter.Mark(int64(len(blob) + 2*indexEntrySize)) - - if t.noCompression { - return blob, nil - } - return snappy.Decode(nil, blob) + return blob, nil } // has returns an indicator whether the specified number data From d62e81d3a2e71f013e874afbf9d0656c26706a34 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Mon, 26 Apr 2021 12:34:14 +0200 Subject: [PATCH 5/5] core/rawdb: enable TestAppendTruncateParallel --- core/rawdb/freezer_table_test.go | 47 ++++++++++++++++++++++---------- 1 file changed, 32 insertions(+), 15 deletions(-) diff --git a/core/rawdb/freezer_table_test.go b/core/rawdb/freezer_table_test.go index d0c60ec60e73..0df28f236d2d 100644 --- a/core/rawdb/freezer_table_test.go +++ b/core/rawdb/freezer_table_test.go @@ -20,9 +20,11 @@ import ( "bytes" "encoding/binary" "fmt" + "io/ioutil" "math/rand" "os" "path/filepath" + "sync" "testing" "time" @@ -638,36 +640,51 @@ func TestOffset(t *testing.T) { // 1. have data files d0, d1, d2, d3 // 2. remove d2,d3 // -// However, all 'normal' failure modes arising due to failing to sync() or save a file should be -// handled already, and the case described above can only (?) happen if an external process/user -// deletes files from the filesystem. - -// TestAppendTruncateParallel is a test to check if the Append/truncate operations -// are racy. It's disabled, since it's a long-running test which does not fit well -// in CI. -// The reason why it's not a regular fuzzer, within tests/fuzzers, is that it is -// dependent on timing rather than 'clever' input -- there's no determinism. +// However, all 'normal' failure modes arising due to failing to sync() or save a file +// should be handled already, and the case described above can only (?) happen if an +// external process/user deletes files from the filesystem. + +// TestAppendTruncateParallel is a test to check if the Append/truncate operations are +// racy. +// +// The reason why it's not a regular fuzzer, within tests/fuzzers, is that it is dependent +// on timing rather than 'clever' input -- there's no determinism. func TestAppendTruncateParallel(t *testing.T) { - t.Skip() - f, err := newCustomTable("./freezer", "tmp", metrics.NilMeter{}, - metrics.NilMeter{}, metrics.NilGauge{}, 8, true) + dir, err := ioutil.TempDir("", "freezer") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(dir) + f, err := newCustomTable(dir, "tmp", metrics.NilMeter{}, metrics.NilMeter{}, metrics.NilGauge{}, 8, true) if err != nil { t.Fatal(err) } + fill := func(mark uint64) []byte { data := make([]byte, 8) binary.LittleEndian.PutUint64(data, mark) return data } - for { + for i := 0; i < 5000; i++ { f.truncate(0) data0 := fill(0) f.Append(0, data0) data1 := fill(1) - go f.truncate(0) - go f.Append(1, data1) + + var wg sync.WaitGroup + wg.Add(2) + go func() { + f.truncate(0) + wg.Done() + }() + go func() { + f.Append(1, data1) + wg.Done() + }() + wg.Wait() + if have, err := f.Retrieve(0); err == nil { if !bytes.Equal(have, data0) { t.Fatalf("have %x want %x", have, data0)