diff --git a/cmd/influx_inspect/verify/seriesfile/verify_test.go b/cmd/influx_inspect/verify/seriesfile/verify_test.go
index 0692a0419e0..6f05632683c 100644
--- a/cmd/influx_inspect/verify/seriesfile/verify_test.go
+++ b/cmd/influx_inspect/verify/seriesfile/verify_test.go
@@ -107,7 +107,7 @@ func NewTest(t *testing.T) *Test {
 	}
 
 	// delete one series
-	if err := seriesFile.DeleteSeriesID(ids[0]); err != nil {
+	if _, err := seriesFile.DeleteSeriesID(ids[0], tsdb.Flush); err != nil {
 		return err
 	}
 
diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go
index f03255aede5..b7798d36247 100644
--- a/tsdb/engine/tsm1/engine.go
+++ b/tsdb/engine/tsm1/engine.go
@@ -1852,12 +1852,15 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
 		// Remove the remaining ids from the series file as they no longer exist
 		// in any shard.
 		var err error
+		var partitionIDs = make(map[int]struct{}, tsdb.SeriesFilePartitionN)
 		ids.ForEach(func(id uint64) {
 			name, tags := e.sfile.Series(id)
-			if err1 := e.sfile.DeleteSeriesID(id); err1 != nil {
+			part, err1 := e.sfile.DeleteSeriesID(id, tsdb.NoFlush)
+			if err1 != nil {
 				err = err1
 				return
 			}
+			partitionIDs[part.ID()] = struct{}{}
 
 			// In the case of the inmem index the series can be removed across
 			// the global index (all shards).
@@ -1871,6 +1874,13 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
 		if err != nil {
 			return err
 		}
+
+		if err := e.sfile.FlushSegments(partitionIDs); err != nil {
+			e.sfile.Logger.Error(
+				"error while flushing a series file segment",
+				zap.String("series_file_path", e.sfile.Path()),
+				zap.Error(err))
+		}
 	}
 
 	return nil
diff --git a/tsdb/series_file.go b/tsdb/series_file.go
index eca08cc3863..72847afabfa 100644
--- a/tsdb/series_file.go
+++ b/tsdb/series_file.go
@@ -30,6 +30,10 @@ const SeriesIDSize = 8
 const (
 	// SeriesFilePartitionN is the number of partitions a series file is split into.
SeriesFilePartitionN = 8 + // Flush lets us know when to fsync + Flush = true + // NoFlush lets us know when to not fsync + NoFlush = false ) // SeriesFile represents the section of the index that holds series data. @@ -194,13 +198,43 @@ func (f *SeriesFile) CreateSeriesListIfNotExists(names [][]byte, tagsSlice []mod } // DeleteSeriesID flags a series as permanently deleted. -// If the series is reintroduced later then it must create a new id. -func (f *SeriesFile) DeleteSeriesID(id uint64) error { +// If the series is reintroduced later than it must create a new id. +// Setting flush will indicate whether this method triggers a fsync. +func (f *SeriesFile) DeleteSeriesID(id uint64, flush bool) (*SeriesPartition, error) { p := f.SeriesIDPartition(id) if p == nil { - return ErrInvalidSeriesPartitionID + return nil, ErrInvalidSeriesPartitionID } - return p.DeleteSeriesID(id) + return p, p.DeleteSeriesID(id, flush) +} + +func (f *SeriesFile) FlushSegments(partitionIDs map[int]struct{}) error { + var wg sync.WaitGroup + errCh := make(chan error, SeriesFilePartitionN) + + for id := range partitionIDs { + wg.Add(1) + p := f.partitions[id] + go func() { + defer wg.Done() + p.mu.Lock() + defer p.mu.Unlock() + if segment := p.activeSegment(); segment != nil { + if err := segment.Flush(); err != nil { + errCh <- fmt.Errorf("unable to flush segment %s: %w", segment.file.Name(), err) + } + } + }() + } + + wg.Wait() + close(errCh) + + var errs = make([]error, 0, SeriesFilePartitionN) + for err := range errCh { + errs = append(errs, err) + } + return errors.Join(errs...) } // IsDeleted returns true if the ID has been deleted before. 
diff --git a/tsdb/series_file_test.go b/tsdb/series_file_test.go
index e13a79f1429..d05df3d497e 100644
--- a/tsdb/series_file_test.go
+++ b/tsdb/series_file_test.go
@@ -10,6 +10,7 @@ import (
 	"github.com/influxdata/influxdb/logger"
 	"github.com/influxdata/influxdb/models"
 	"github.com/influxdata/influxdb/tsdb"
+	"github.com/stretchr/testify/require"
 	"golang.org/x/sync/errgroup"
 )
 
@@ -191,38 +192,59 @@ func TestSeriesFileCompactor(t *testing.T) {
 
 // Ensure series file deletions persist across compactions.
 func TestSeriesFile_DeleteSeriesID(t *testing.T) {
-	sfile := MustOpenSeriesFile()
-	defer sfile.Close()
-
-	ids0, err := sfile.CreateSeriesListIfNotExists([][]byte{[]byte("m1")}, []models.Tags{nil}, tsdb.NoopStatsTracker())
-	if err != nil {
-		t.Fatal(err)
-	} else if _, err := sfile.CreateSeriesListIfNotExists([][]byte{[]byte("m2")}, []models.Tags{nil}, tsdb.NoopStatsTracker()); err != nil {
-		t.Fatal(err)
-	} else if err := sfile.ForceCompact(); err != nil {
-		t.Fatal(err)
+	deleteTestFn := func(flush bool) {
+		sfile := MustOpenSeriesFile()
+		defer func(sfile *SeriesFile) {
+			err := sfile.Close()
+			require.NoError(t, err, "close sfile")
+		}(sfile)
+
+		ids0, err := sfile.CreateSeriesListIfNotExists([][]byte{[]byte("m1")}, []models.Tags{nil}, tsdb.NoopStatsTracker())
+		require.NoError(t, err, "create series list")
+		_, err = sfile.CreateSeriesListIfNotExists([][]byte{[]byte("m2")}, []models.Tags{nil}, tsdb.NoopStatsTracker())
+		require.NoError(t, err, "create series list")
+		err = sfile.ForceCompact()
+		require.NoError(t, err, "force compact")
+
+		// Delete and ensure deletion.
+		_, err = sfile.DeleteSeriesID(ids0[0], flush)
+		require.NoError(t, err, "delete series list")
+		_, err = sfile.CreateSeriesListIfNotExists([][]byte{[]byte("m1")}, []models.Tags{nil}, tsdb.NoopStatsTracker())
+		require.NoError(t, err, "create series list")
+		require.True(t, sfile.IsDeleted(ids0[0]), "expected deleted")
+
+		err = sfile.ForceCompact()
+		require.NoError(t, err, "force compact")
+		require.True(t, sfile.IsDeleted(ids0[0]), "expected deleted")
+
+		err = sfile.Reopen()
+		require.NoError(t, err, "reopen")
+		require.True(t, sfile.IsDeleted(ids0[0]), "expected deleted")
 	}
 
-	// Delete and ensure deletion.
-	if err := sfile.DeleteSeriesID(ids0[0]); err != nil {
-		t.Fatal(err)
-	} else if _, err := sfile.CreateSeriesListIfNotExists([][]byte{[]byte("m1")}, []models.Tags{nil}, tsdb.NoopStatsTracker()); err != nil {
-		t.Fatal(err)
-	} else if !sfile.IsDeleted(ids0[0]) {
-		t.Fatal("expected deletion before compaction")
+	tests := []struct {
+		name string
+		fn   func()
+	}{{
+		name: "delete series with flush",
+		fn: func() {
+			deleteTestFn(tsdb.Flush)
+		},
+	},
+		{
+			name: "delete series with no flush",
+			fn: func() {
+				deleteTestFn(tsdb.NoFlush)
+			},
+		},
 	}
 
-	if err := sfile.ForceCompact(); err != nil {
-		t.Fatal(err)
-	} else if !sfile.IsDeleted(ids0[0]) {
-		t.Fatal("expected deletion after compaction")
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			tt.fn()
+		})
 	}
 
-	if err := sfile.Reopen(); err != nil {
-		t.Fatal(err)
-	} else if !sfile.IsDeleted(ids0[0]) {
-		t.Fatal("expected deletion after reopen")
-	}
 }
 
 func TestSeriesFile_Compaction(t *testing.T) {
@@ -246,7 +268,7 @@ func TestSeriesFile_Compaction(t *testing.T) {
 	// Delete a subset of keys.
 	for i, id := range ids {
 		if i%10 == 0 {
-			if err := sfile.DeleteSeriesID(id); err != nil {
+			if _, err := sfile.DeleteSeriesID(id, tsdb.Flush); err != nil {
 				t.Fatal(err)
 			}
 		}
@@ -325,7 +347,7 @@ func BenchmarkSeriesFile_Compaction(b *testing.B) {
 	// Delete a subset of keys.
for i := 0; i < len(ids); i += 10 { - if err := sfile.DeleteSeriesID(ids[i]); err != nil { + if _, err := sfile.DeleteSeriesID(ids[i], tsdb.Flush); err != nil { b.Fatal(err) } } @@ -353,6 +375,37 @@ func BenchmarkSeriesFile_Compaction(b *testing.B) { } } +func TestSeriesFile_FlushSegments(t *testing.T) { + const SeriesN = 100 + sfile := MustOpenSeriesFile() + defer func() { + require.NoError(t, sfile.Close(), "series file close") + }() + + // Create some series to ensure there's data in the segments. + var names = make([][]byte, 0, SeriesN) + var tagsSlice = make([]models.Tags, 0, SeriesN) + for i := 0; i < SeriesN; i++ { + names = append(names, []byte(fmt.Sprintf("measurement%d", i))) + tagsSlice = append(tagsSlice, models.NewTags(map[string]string{"tag": "value"})) + } + ids, err := sfile.CreateSeriesListIfNotExists(names, tagsSlice, tsdb.NoopStatsTracker()) + require.NoError(t, err) + + // Collect all partition IDs that have series. + partitionIDs := make(map[int]struct{}, tsdb.SeriesFilePartitionN) + for _, id := range ids { + partitionIDs[sfile.SeriesIDPartitionID(id)] = struct{}{} + } + + // Flush the segments. + err = sfile.FlushSegments(partitionIDs) + require.NoError(t, err) + + // Verify series still exist after flush. + require.Equal(t, uint64(SeriesN), sfile.SeriesCount()) +} + // Series represents name/tagset pairs that are used in testing. type Series struct { Name []byte diff --git a/tsdb/series_partition.go b/tsdb/series_partition.go index b5288eeef46..2665539d008 100644 --- a/tsdb/series_partition.go +++ b/tsdb/series_partition.go @@ -278,7 +278,7 @@ func (p *SeriesPartition) CreateSeriesListIfNotExists(keys [][]byte, keyPartitio // Flush active segment writes so we can access data in mmap. 
if segment := p.activeSegment(); segment != nil { if err := segment.Flush(); err != nil { - return err + return fmt.Errorf("unable to flush segment %s: %w", segment.file.Name(), err) } } @@ -329,8 +329,9 @@ func (p *SeriesPartition) Compacting() bool { } // DeleteSeriesID flags a series as permanently deleted. -// If the series is reintroduced later then it must create a new id. -func (p *SeriesPartition) DeleteSeriesID(id uint64) error { +// If the series is reintroduced later than it must create a new id. +// Setting flush will indicate whether this method triggers a fsync. +func (p *SeriesPartition) DeleteSeriesID(id uint64, flush bool) error { p.mu.Lock() defer p.mu.Unlock() @@ -350,9 +351,11 @@ func (p *SeriesPartition) DeleteSeriesID(id uint64) error { } // Flush active segment write. - if segment := p.activeSegment(); segment != nil { - if err := segment.Flush(); err != nil { - return err + if flush { + if segment := p.activeSegment(); segment != nil { + if err := segment.Flush(); err != nil { + return fmt.Errorf("unable to flush segment %s: %w", segment.file.Name(), err) + } } } diff --git a/tsdb/store.go b/tsdb/store.go index d47e5b65c35..76ab00c1b97 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -1073,15 +1073,45 @@ func (s *Store) DeleteShard(shardID uint64) error { } } + const DeleteLogTrigger = 10_000 + seriesCount := ss.Cardinality() + deleteStart := time.Now() + var deletedCount atomic.Uint64 + var partitionIDs = make(map[int]struct{}, SeriesFilePartitionN) + ss.ForEach(func(id uint64) { - if err := sfile.DeleteSeriesID(id); err != nil { + p, err := sfile.DeleteSeriesID(id, NoFlush) + if err != nil { sfile.Logger.Error( "cannot delete series in shard", zap.Uint64("series_id", id), zap.Uint64("shard_id", shardID), + zap.String("series_file_path", sfile.Path()), zap.Error(err)) + } else { + partitionIDs[p.id] = struct{}{} + deleted := deletedCount.Add(1) + + if deleted%DeleteLogTrigger == 0 { + s.Logger.Info(fmt.Sprintf("DeleteShard: %d series 
deleted", DeleteLogTrigger), + zap.String("db", db), + zap.Uint64("shard_id", shardID), + zap.String("series_file_path", sfile.Path()), + zap.Uint64("deleted", deleted), + zap.Uint64("remaining", seriesCount-deleted), + zap.Uint64("total", seriesCount), + zap.Duration("elapsed", time.Since(deleteStart))) + } } }) + + if err := sfile.FlushSegments(partitionIDs); err != nil { + sfile.Logger.Error( + "error while flushing a series file segment", + zap.Uint64("shard_id", shardID), + zap.String("series_file_path", sfile.Path()), + zap.Error(err)) + } } }