diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index 0502441bf3a..1b1f0dacd8d 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -184,12 +184,17 @@ type DefaultPlanner struct { type fileStore interface { Stats() []ExtFileStat LastModified() time.Time - ParseFileName(path string) (int, int, error) + ParseFileName(path string) (generation int, sequence int, err error) NextGeneration() int TSMReader(path string) (*TSMReader, error) + SupportsCompactionPlanning() bool } func NewDefaultPlanner(fs fileStore, writeColdDuration time.Duration) *DefaultPlanner { + if !fs.SupportsCompactionPlanning() { + // This should only happen due to developer mistakes. + panic("fileStore must support compaction planning") + } return &DefaultPlanner{ FileStore: fs, compactFullWriteColdDuration: writeColdDuration, @@ -202,15 +207,13 @@ func NewDefaultPlanner(fs fileStore, writeColdDuration time.Duration) *DefaultPl // 000001-01.tsm, 000001-02.tsm would be in the same generation // 000001 each with different sequence numbers. type tsmGeneration struct { - id int - files []ExtFileStat - parseFileName ParseFileNameFunc + id int + files []ExtFileStat } -func newTsmGeneration(id int, parseFileNameFunc ParseFileNameFunc) *tsmGeneration { +func newTsmGeneration(id int) *tsmGeneration { return &tsmGeneration{ - id: id, - parseFileName: parseFileNameFunc, + id: id, } } @@ -227,13 +230,12 @@ func (t *tsmGeneration) size() uint64 { func (t *tsmGeneration) level() int { // Level 0 is always created from the result of a cache compaction. It generates // 1 file with a sequence num of 1. Level 2 is generated by compacting multiple - // level 1 files. Level 3 is generate by compacting multiple level 2 files. Level + // level 1 files. Level 3 is generated by compacting multiple level 2 files. Level // 4 is for anything else. - _, seq, _ := t.parseFileName(t.files[0].Path) - if seq < 4 { - return seq - } + if t.files[0].Sequence < 4 { + return t.files[0].Sequence + } return 4 } @@ -259,10 +261,6 @@ func (c *DefaultPlanner) SetFileStore(fs *FileStore) { c.FileStore = fs } -func (c *DefaultPlanner) ParseFileName(path string) (int, int, error) { - return c.FileStore.ParseFileName(path) -} - func (c *DefaultPlanner) generationsFullyCompacted(gens tsmGenerations) (bool, string) { if len(gens) > 1 { return false, "not fully compacted and not idle because of more than one generation" @@ -713,17 +711,15 @@ func (c *DefaultPlanner) findGenerations(skipInUse bool) tsmGenerations { tsmStats := c.FileStore.Stats() generations := make(map[int]*tsmGeneration, len(tsmStats)) for _, f := range tsmStats { - gen, _, _ := c.ParseFileName(f.Path) - // Skip any files that are assigned to a current compaction plan if _, ok := c.filesInUse[f.Path]; skipInUse && ok { continue } - group := generations[gen] + group := generations[f.Generation] if group == nil { - group = newTsmGeneration(gen, c.ParseFileName) - generations[gen] = group + group = newTsmGeneration(f.Generation) + generations[f.Generation] = group } group.files = append(group.files, f) } @@ -795,7 +791,6 @@ type Compactor struct { RateLimit limiter.Rate formatFileName FormatFileNameFunc - parseFileName ParseFileNameFunc mu sync.RWMutex snapshotsEnabled bool @@ -818,7 +813,6 @@ type Compactor struct { func NewCompactor() *Compactor { return &Compactor{ formatFileName: DefaultFormatFileName, - parseFileName: DefaultParseFileName, } } @@ -826,10 +820,6 @@ func (c *Compactor) WithFormatFileNameFunc(formatFileNameFunc FormatFileNameFunc c.formatFileName = formatFileNameFunc } -func (c *Compactor) WithParseFileNameFunc(parseFileNameFunc ParseFileNameFunc) { - c.parseFileName = parseFileNameFunc -} - // Open initializes the Compactor. func (c *Compactor) Open() { c.mu.Lock() @@ -995,8 +985,12 @@ func (c *Compactor) compact(fast bool, tsmFiles []string, logger *zap.Logger, po // set. We need to find that max generation as well as the max sequence // number to ensure we write to the next unique location. var maxGeneration, maxSequence int + + if c.FileStore == nil { + return nil, fmt.Errorf("compactor for %s has no file store: %w", c.Dir, errCompactionsDisabled) + } for _, f := range tsmFiles { - gen, seq, err := c.parseFileName(f) + gen, seq, err := c.FileStore.ParseFileName(f) if err != nil { return nil, err } diff --git a/tsdb/engine/tsm1/compact_test.go b/tsdb/engine/tsm1/compact_test.go index bbbdea5ca8c..f06d201f05c 100644 --- a/tsdb/engine/tsm1/compact_test.go +++ b/tsdb/engine/tsm1/compact_test.go @@ -41,7 +41,7 @@ func TestCompactor_Snapshot(t *testing.T) { } fs := &fakeFileStore{} - t.Cleanup(func() { fs.Close() }) + t.Cleanup(func() { require.NoError(t, fs.Close()) }) compactor := tsm1.NewCompactor() compactor.Dir = dir @@ -121,7 +121,7 @@ func TestCompactor_CompactFullLastTimestamp(t *testing.T) { f2 := MustWriteTSM(t, dir, 2, writes) ffs := &fakeFileStore{} - t.Cleanup(func() { ffs.Close() }) + t.Cleanup(func() { require.NoError(t, ffs.Close()) }) compactor := tsm1.NewCompactor() compactor.Dir = dir @@ -180,7 +180,7 @@ func TestCompactor_CompactFull(t *testing.T) { f3 := MustWriteTSM(t, dir, 3, writes) ffs := &fakeFileStore{} - t.Cleanup(func() { ffs.Close() }) + t.Cleanup(func() { require.NoError(t, ffs.Close()) }) compactor := tsm1.NewCompactor() compactor.Dir = dir compactor.FileStore = ffs @@ -224,9 +224,18 @@ func TestCompactor_CompactFull(t *testing.T) { t.Fatalf("wrong sequence for new file: got %v, exp %v", gotSeq, expSeq) } - r := MustOpenTSMReader(files[0]) + r := MustOpenTSMReader(files[0], tsm1.WithParseFileNameFunc(tsm1.DefaultParseFileName)) t.Cleanup(func() { r.Close() }) + s := r.Stats() + if s.Generation != expGen { + t.Fatalf("wrong generation for new file in Stats: got %v, exp %v", s.Generation, expGen) + } + + if s.Sequence != expSeq { + t.Fatalf("wrong sequence for new file in Stats: got %v, exp %v", s.Sequence, expSeq) + } + if got, exp := r.KeyCount(), 3; got != exp { t.Fatalf("keys length mismatch: got %v, exp %v", got, exp) } @@ -290,7 +299,7 @@ func TestCompactor_DecodeError(t *testing.T) { f.Close() ffs := &fakeFileStore{} - defer ffs.Close() + t.Cleanup(func() { require.NoError(t, ffs.Close()) }) compactor := tsm1.NewCompactor() compactor.Dir = dir compactor.FileStore = ffs @@ -335,7 +344,7 @@ func TestCompactor_Compact_OverlappingBlocks(t *testing.T) { f3 := MustWriteTSM(t, dir, 3, writes) ffs := &fakeFileStore{} - t.Cleanup(func() { ffs.Close() }) + t.Cleanup(func() { require.NoError(t, ffs.Close()) }) compactor := tsm1.NewCompactor() compactor.Dir = dir compactor.FileStore = ffs @@ -414,7 +423,7 @@ func TestCompactor_Compact_OverlappingBlocksMultiple(t *testing.T) { f3 := MustWriteTSM(t, dir, 3, writes) ffs := &fakeFileStore{} - t.Cleanup(func() { ffs.Close() }) + t.Cleanup(func() { require.NoError(t, ffs.Close()) }) compactor := tsm1.NewCompactor() compactor.Dir = dir compactor.FileStore = ffs @@ -483,7 +492,7 @@ func TestCompactor_Compact_UnsortedBlocks(t *testing.T) { f2 := MustWriteTSM(t, dir, 2, writes) fs := &fakeFileStore{} - t.Cleanup(func() { fs.Close() }) + t.Cleanup(func() { require.NoError(t, fs.Close()) }) compactor := tsm1.NewCompactor() compactor.Dir = dir compactor.FileStore = &fakeFileStore{} @@ -558,7 +567,7 @@ func TestCompactor_Compact_UnsortedBlocksOverlapping(t *testing.T) { f3 := MustWriteTSM(t, dir, 3, writes) fs := &fakeFileStore{} - t.Cleanup(func() { fs.Close() }) + t.Cleanup(func() { require.NoError(t, fs.Close()) }) compactor := tsm1.NewCompactor() compactor.Dir = dir compactor.FileStore = &fakeFileStore{} @@ -629,7 +638,7 @@ func TestCompactor_CompactFull_SkipFullBlocks(t *testing.T) { f3 := MustWriteTSM(t, dir, 3, writes) ffs := &fakeFileStore{} - t.Cleanup(func() { ffs.Close() }) + t.Cleanup(func() { require.NoError(t, ffs.Close()) }) compactor := tsm1.NewCompactor() compactor.Dir = dir compactor.FileStore = ffs @@ -663,8 +672,17 @@ func TestCompactor_CompactFull_SkipFullBlocks(t *testing.T) { t.Fatalf("wrong sequence for new file: got %v, exp %v", gotSeq, expSeq) } - r := MustOpenTSMReader(files[0]) - t.Cleanup(func() { r.Close() }) + r := MustOpenTSMReader(files[0], tsm1.WithParseFileNameFunc(tsm1.DefaultParseFileName)) + t.Cleanup(func() { require.NoError(t, r.Close()) }) + + s := r.Stats() + if s.Generation != expGen { + t.Fatalf("wrong generation for new file in Stats: got %v, exp %v", s.Generation, expGen) + } + + if s.Sequence != expSeq { + t.Fatalf("wrong sequence for new file in Stats: got %v, exp %v", s.Sequence, expSeq) + } if got, exp := r.KeyCount(), 1; got != exp { t.Fatalf("keys length mismatch: got %v, exp %v", got, exp) @@ -730,7 +748,7 @@ func TestCompactor_CompactFull_TombstonedSkipBlock(t *testing.T) { f3 := MustWriteTSM(t, dir, 3, writes) ffs := &fakeFileStore{} - t.Cleanup(func() { ffs.Close() }) + t.Cleanup(func() { require.NoError(t, ffs.Close()) }) compactor := tsm1.NewCompactor() compactor.Dir = dir compactor.FileStore = ffs @@ -764,8 +782,17 @@ func TestCompactor_CompactFull_TombstonedSkipBlock(t *testing.T) { t.Fatalf("wrong sequence for new file: got %v, exp %v", gotSeq, expSeq) } - r := MustOpenTSMReader(files[0]) - t.Cleanup(func() { r.Close() }) + r := MustOpenTSMReader(files[0], tsm1.WithParseFileNameFunc(tsm1.DefaultParseFileName)) + t.Cleanup(func() { require.NoError(t, r.Close()) }) + + s := r.Stats() + if s.Generation != expGen { + t.Fatalf("wrong generation for new file in Stats: got %v, exp %v", s.Generation, expGen) + } + + if s.Sequence != expSeq { + t.Fatalf("wrong sequence for new file in Stats: got %v, exp %v", s.Sequence, expSeq) + } if got, exp := r.KeyCount(), 1; got != exp { t.Fatalf("keys length mismatch: got %v, exp %v", got, exp) @@ -832,7 +859,7 @@ func TestCompactor_CompactFull_TombstonedPartialBlock(t *testing.T) { f3 := MustWriteTSM(t, dir, 3, writes) ffs := &fakeFileStore{} - t.Cleanup(func() { ffs.Close() }) + t.Cleanup(func() { require.NoError(t, ffs.Close()) }) compactor := tsm1.NewCompactor() compactor.Dir = dir compactor.FileStore = ffs @@ -866,8 +893,17 @@ func TestCompactor_CompactFull_TombstonedPartialBlock(t *testing.T) { t.Fatalf("wrong sequence for new file: got %v, exp %v", gotSeq, expSeq) } - r := MustOpenTSMReader(files[0]) - t.Cleanup(func() { r.Close() }) + r := MustOpenTSMReader(files[0], tsm1.WithParseFileNameFunc(tsm1.DefaultParseFileName)) + t.Cleanup(func() { require.NoError(t, r.Close()) }) + + s := r.Stats() + if s.Generation != expGen { + t.Fatalf("wrong generation for new file in Stats: got %v, exp %v", s.Generation, expGen) + } + + if s.Sequence != expSeq { + t.Fatalf("wrong sequence for new file in Stats: got %v, exp %v", s.Sequence, expSeq) + } if got, exp := r.KeyCount(), 1; got != exp { t.Fatalf("keys length mismatch: got %v, exp %v", got, exp) @@ -939,7 +975,7 @@ func TestCompactor_CompactFull_TombstonedMultipleRanges(t *testing.T) { f3 := MustWriteTSM(t, dir, 3, writes) ffs := &fakeFileStore{} - t.Cleanup(func() { ffs.Close() }) + t.Cleanup(func() { require.NoError(t, ffs.Close()) }) compactor := tsm1.NewCompactor() compactor.Dir = dir compactor.FileStore = ffs @@ -973,8 +1009,17 @@ func TestCompactor_CompactFull_TombstonedMultipleRanges(t *testing.T) { t.Fatalf("wrong sequence for new file: got %v, exp %v", gotSeq, expSeq) } - r := MustOpenTSMReader(files[0]) - t.Cleanup(func() { r.Close() }) + r := MustOpenTSMReader(files[0], tsm1.WithParseFileNameFunc(tsm1.DefaultParseFileName)) + t.Cleanup(func() { require.NoError(t, r.Close()) }) + + s := r.Stats() + if s.Generation != expGen { + t.Fatalf("wrong generation for new file in Stats: got %v, exp %v", s.Generation, expGen) + } + + if s.Sequence != expSeq { + t.Fatalf("wrong sequence for new file in Stats: got %v, exp %v", s.Sequence, expSeq) + } if got, exp := r.KeyCount(), 1; got != exp { t.Fatalf("keys length mismatch: got %v, exp %v", got, exp) @@ -1054,7 +1099,7 @@ func TestCompactor_CompactFull_MaxKeys(t *testing.T) { f2.Close() ffs := &fakeFileStore{} - t.Cleanup(func() { ffs.Close() }) + t.Cleanup(func() { require.NoError(t, ffs.Close()) }) compactor := tsm1.NewCompactor() compactor.Dir = dir compactor.FileStore = ffs @@ -1116,7 +1161,7 @@ func TestCompactor_CompactFull_InProgress(t *testing.T) { return f2Name }() ffs := &fakeFileStore{} - defer ffs.Close() + t.Cleanup(func() { require.NoError(t, ffs.Close()) }) compactor := tsm1.NewCompactor() compactor.Dir = dir compactor.FileStore = ffs @@ -1584,28 +1629,30 @@ func TestCacheKeyIterator_Abort(t *testing.T) { } } -func normalizeExtFileStat(efs []tsm1.ExtFileStat, defaultBlockCount int) []tsm1.ExtFileStat { +func normalizeExtFileStat(t *testing.T, efs []tsm1.ExtFileStat, defaultBlockCount int) []tsm1.ExtFileStat { + var err error efsNorm := make([]tsm1.ExtFileStat, 0, len(efs)) for _, f := range efs { if f.FirstBlockCount == 0 { f.FirstBlockCount = defaultBlockCount } + f.Generation, f.Sequence, err = tsm1.DefaultParseFileName(f.Path) + require.NoErrorf(t, err, "failed to parse file name: %s", f.Path) efsNorm = append(efsNorm, f) } - return efsNorm } type ffsOpt func(ffs *fakeFileStore) -func withExtFileStats(efs []tsm1.ExtFileStat) ffsOpt { +func withExtFileStats(t *testing.T, efs []tsm1.ExtFileStat) ffsOpt { return func(ffs *fakeFileStore) { - ffs.PathsFn = func() []tsm1.ExtFileStat { return normalizeExtFileStat(efs, ffs.defaultBlockCount) } + ffs.PathsFn = func() []tsm1.ExtFileStat { return normalizeExtFileStat(t, efs, ffs.defaultBlockCount) } } } -func withFileStats(fs []tsm1.FileStat) ffsOpt { - return withExtFileStats(tsm1.FileStats(fs).ToExtFileStats()) +func withFileStats(t *testing.T, fs []tsm1.FileStat) ffsOpt { + return withExtFileStats(t, tsm1.FileStats(fs).ToExtFileStats()) } func withDefaultBlockCount(blockCount int) ffsOpt { @@ -1638,7 +1685,7 @@ func TestDefaultPlanner_Plan_Min(t *testing.T) { }, } cp := tsm1.NewDefaultPlanner( - newFakeFileStore(withFileStats(fileStats)), tsdb.DefaultCompactFullWriteColdDuration, + newFakeFileStore(withFileStats(t, fileStats)), tsdb.DefaultCompactFullWriteColdDuration, ) tsm, pLen := cp.Plan(time.Now()) @@ -1684,7 +1731,7 @@ func TestDefaultPlanner_Plan_CombineSequence(t *testing.T) { } cp := tsm1.NewDefaultPlanner( - newFakeFileStore(withFileStats(data)), + newFakeFileStore(withFileStats(t, data)), tsdb.DefaultCompactFullWriteColdDuration, ) @@ -1745,7 +1792,7 @@ func TestDefaultPlanner_Plan_MultipleGroups(t *testing.T) { } cp := tsm1.NewDefaultPlanner( - newFakeFileStore(withFileStats(data)), + newFakeFileStore(withFileStats(t, data)), tsdb.DefaultCompactFullWriteColdDuration) expFiles := []tsm1.FileStat{data[0], data[1], data[2], data[3], @@ -1833,7 +1880,7 @@ func TestDefaultPlanner_PlanLevel_SmallestCompactionStep(t *testing.T) { } cp := tsm1.NewDefaultPlanner( - newFakeFileStore(withFileStats(data)), + newFakeFileStore(withFileStats(t, data)), tsdb.DefaultCompactFullWriteColdDuration, ) @@ -1885,7 +1932,7 @@ func TestDefaultPlanner_PlanLevel_SplitFile(t *testing.T) { } cp := tsm1.NewDefaultPlanner( - newFakeFileStore(withFileStats(data)), + newFakeFileStore(withFileStats(t, data)), tsdb.DefaultCompactFullWriteColdDuration, ) @@ -1937,7 +1984,7 @@ func TestDefaultPlanner_PlanLevel_IsolatedHighLevel(t *testing.T) { } cp := tsm1.NewDefaultPlanner( - newFakeFileStore(withFileStats(data)), + newFakeFileStore(withFileStats(t, data)), tsdb.DefaultCompactFullWriteColdDuration, ) @@ -1979,7 +2026,7 @@ func TestDefaultPlanner_PlanLevel3_MinFiles(t *testing.T) { } cp := tsm1.NewDefaultPlanner( - newFakeFileStore(withFileStats(data)), + newFakeFileStore(withFileStats(t, data)), tsdb.DefaultCompactFullWriteColdDuration, ) @@ -2010,7 +2057,7 @@ func TestDefaultPlanner_PlanLevel2_MinFiles(t *testing.T) { } cp := tsm1.NewDefaultPlanner( - newFakeFileStore(withFileStats(data)), + newFakeFileStore(withFileStats(t, data)), tsdb.DefaultCompactFullWriteColdDuration, ) @@ -2053,7 +2100,7 @@ func TestDefaultPlanner_PlanLevel_Tombstone(t *testing.T) { } cp := tsm1.NewDefaultPlanner( - newFakeFileStore(withFileStats(data)), + newFakeFileStore(withFileStats(t, data)), tsdb.DefaultCompactFullWriteColdDuration, ) @@ -2109,7 +2156,7 @@ func TestDefaultPlanner_PlanLevel_Multiple(t *testing.T) { } cp := tsm1.NewDefaultPlanner( - newFakeFileStore(withFileStats(data)), + newFakeFileStore(withFileStats(t, data)), tsdb.DefaultCompactFullWriteColdDuration, ) @@ -2198,7 +2245,7 @@ func TestDefaultPlanner_PlanLevel_InUse(t *testing.T) { } cp := tsm1.NewDefaultPlanner( - newFakeFileStore(withFileStats(data)), + newFakeFileStore(withFileStats(t, data)), tsdb.DefaultCompactFullWriteColdDuration, ) @@ -2261,7 +2308,7 @@ func TestDefaultPlanner_PlanOptimize_NoLevel4(t *testing.T) { } cp := tsm1.NewDefaultPlanner( - newFakeFileStore(withFileStats(data)), + newFakeFileStore(withFileStats(t, data)), tsdb.DefaultCompactFullWriteColdDuration, ) @@ -2294,7 +2341,7 @@ func TestDefaultPlanner_PlanOptimize_Tombstones(t *testing.T) { } cp := tsm1.NewDefaultPlanner( - newFakeFileStore(withFileStats(data)), + newFakeFileStore(withFileStats(t, data)), tsdb.DefaultCompactFullWriteColdDuration, ) @@ -2345,7 +2392,7 @@ func TestDefaultPlanner_Plan_FullOnCold(t *testing.T) { } cp := tsm1.NewDefaultPlanner( - newFakeFileStore(withFileStats(data)), + newFakeFileStore(withFileStats(t, data)), time.Nanosecond, ) @@ -2378,7 +2425,7 @@ func TestDefaultPlanner_Plan_SkipMaxSizeFiles(t *testing.T) { } cp := tsm1.NewDefaultPlanner( - newFakeFileStore(withFileStats(data)), + newFakeFileStore(withFileStats(t, data)), tsdb.DefaultCompactFullWriteColdDuration, ) @@ -2412,7 +2459,7 @@ func TestDefaultPlanner_Plan_SkipPlanningAfterFull(t *testing.T) { }, } - ffs := newFakeFileStore(withFileStats(testSet), withDefaultBlockCount(tsdb.DefaultMaxPointsPerBlock)) + ffs := newFakeFileStore(withFileStats(t, testSet), withDefaultBlockCount(tsdb.DefaultMaxPointsPerBlock)) cp := tsm1.NewDefaultPlanner(ffs, time.Nanosecond) @@ -2449,7 +2496,7 @@ func TestDefaultPlanner_Plan_SkipPlanningAfterFull(t *testing.T) { }, } - overFs := newFakeFileStore(withFileStats(over), withDefaultBlockCount(tsdb.DefaultMaxPointsPerBlock)) + overFs := newFakeFileStore(withFileStats(t, over), withDefaultBlockCount(tsdb.DefaultMaxPointsPerBlock)) cp.FileStore = overFs plan, pLen = cp.Plan(time.Now().Add(-time.Second)) @@ -2528,7 +2575,7 @@ func TestDefaultPlanner_Plan_TwoGenLevel3(t *testing.T) { }, } - fs := newFakeFileStore(withFileStats(data), withDefaultBlockCount(tsdb.DefaultMaxPointsPerBlock)) + fs := newFakeFileStore(withFileStats(t, data), withDefaultBlockCount(tsdb.DefaultMaxPointsPerBlock)) cp := tsm1.NewDefaultPlanner(fs, time.Hour) @@ -2562,7 +2609,7 @@ func TestDefaultPlanner_Plan_NotFullOverMaxsize(t *testing.T) { }, } - ffs := newFakeFileStore(withFileStats(testSet), withDefaultBlockCount(100)) + ffs := newFakeFileStore(withFileStats(t, testSet), withDefaultBlockCount(100)) cp := tsm1.NewDefaultPlanner( ffs, @@ -2590,7 +2637,7 @@ func TestDefaultPlanner_Plan_NotFullOverMaxsize(t *testing.T) { }, } - overFs := newFakeFileStore(withFileStats(over), withDefaultBlockCount(100)) + overFs := newFakeFileStore(withFileStats(t, over), withDefaultBlockCount(100)) cp.FileStore = overFs cGroups, pLen := cp.Plan(time.Now().Add(-time.Second)) @@ -2628,7 +2675,7 @@ func TestDefaultPlanner_Plan_CompactsMiddleSteps(t *testing.T) { } cp := tsm1.NewDefaultPlanner( - newFakeFileStore(withFileStats(data)), + newFakeFileStore(withFileStats(t, data)), tsdb.DefaultCompactFullWriteColdDuration, ) @@ -2649,7 +2696,7 @@ func TestDefaultPlanner_Plan_CompactsMiddleSteps(t *testing.T) { func TestDefaultPlanner_Plan_LargeGeneration(t *testing.T) { cp := tsm1.NewDefaultPlanner( - newFakeFileStore(withFileStats( + newFakeFileStore(withFileStats(t, []tsm1.FileStat{ { Path: "000000278-000000006.tsm", @@ -2685,7 +2732,7 @@ func TestDefaultPlanner_Plan_LargeGeneration(t *testing.T) { func TestDefaultPlanner_Plan_ForceFull(t *testing.T) { cp := tsm1.NewDefaultPlanner( - newFakeFileStore(withFileStats( + newFakeFileStore(withFileStats(t, []tsm1.FileStat{ { Path: "000000001-000000001.tsm", @@ -2871,7 +2918,7 @@ func TestIsGroupOptimized(t *testing.T) { }, } - ffs := newFakeFileStore(withExtFileStats(testSet)) + ffs := newFakeFileStore(withExtFileStats(t, testSet)) cp := tsm1.NewDefaultPlanner(ffs, tsdb.DefaultCompactFullWriteColdDuration) e := MustOpenEngine(t, tsi1.IndexName) @@ -4497,7 +4544,7 @@ func TestEnginePlanCompactions(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - ffs := newFakeFileStore(withExtFileStats(test.files), withDefaultBlockCount(test.defaultBlockCount)) + ffs := newFakeFileStore(withExtFileStats(t, test.files), withDefaultBlockCount(test.defaultBlockCount)) cp := tsm1.NewDefaultPlanner(ffs, test.testShardTime) e.MaxPointsPerBlock = tsdb.DefaultMaxPointsPerBlock @@ -4510,10 +4557,10 @@ func TestEnginePlanCompactions(t *testing.T) { e.Scheduler.SetDepth(1, mockGroupLen) e.Scheduler.SetDepth(2, mockGroupLen) - // Normally this is called within PlanCompactions but because we want to simulate already running - // some compactions we will set them manually here. - e.Scheduler.SetActive(1, int64(mockGroupLen)) - e.Scheduler.SetActive(2, int64(mockGroupLen)) + // Normally this is called within PlanCompactions but because we want to simulate already running + // some compactions we will set them manually here. + e.Scheduler.SetActive(1, int64(mockGroupLen)) + e.Scheduler.SetActive(2, int64(mockGroupLen)) // Plan and check results. level1Groups, level2Groups, Level3Groups, Level4Groups, Level5Groups := e.PlanCompactions() @@ -4616,17 +4663,17 @@ func MustWriteTSM(tb testing.TB, dir string, gen int, values map[string][]tsm1.V return name } -func MustTSMReader(tb testing.TB, dir string, gen int, values map[string][]tsm1.Value) *tsm1.TSMReader { - return MustOpenTSMReader(MustWriteTSM(tb, dir, gen, values)) +func MustTSMReader(t testing.TB, dir string, gen int, values map[string][]tsm1.Value, options ...tsm1.TsmReaderOption) *tsm1.TSMReader { + return MustOpenTSMReader(MustWriteTSM(t, dir, gen, values), options...) } -func MustOpenTSMReader(name string) *tsm1.TSMReader { +func MustOpenTSMReader(name string, options ...tsm1.TsmReaderOption) *tsm1.TSMReader { f, err := os.Open(name) if err != nil { panic(fmt.Sprintf("open file: %v", err)) } - r, err := tsm1.NewTSMReader(f) + r, err := tsm1.NewTSMReader(f, options...) if err != nil { panic(fmt.Sprintf("new reader: %v", err)) } @@ -4654,19 +4701,28 @@ func (w *fakeFileStore) LastModified() time.Time { } func (w *fakeFileStore) TSMReader(path string) (*tsm1.TSMReader, error) { - r := MustOpenTSMReader(path) + r := MustOpenTSMReader(path, tsm1.WithParseFileNameFunc(w.ParseFileName)) w.readers = append(w.readers, r) r.Ref() return r, nil } -func (w *fakeFileStore) Close() { +func (w *fakeFileStore) Close() error { for _, r := range w.readers { - r.Close() + err := r.Close() + if err != nil { + return err + } } w.readers = nil + return nil } func (w *fakeFileStore) ParseFileName(path string) (int, int, error) { return tsm1.DefaultParseFileName(path) } + +func (w *fakeFileStore) SupportsCompactionPlanning() bool { + // Our ParseFileName is hard-coded to always use default. + return true +} diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index f81abfa45ac..58262eec418 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -272,7 +272,6 @@ func (e *Engine) WithFormatFileNameFunc(formatFileNameFunc FormatFileNameFunc) { func (e *Engine) WithParseFileNameFunc(parseFileNameFunc ParseFileNameFunc) { e.FileStore.WithParseFileNameFunc(parseFileNameFunc) - e.Compactor.WithParseFileNameFunc(parseFileNameFunc) } // Digest returns a reader for the shard's digest. diff --git a/tsdb/engine/tsm1/file_store.go b/tsdb/engine/tsm1/file_store.go index 50e407504b6..758d2318e53 100644 --- a/tsdb/engine/tsm1/file_store.go +++ b/tsdb/engine/tsm1/file_store.go @@ -195,11 +195,11 @@ type FileStore struct { copyFiles bool + readerOptions []TsmReaderOption + // newReaderBlockCount keeps track of the current new reader block requests. // If non-zero, no new TSMReader objects may be created. newReaderBlockCount int - - readerOptions []tsmReaderOption } // FileStat holds information about a TSM file on disk. @@ -210,6 +210,8 @@ type FileStat struct { LastModified int64 MinTime, MaxTime int64 MinKey, MaxKey []byte + Generation int + Sequence int } type FileStats []FileStat @@ -263,7 +265,7 @@ func (f FileStat) ContainsKey(key []byte) bool { } // NewFileStore returns a new instance of FileStore based on the given directory. -func NewFileStore(dir string, tags tsdb.EngineTags, options ...tsmReaderOption) *FileStore { +func NewFileStore(dir string, tags tsdb.EngineTags, options ...TsmReaderOption) *FileStore { logger := zap.NewNop() fs := &FileStore{ dir: dir, @@ -279,12 +281,17 @@ func NewFileStore(dir string, tags tsdb.EngineTags, options ...tsmReaderOption) obs: noFileStoreObserver{}, parseFileName: DefaultParseFileName, copyFiles: runtime.GOOS == "windows", - readerOptions: options, + readerOptions: append([]TsmReaderOption{WithParseFileNameFunc(DefaultParseFileName)}, options...), } fs.purger.fileStore = fs return fs } +// SupportsCompactionPlanning returns true if f supports all functionality needed for compaction planning. +func (f *FileStore) SupportsCompactionPlanning() bool { + return f.parseFileName != nil +} + // WithObserver sets the observer for the file store. func (f *FileStore) WithObserver(obs tsdb.FileStoreObserver) { f.obs = obs @@ -292,9 +299,13 @@ func (f *FileStore) WithObserver(obs tsdb.FileStoreObserver) { func (f *FileStore) WithParseFileNameFunc(parseFileNameFunc ParseFileNameFunc) { f.parseFileName = parseFileNameFunc + f.readerOptions = append(f.readerOptions, WithParseFileNameFunc(f.parseFileName)) } func (f *FileStore) ParseFileName(path string) (int, int, error) { + if f == nil || f.parseFileName == nil { + return 0, 0, fmt.Errorf("failed parsing %s: file store is not initialized", path) + } return f.parseFileName(path) } @@ -620,7 +631,7 @@ func (f *FileStore) Open(ctx context.Context) error { readerC := make(chan *res) for i, fn := range files { // Keep track of the latest ID - generation, _, err := f.parseFileName(fn) + generation, _, err := f.ParseFileName(fn) if err != nil { return fmt.Errorf("error parsing %q in FileStore.Open: %w", fn, err) } @@ -1376,12 +1387,12 @@ func DefaultParseFileName(name string) (int, int, error) { generation, err := strconv.ParseUint(id[:idx], 10, 32) if err != nil { - return 0, 0, fmt.Errorf("file %s is named incorrectly", name) + return 0, 0, fmt.Errorf("cannot parse generation number; file %s is named incorrectly: %w", name, err) } sequence, err := strconv.ParseUint(id[idx+1:], 10, 32) if err != nil { - return 0, 0, fmt.Errorf("file %s is named incorrectly", name) + return 0, 0, fmt.Errorf("cannot parse sequence number; file %s is named incorrectly: %w", name, err) } return int(generation), int(sequence), nil diff --git a/tsdb/engine/tsm1/file_store_internal_test.go b/tsdb/engine/tsm1/file_store_internal_test.go index 8f0a1485b94..fbabb790708 100644 --- a/tsdb/engine/tsm1/file_store_internal_test.go +++ b/tsdb/engine/tsm1/file_store_internal_test.go @@ -4,7 +4,7 @@ import ( "github.com/influxdata/influxdb/v2/tsdb" ) -var TestMmapInitFailOption = func(err error) tsmReaderOption { +var TestMmapInitFailOption = func(err error) TsmReaderOption { return func(r *TSMReader) { r.accessor = &badBlockAccessor{error: err} } diff --git a/tsdb/engine/tsm1/reader.go b/tsdb/engine/tsm1/reader.go index e25b26b7660..89f7089c0dd 100644 --- a/tsdb/engine/tsm1/reader.go +++ b/tsdb/engine/tsm1/reader.go @@ -62,6 +62,10 @@ type TSMReader struct { deleteMu sync.Mutex firstBlockCountCache blockCountCache + + parseFileNameFunc ParseFileNameFunc + generation int + sequence int } // TSMIndex represent the index section of a TSM file. The index records all @@ -221,18 +225,24 @@ func (b *BlockIterator) Err() error { return b.err } -type tsmReaderOption func(*TSMReader) +type TsmReaderOption func(*TSMReader) // WithMadviseWillNeed is an option for specifying whether to provide a MADV_WILL need hint to the kernel. -var WithMadviseWillNeed = func(willNeed bool) tsmReaderOption { +var WithMadviseWillNeed = func(willNeed bool) TsmReaderOption { return func(r *TSMReader) { r.madviseWillNeed = willNeed } } -// TODO(DSB) - add a tsmReaderOption in a test call that has the mmmapAccessor mock a failure +var WithParseFileNameFunc = func(f ParseFileNameFunc) TsmReaderOption { + return func(r *TSMReader) { + r.parseFileNameFunc = f + } +} + +// TODO(DSB) - add a TsmReaderOption in a test call that has the mmmapAccessor mock a failure // NewTSMReader returns a new TSMReader from the given file. -func NewTSMReader(f *os.File, options ...tsmReaderOption) (*TSMReader, error) { +func NewTSMReader(f *os.File, options ...TsmReaderOption) (*TSMReader, error) { t := &TSMReader{} for _, option := range options { option(t) @@ -264,6 +274,11 @@ func NewTSMReader(f *os.File, options ...tsmReaderOption) (*TSMReader, error) { return nil, err } + err = t.parseAndCacheFileName(t.Path()) + if nil != err { + return nil, fmt.Errorf("failed creating new TSM reader: %w", err) + } + return t, nil } @@ -422,7 +437,27 @@ func (t *TSMReader) Remove() error { func (t *TSMReader) Rename(path string) error { t.mu.Lock() defer t.mu.Unlock() - return t.accessor.rename(path) + + if err := t.accessor.rename(path); err != nil { + return fmt.Errorf("failure renaming to %q: %w", path, err) + } + + if err := t.parseAndCacheFileName(path); err != nil { + return fmt.Errorf("rename failed: %w", err) + } + return nil +} + +func (t *TSMReader) parseAndCacheFileName(path string) error { + if t.parseFileNameFunc != nil { + var err error + t.generation, t.sequence, err = t.parseFileNameFunc(path) + return err + } else { + // If parseFileNameFunc is nil, we are in a test or another TSMReader use + // that does not involve compaction planning, and was not created by the FileStore + return nil + } } // Remove removes any underlying files stored on disk for this reader. @@ -632,6 +667,7 @@ func (t *TSMReader) firstBlockCount() (int, error) { func (t *TSMReader) Stats() FileStat { minTime, maxTime := t.index.TimeRange() minKey, maxKey := t.index.KeyRange() + return FileStat{ Path: t.Path(), Size: t.Size(), @@ -641,6 +677,8 @@ func (t *TSMReader) Stats() FileStat { MinKey: minKey, MaxKey: maxKey, HasTombstone: t.tombstoner.HasTombstones(), + Generation: t.generation, + Sequence: t.sequence, } }