diff --git a/cmd/influx_tools/compact/command.go b/cmd/influx_tools/compact/command.go index 4b56b5f0179..a34f08283cf 100644 --- a/cmd/influx_tools/compact/command.go +++ b/cmd/influx_tools/compact/command.go @@ -150,6 +150,10 @@ func (sc *shardCompactor) ParseFileName(path string) (int, int, error) { return 0, 0, errors.New("not implemented") } +func (sc *shardCompactor) SupportsCompactionPlanning() bool { + return false +} + func newShardCompactor(path string, logger *zap.Logger) (sc *shardCompactor, err error) { sc = &shardCompactor{ logger: logger, diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index 8e6ae5a77f8..1b9a2b14e6a 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -170,12 +170,17 @@ type DefaultPlanner struct { type fileStore interface { Stats() []ExtFileStat LastModified() time.Time - ParseFileName(path string) (int, int, error) + ParseFileName(path string) (generation int, sequence int, err error) NextGeneration() int TSMReader(path string) (*TSMReader, error) + SupportsCompactionPlanning() bool } func NewDefaultPlanner(fs fileStore, writeColdDuration time.Duration) *DefaultPlanner { + if !fs.SupportsCompactionPlanning() { + // This should only happen due to developer mistakes. + panic("fileStore must support compaction planning") + } return &DefaultPlanner{ FileStore: fs, compactFullWriteColdDuration: writeColdDuration, @@ -188,15 +193,13 @@ func NewDefaultPlanner(fs fileStore, writeColdDuration time.Duration) *DefaultPl // 000001-01.tsm, 000001-02.tsm would be in the same generation // 000001 each with different sequence numbers. 
type tsmGeneration struct { - id int - files []ExtFileStat - parseFileName ParseFileNameFunc + id int + files []ExtFileStat } -func newTsmGeneration(id int, parseFileNameFunc ParseFileNameFunc) *tsmGeneration { +func newTsmGeneration(id int) *tsmGeneration { return &tsmGeneration{ - id: id, - parseFileName: parseFileNameFunc, + id: id, } } @@ -213,13 +216,12 @@ func (t *tsmGeneration) size() uint64 { func (t *tsmGeneration) level() int { // Level 0 is always created from the result of a cache compaction. It generates // 1 file with a sequence num of 1. Level 2 is generated by compacting multiple - // level 1 files. Level 3 is generate by compacting multiple level 2 files. Level + // level 1 files. Level 3 is generated by compacting multiple level 2 files. Level // 4 is for anything else. - _, seq, _ := t.parseFileName(t.files[0].Path) - if seq < 4 { - return seq - } + if t.files[0].Sequence < 4 { + return t.files[0].Sequence + } return 4 } @@ -250,10 +252,6 @@ func (c *DefaultPlanner) SetFileStore(fs *FileStore) { c.FileStore = fs } -func (c *DefaultPlanner) ParseFileName(path string) (int, int, error) { - return c.FileStore.ParseFileName(path) -} - func (c *DefaultPlanner) generationsFullyCompacted(gens tsmGenerations) (bool, string) { if len(gens) > 1 { return false, "not fully compacted and not idle because of more than one generation" @@ -704,17 +702,15 @@ func (c *DefaultPlanner) findGenerations(skipInUse bool) tsmGenerations { tsmStats := c.FileStore.Stats() generations := make(map[int]*tsmGeneration, len(tsmStats)) for _, f := range tsmStats { - gen, _, _ := c.ParseFileName(f.Path) - // Skip any files that are assigned to a current compaction plan if _, ok := c.filesInUse[f.Path]; skipInUse && ok { continue } - group := generations[gen] + group := generations[f.Generation] if group == nil { - group = newTsmGeneration(gen, c.ParseFileName) - generations[gen] = group + group = newTsmGeneration(f.Generation) + generations[f.Generation] = group } group.files = 
append(group.files, f) } @@ -786,7 +782,6 @@ type Compactor struct { RateLimit limiter.Rate formatFileName FormatFileNameFunc - parseFileName ParseFileNameFunc mu sync.RWMutex snapshotsEnabled bool @@ -809,7 +804,6 @@ type Compactor struct { func NewCompactor() *Compactor { return &Compactor{ formatFileName: DefaultFormatFileName, - parseFileName: DefaultParseFileName, } } @@ -817,10 +811,6 @@ func (c *Compactor) WithFormatFileNameFunc(formatFileNameFunc FormatFileNameFunc c.formatFileName = formatFileNameFunc } -func (c *Compactor) WithParseFileNameFunc(parseFileNameFunc ParseFileNameFunc) { - c.parseFileName = parseFileNameFunc -} - // Open initializes the Compactor. func (c *Compactor) Open() { c.mu.Lock() @@ -986,8 +976,12 @@ func (c *Compactor) compact(fast bool, tsmFiles []string, logger *zap.Logger, po // set. We need to find that max generation as well as the max sequence // number to ensure we write to the next unique location. var maxGeneration, maxSequence int + + if c.FileStore == nil { + return nil, fmt.Errorf("compactor for %s has no file store: %w", c.Dir, errCompactionsDisabled) + } for _, f := range tsmFiles { - gen, seq, err := c.parseFileName(f) + gen, seq, err := c.FileStore.ParseFileName(f) if err != nil { return nil, err } diff --git a/tsdb/engine/tsm1/compact_test.go b/tsdb/engine/tsm1/compact_test.go index 23b8a58286b..831055c0355 100644 --- a/tsdb/engine/tsm1/compact_test.go +++ b/tsdb/engine/tsm1/compact_test.go @@ -219,7 +219,16 @@ func TestCompactor_CompactFull(t *testing.T) { t.Fatalf("wrong sequence for new file: got %v, exp %v", gotSeq, expSeq) } - r := MustOpenTSMReader(files[0]) + r := MustOpenTSMReader(files[0], tsm1.WithParseFileNameFunc(tsm1.DefaultParseFileName)) + + s := r.Stats() + if s.Generation != expGen { + t.Fatalf("wrong generation for new file in Stats: got %v, exp %v", s.Generation, expGen) + } + + if s.Sequence != expSeq { + t.Fatalf("wrong sequence for new file in Stats: got %v, exp %v", s.Sequence, expSeq) + } if 
got, exp := r.KeyCount(), 3; got != exp { t.Fatalf("keys length mismatch: got %v, exp %v", got, exp) @@ -655,7 +664,16 @@ func TestCompactor_CompactFull_SkipFullBlocks(t *testing.T) { t.Fatalf("wrong sequence for new file: got %v, exp %v", gotSeq, expSeq) } - r := MustOpenTSMReader(files[0]) + r := MustOpenTSMReader(files[0], tsm1.WithParseFileNameFunc(tsm1.DefaultParseFileName)) + + s := r.Stats() + if s.Generation != expGen { + t.Fatalf("wrong generation for new file in Stats: got %v, exp %v", s.Generation, expGen) + } + + if s.Sequence != expSeq { + t.Fatalf("wrong sequence for new file in Stats: got %v, exp %v", s.Sequence, expSeq) + } if got, exp := r.KeyCount(), 1; got != exp { t.Fatalf("keys length mismatch: got %v, exp %v", got, exp) @@ -756,7 +774,16 @@ func TestCompactor_CompactFull_TombstonedSkipBlock(t *testing.T) { t.Fatalf("wrong sequence for new file: got %v, exp %v", gotSeq, expSeq) } - r := MustOpenTSMReader(files[0]) + r := MustOpenTSMReader(files[0], tsm1.WithParseFileNameFunc(tsm1.DefaultParseFileName)) + + s := r.Stats() + if s.Generation != expGen { + t.Fatalf("wrong generation for new file in Stats: got %v, exp %v", s.Generation, expGen) + } + + if s.Sequence != expSeq { + t.Fatalf("wrong sequence for new file in Stats: got %v, exp %v", s.Sequence, expSeq) + } if got, exp := r.KeyCount(), 1; got != exp { t.Fatalf("keys length mismatch: got %v, exp %v", got, exp) @@ -858,7 +885,16 @@ func TestCompactor_CompactFull_TombstonedPartialBlock(t *testing.T) { t.Fatalf("wrong sequence for new file: got %v, exp %v", gotSeq, expSeq) } - r := MustOpenTSMReader(files[0]) + r := MustOpenTSMReader(files[0], tsm1.WithParseFileNameFunc(tsm1.DefaultParseFileName)) + + s := r.Stats() + if s.Generation != expGen { + t.Fatalf("wrong generation for new file in Stats: got %v, exp %v", s.Generation, expGen) + } + + if s.Sequence != expSeq { + t.Fatalf("wrong sequence for new file in Stats: got %v, exp %v", s.Sequence, expSeq) + } if got, exp := r.KeyCount(), 1; got 
!= exp { t.Fatalf("keys length mismatch: got %v, exp %v", got, exp) @@ -965,7 +1001,16 @@ func TestCompactor_CompactFull_TombstonedMultipleRanges(t *testing.T) { t.Fatalf("wrong sequence for new file: got %v, exp %v", gotSeq, expSeq) } - r := MustOpenTSMReader(files[0]) + r := MustOpenTSMReader(files[0], tsm1.WithParseFileNameFunc(tsm1.DefaultParseFileName)) + + s := r.Stats() + if s.Generation != expGen { + t.Fatalf("wrong generation for new file in Stats: got %v, exp %v", s.Generation, expGen) + } + + if s.Sequence != expSeq { + t.Fatalf("wrong sequence for new file in Stats: got %v, exp %v", s.Sequence, expSeq) + } if got, exp := r.KeyCount(), 1; got != exp { t.Fatalf("keys length mismatch: got %v, exp %v", got, exp) @@ -1571,28 +1616,30 @@ func TestCacheKeyIterator_Abort(t *testing.T) { } } -func normalizeExtFileStat(efs []tsm1.ExtFileStat, defaultBlockCount int) []tsm1.ExtFileStat { +func normalizeExtFileStat(t *testing.T, efs []tsm1.ExtFileStat, defaultBlockCount int) []tsm1.ExtFileStat { + var err error efsNorm := make([]tsm1.ExtFileStat, 0, len(efs)) for _, f := range efs { if f.FirstBlockCount == 0 { f.FirstBlockCount = defaultBlockCount } + f.Generation, f.Sequence, err = tsm1.DefaultParseFileName(f.Path) + require.NoErrorf(t, err, "failed to parse file name: %s", f.Path) efsNorm = append(efsNorm, f) } - return efsNorm } type ffsOpt func(ffs *fakeFileStore) -func withExtFileStats(efs []tsm1.ExtFileStat) ffsOpt { +func withExtFileStats(t *testing.T, efs []tsm1.ExtFileStat) ffsOpt { return func(ffs *fakeFileStore) { - ffs.PathsFn = func() []tsm1.ExtFileStat { return normalizeExtFileStat(efs, ffs.defaultBlockCount) } + ffs.PathsFn = func() []tsm1.ExtFileStat { return normalizeExtFileStat(t, efs, ffs.defaultBlockCount) } } } -func withFileStats(fs []tsm1.FileStat) ffsOpt { - return withExtFileStats(tsm1.FileStats(fs).ToExtFileStats()) +func withFileStats(t *testing.T, fs []tsm1.FileStat) ffsOpt { + return withExtFileStats(t, 
tsm1.FileStats(fs).ToExtFileStats()) } func withDefaultBlockCount(blockCount int) ffsOpt { @@ -1625,7 +1672,7 @@ func TestDefaultPlanner_Plan_Min(t *testing.T) { }, } cp := tsm1.NewDefaultPlanner( - newFakeFileStore(withFileStats(fileStats)), tsdb.DefaultCompactFullWriteColdDuration, + newFakeFileStore(withFileStats(t, fileStats)), tsdb.DefaultCompactFullWriteColdDuration, ) tsm, pLen := cp.Plan(time.Now()) @@ -1671,7 +1718,7 @@ func TestDefaultPlanner_Plan_CombineSequence(t *testing.T) { } cp := tsm1.NewDefaultPlanner( - newFakeFileStore(withFileStats(data)), + newFakeFileStore(withFileStats(t, data)), tsdb.DefaultCompactFullWriteColdDuration, ) @@ -1732,7 +1779,7 @@ func TestDefaultPlanner_Plan_MultipleGroups(t *testing.T) { } cp := tsm1.NewDefaultPlanner( - newFakeFileStore(withFileStats(data)), + newFakeFileStore(withFileStats(t, data)), tsdb.DefaultCompactFullWriteColdDuration) expFiles := []tsm1.FileStat{data[0], data[1], data[2], data[3], @@ -1820,7 +1867,7 @@ func TestDefaultPlanner_PlanLevel_SmallestCompactionStep(t *testing.T) { } cp := tsm1.NewDefaultPlanner( - newFakeFileStore(withFileStats(data)), + newFakeFileStore(withFileStats(t, data)), tsdb.DefaultCompactFullWriteColdDuration, ) @@ -1872,7 +1919,7 @@ func TestDefaultPlanner_PlanLevel_SplitFile(t *testing.T) { } cp := tsm1.NewDefaultPlanner( - newFakeFileStore(withFileStats(data)), + newFakeFileStore(withFileStats(t, data)), tsdb.DefaultCompactFullWriteColdDuration, ) @@ -1924,7 +1971,7 @@ func TestDefaultPlanner_PlanLevel_IsolatedHighLevel(t *testing.T) { } cp := tsm1.NewDefaultPlanner( - newFakeFileStore(withFileStats(data)), + newFakeFileStore(withFileStats(t, data)), tsdb.DefaultCompactFullWriteColdDuration, ) @@ -1966,7 +2013,7 @@ func TestDefaultPlanner_PlanLevel3_MinFiles(t *testing.T) { } cp := tsm1.NewDefaultPlanner( - newFakeFileStore(withFileStats(data)), + newFakeFileStore(withFileStats(t, data)), tsdb.DefaultCompactFullWriteColdDuration, ) @@ -1997,7 +2044,7 @@ func 
TestDefaultPlanner_PlanLevel2_MinFiles(t *testing.T) { } cp := tsm1.NewDefaultPlanner( - newFakeFileStore(withFileStats(data)), + newFakeFileStore(withFileStats(t, data)), tsdb.DefaultCompactFullWriteColdDuration, ) @@ -2040,7 +2087,7 @@ func TestDefaultPlanner_PlanLevel_Tombstone(t *testing.T) { } cp := tsm1.NewDefaultPlanner( - newFakeFileStore(withFileStats(data)), + newFakeFileStore(withFileStats(t, data)), tsdb.DefaultCompactFullWriteColdDuration, ) @@ -2096,7 +2143,7 @@ func TestDefaultPlanner_PlanLevel_Multiple(t *testing.T) { } cp := tsm1.NewDefaultPlanner( - newFakeFileStore(withFileStats(data)), + newFakeFileStore(withFileStats(t, data)), tsdb.DefaultCompactFullWriteColdDuration, ) @@ -2185,7 +2232,7 @@ func TestDefaultPlanner_PlanLevel_InUse(t *testing.T) { } cp := tsm1.NewDefaultPlanner( - newFakeFileStore(withFileStats(data)), + newFakeFileStore(withFileStats(t, data)), tsdb.DefaultCompactFullWriteColdDuration, ) @@ -2248,7 +2295,7 @@ func TestDefaultPlanner_PlanOptimize_NoLevel4(t *testing.T) { } cp := tsm1.NewDefaultPlanner( - newFakeFileStore(withFileStats(data)), + newFakeFileStore(withFileStats(t, data)), tsdb.DefaultCompactFullWriteColdDuration, ) @@ -2281,7 +2328,7 @@ func TestDefaultPlanner_PlanOptimize_Tombstones(t *testing.T) { } cp := tsm1.NewDefaultPlanner( - newFakeFileStore(withFileStats(data)), + newFakeFileStore(withFileStats(t, data)), tsdb.DefaultCompactFullWriteColdDuration, ) @@ -2332,7 +2379,7 @@ func TestDefaultPlanner_Plan_FullOnCold(t *testing.T) { } cp := tsm1.NewDefaultPlanner( - newFakeFileStore(withFileStats(data)), + newFakeFileStore(withFileStats(t, data)), time.Nanosecond, ) @@ -2365,7 +2412,7 @@ func TestDefaultPlanner_Plan_SkipMaxSizeFiles(t *testing.T) { } cp := tsm1.NewDefaultPlanner( - newFakeFileStore(withFileStats(data)), + newFakeFileStore(withFileStats(t, data)), tsdb.DefaultCompactFullWriteColdDuration, ) @@ -2399,7 +2446,7 @@ func TestDefaultPlanner_Plan_SkipPlanningAfterFull(t *testing.T) { }, } - ffs := 
newFakeFileStore(withFileStats(testSet), withDefaultBlockCount(tsdb.DefaultMaxPointsPerBlock)) + ffs := newFakeFileStore(withFileStats(t, testSet), withDefaultBlockCount(tsdb.DefaultMaxPointsPerBlock)) cp := tsm1.NewDefaultPlanner(ffs, time.Nanosecond) @@ -2436,7 +2483,7 @@ func TestDefaultPlanner_Plan_SkipPlanningAfterFull(t *testing.T) { }, } - overFs := newFakeFileStore(withFileStats(over), withDefaultBlockCount(tsdb.DefaultMaxPointsPerBlock)) + overFs := newFakeFileStore(withFileStats(t, over), withDefaultBlockCount(tsdb.DefaultMaxPointsPerBlock)) cp.FileStore = overFs plan, pLen = cp.Plan(time.Now().Add(-time.Second)) @@ -2515,7 +2562,7 @@ func TestDefaultPlanner_Plan_TwoGenLevel3(t *testing.T) { }, } - fs := newFakeFileStore(withFileStats(data), withDefaultBlockCount(tsdb.DefaultMaxPointsPerBlock)) + fs := newFakeFileStore(withFileStats(t, data), withDefaultBlockCount(tsdb.DefaultMaxPointsPerBlock)) cp := tsm1.NewDefaultPlanner(fs, time.Hour) @@ -2549,7 +2596,7 @@ func TestDefaultPlanner_Plan_NotFullOverMaxsize(t *testing.T) { }, } - ffs := newFakeFileStore(withFileStats(testSet), withDefaultBlockCount(100)) + ffs := newFakeFileStore(withFileStats(t, testSet), withDefaultBlockCount(100)) cp := tsm1.NewDefaultPlanner( ffs, @@ -2577,7 +2624,7 @@ func TestDefaultPlanner_Plan_NotFullOverMaxsize(t *testing.T) { }, } - overFs := newFakeFileStore(withFileStats(over), withDefaultBlockCount(100)) + overFs := newFakeFileStore(withFileStats(t, over), withDefaultBlockCount(100)) cp.FileStore = overFs cGroups, pLen := cp.Plan(time.Now().Add(-time.Second)) @@ -2615,7 +2662,7 @@ func TestDefaultPlanner_Plan_CompactsMiddleSteps(t *testing.T) { } cp := tsm1.NewDefaultPlanner( - newFakeFileStore(withFileStats(data)), + newFakeFileStore(withFileStats(t, data)), tsdb.DefaultCompactFullWriteColdDuration, ) @@ -2636,7 +2683,7 @@ func TestDefaultPlanner_Plan_CompactsMiddleSteps(t *testing.T) { func TestDefaultPlanner_Plan_LargeGeneration(t *testing.T) { cp := 
tsm1.NewDefaultPlanner( - newFakeFileStore(withFileStats( + newFakeFileStore(withFileStats(t, []tsm1.FileStat{ { Path: "000000278-000000006.tsm", @@ -2672,7 +2719,7 @@ func TestDefaultPlanner_Plan_LargeGeneration(t *testing.T) { func TestDefaultPlanner_Plan_ForceFull(t *testing.T) { cp := tsm1.NewDefaultPlanner( - newFakeFileStore(withFileStats( + newFakeFileStore(withFileStats(t, []tsm1.FileStat{ { Path: "000000001-000000001.tsm", @@ -2858,7 +2905,7 @@ func TestIsGroupOptimized(t *testing.T) { }, } - ffs := newFakeFileStore(withExtFileStats(testSet)) + ffs := newFakeFileStore(withExtFileStats(t, testSet)) cp := tsm1.NewDefaultPlanner(ffs, tsdb.DefaultCompactFullWriteColdDuration) e := MustOpenEngine(tsdb.InmemIndexName) @@ -4485,7 +4532,7 @@ func TestEnginePlanCompactions(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - ffs := newFakeFileStore(withExtFileStats(test.files), withDefaultBlockCount(test.defaultBlockCount)) + ffs := newFakeFileStore(withExtFileStats(t, test.files), withDefaultBlockCount(test.defaultBlockCount)) cp := tsm1.NewDefaultPlanner(ffs, test.testShardTime) e.MaxPointsPerBlock = tsdb.DefaultMaxPointsPerBlock @@ -4604,17 +4651,17 @@ func MustWriteTSM(dir string, gen int, values map[string][]tsm1.Value) string { return name } -func MustTSMReader(dir string, gen int, values map[string][]tsm1.Value) *tsm1.TSMReader { - return MustOpenTSMReader(MustWriteTSM(dir, gen, values)) +func MustTSMReader(dir string, gen int, values map[string][]tsm1.Value, options ...tsm1.TsmReaderOption) *tsm1.TSMReader { + return MustOpenTSMReader(MustWriteTSM(dir, gen, values), options...) } -func MustOpenTSMReader(name string) *tsm1.TSMReader { +func MustOpenTSMReader(name string, options ...tsm1.TsmReaderOption) *tsm1.TSMReader { f, err := os.Open(name) if err != nil { panic(fmt.Sprintf("open file: %v", err)) } - r, err := tsm1.NewTSMReader(f) + r, err := tsm1.NewTSMReader(f, options...) 
if err != nil { panic(fmt.Sprintf("new reader: %v", err)) } @@ -4645,7 +4692,7 @@ func (w *fakeFileStore) LastModified() time.Time { } func (w *fakeFileStore) TSMReader(path string) (*tsm1.TSMReader, error) { - r := MustOpenTSMReader(path) + r := MustOpenTSMReader(path, tsm1.WithParseFileNameFunc(w.ParseFileName)) w.readers = append(w.readers, r) r.Ref() return r, nil @@ -4661,3 +4708,8 @@ func (w *fakeFileStore) Close() { func (w *fakeFileStore) ParseFileName(path string) (int, int, error) { return tsm1.DefaultParseFileName(path) } + +func (w *fakeFileStore) SupportsCompactionPlanning() bool { + // Our ParseFileName is hard-coded to always use default. + return true +} diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index fc4a54eac96..93efe73fc65 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -311,7 +311,6 @@ func (e *Engine) WithFormatFileNameFunc(formatFileNameFunc FormatFileNameFunc) { func (e *Engine) WithParseFileNameFunc(parseFileNameFunc ParseFileNameFunc) { e.FileStore.WithParseFileNameFunc(parseFileNameFunc) - e.Compactor.WithParseFileNameFunc(parseFileNameFunc) } // Digest returns a reader for the shard's digest. diff --git a/tsdb/engine/tsm1/file_store.go b/tsdb/engine/tsm1/file_store.go index bfe82f572ef..4fe59276366 100644 --- a/tsdb/engine/tsm1/file_store.go +++ b/tsdb/engine/tsm1/file_store.go @@ -225,7 +225,7 @@ type FileStore struct { copyFiles bool - readerOptions []tsmReaderOption + readerOptions []TsmReaderOption // newReaderBlockCount keeps track of the current new reader block requests. // If non-zero, no new TSMReader objects may be created. @@ -240,6 +240,8 @@ type FileStat struct { LastModified int64 MinTime, MaxTime int64 MinKey, MaxKey []byte + Generation int + Sequence int } type FileStats []FileStat @@ -293,7 +295,7 @@ func (f FileStat) ContainsKey(key []byte) bool { } // NewFileStore returns a new instance of FileStore based on the given directory. 
-func NewFileStore(dir string, options ...tsmReaderOption) *FileStore { +func NewFileStore(dir string, options ...TsmReaderOption) *FileStore { logger := zap.NewNop() fs := &FileStore{ dir: dir, @@ -309,12 +311,17 @@ func NewFileStore(dir string, options ...tsmReaderOption) *FileStore { obs: noFileStoreObserver{}, parseFileName: DefaultParseFileName, copyFiles: runtime.GOOS == "windows", - readerOptions: options, + readerOptions: append([]TsmReaderOption{WithParseFileNameFunc(DefaultParseFileName)}, options...), } fs.purger.fileStore = fs return fs } +// SupportsCompactionPlanning returns true if f supports all functionality needed for compaction planning. +func (f *FileStore) SupportsCompactionPlanning() bool { + return f.parseFileName != nil +} + // WithObserver sets the observer for the file store. func (f *FileStore) WithObserver(obs tsdb.FileStoreObserver) { f.obs = obs @@ -322,9 +329,13 @@ func (f *FileStore) WithObserver(obs tsdb.FileStoreObserver) { func (f *FileStore) WithParseFileNameFunc(parseFileNameFunc ParseFileNameFunc) { f.parseFileName = parseFileNameFunc + f.readerOptions = append(f.readerOptions, WithParseFileNameFunc(f.parseFileName)) } func (f *FileStore) ParseFileName(path string) (int, int, error) { + if f == nil || f.parseFileName == nil { + return 0, 0, fmt.Errorf("failed parsing %s: file store is not initialized", path) + } return f.parseFileName(path) } @@ -603,7 +614,7 @@ func (f *FileStore) Open() error { readerC := make(chan *res) for i, fn := range files { // Keep track of the latest ID - generation, _, err := f.parseFileName(fn) + generation, _, err := f.ParseFileName(fn) if err != nil { return fmt.Errorf("error parsing %q in FileStore.Open: %w", fn, err) } @@ -1355,12 +1366,12 @@ func DefaultParseFileName(name string) (int, int, error) { generation, err := strconv.ParseUint(id[:idx], 10, 32) if err != nil { - return 0, 0, fmt.Errorf("file %s is named incorrectly", name) + return 0, 0, fmt.Errorf("cannot parse generation number; file 
%s is named incorrectly: %w", name, err) } sequence, err := strconv.ParseUint(id[idx+1:], 10, 32) if err != nil { - return 0, 0, fmt.Errorf("file %s is named incorrectly", name) + return 0, 0, fmt.Errorf("cannot parse sequence number; file %s is named incorrectly: %w", name, err) } return int(generation), int(sequence), nil diff --git a/tsdb/engine/tsm1/file_store_internal_test.go b/tsdb/engine/tsm1/file_store_internal_test.go index e5db2d29bff..1cc0616a472 100644 --- a/tsdb/engine/tsm1/file_store_internal_test.go +++ b/tsdb/engine/tsm1/file_store_internal_test.go @@ -4,7 +4,7 @@ import ( "github.com/influxdata/influxdb/tsdb" ) -var TestMmapInitFailOption = func(err error) tsmReaderOption { +var TestMmapInitFailOption = func(err error) TsmReaderOption { return func(r *TSMReader) { r.accessor = &badBlockAccessor{error: err} } diff --git a/tsdb/engine/tsm1/reader.go b/tsdb/engine/tsm1/reader.go index abf3137397d..351c06beefa 100644 --- a/tsdb/engine/tsm1/reader.go +++ b/tsdb/engine/tsm1/reader.go @@ -62,6 +62,10 @@ type TSMReader struct { deleteMu sync.Mutex firstBlockCountCache blockCountCache + + parseFileNameFunc ParseFileNameFunc + generation int + sequence int } // TSMIndex represent the index section of a TSM file. The index records all @@ -221,18 +225,24 @@ func (b *BlockIterator) Err() error { return b.err } -type tsmReaderOption func(*TSMReader) +type TsmReaderOption func(*TSMReader) // WithMadviseWillNeed is an option for specifying whether to provide a MADV_WILL need hint to the kernel. 
-var WithMadviseWillNeed = func(willNeed bool) tsmReaderOption { +var WithMadviseWillNeed = func(willNeed bool) TsmReaderOption { return func(r *TSMReader) { r.madviseWillNeed = willNeed } } -// TODO(DSB) - add a tsmReaderOption in a test call that has the mmmapAccessor mock a failure +var WithParseFileNameFunc = func(f ParseFileNameFunc) TsmReaderOption { + return func(r *TSMReader) { + r.parseFileNameFunc = f + } +} + +// TODO(DSB) - add a TsmReaderOption in a test call that has the mmapAccessor mock a failure + // NewTSMReader returns a new TSMReader from the given file. -func NewTSMReader(f *os.File, options ...tsmReaderOption) (*TSMReader, error) { +func NewTSMReader(f *os.File, options ...TsmReaderOption) (*TSMReader, error) { t := &TSMReader{} for _, option := range options { option(t) @@ -263,6 +273,11 @@ func NewTSMReader(f *os.File, options ...tsmReaderOption) (*TSMReader, error) { return nil, err } + err = t.parseAndCacheFileName(t.Path()) + if nil != err { + return nil, fmt.Errorf("failed creating new TSM reader: %w", err) + } + return t, nil } @@ -421,7 +436,27 @@ func (t *TSMReader) Remove() error { func (t *TSMReader) Rename(path string) error { t.mu.Lock() defer t.mu.Unlock() - return t.accessor.rename(path) + + if err := t.accessor.rename(path); err != nil { + return fmt.Errorf("failure renaming to %q: %w", path, err) + } + + if err := t.parseAndCacheFileName(path); err != nil { + return fmt.Errorf("rename failed: %w", err) + } + return nil +} + +func (t *TSMReader) parseAndCacheFileName(path string) error { + if t.parseFileNameFunc != nil { + var err error + t.generation, t.sequence, err = t.parseFileNameFunc(path) + return err + } else { + // If parseFileNameFunc is nil, we are in a test or another TSMReader use + // that does not involve compaction planning, and was not created by the FileStore + return nil + } } // Remove removes any underlying files stored on disk for this reader. 
@@ -631,6 +666,7 @@ func (t *TSMReader) firstBlockCount() (int, error) { func (t *TSMReader) Stats() FileStat { minTime, maxTime := t.index.TimeRange() minKey, maxKey := t.index.KeyRange() + return FileStat{ Path: t.Path(), Size: t.Size(), @@ -640,6 +676,8 @@ func (t *TSMReader) Stats() FileStat { MinKey: minKey, MaxKey: maxKey, HasTombstone: t.tombstoner.HasTombstones(), + Generation: t.generation, + Sequence: t.sequence, } }