From cda0ef4445a7f23eaeba1d959a4dab1bbb354821 Mon Sep 17 00:00:00 2001 From: davidby-influx Date: Fri, 14 Feb 2025 22:03:11 -0800 Subject: [PATCH 1/2] chore: refactor field creation for maintainability --- tsdb/engine/tsm1/engine.go | 2 +- tsdb/engine/tsm1/engine_test.go | 32 +++++++-------- tsdb/field_validator.go | 2 +- tsdb/shard.go | 69 +++++++++++++-------------------- tsdb/shard_test.go | 10 ++--- 5 files changed, 51 insertions(+), 64 deletions(-) diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index b22e76cff45..827eb506cf1 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -1290,7 +1290,7 @@ func (e *Engine) addToIndexFromKey(keys [][]byte, fieldTypes []influxql.DataType keys[i], field = SeriesAndFieldFromCompositeKey(keys[i]) name := models.ParseName(keys[i]) mf := e.fieldset.CreateFieldsIfNotExists(name) - if _, err := mf.CreateFieldIfNotExists(field, fieldTypes[i]); err != nil { + if _, err := mf.CreateFieldIfNotExists(string(field), fieldTypes[i]); err != nil { return err } diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go index ae7cc19a330..41b030ebfe3 100644 --- a/tsdb/engine/tsm1/engine_test.go +++ b/tsdb/engine/tsm1/engine_test.go @@ -82,7 +82,7 @@ func TestEngine_DeleteSeriesAfterCacheSnapshot(t *testing.T) { t.Fatalf("failed to write points: %s", err.Error()) } - e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float) + e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("value", influxql.Float) e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}), notrack) e.CreateSeriesIfNotExists([]byte("cpu,host=B"), []byte("cpu"), models.NewTags(map[string]string{"host": "B"}), notrack) @@ -771,7 +771,7 @@ func TestEngine_CreateIterator_Cache_Ascending(t *testing.T) { e := MustOpenEngine(index) defer e.Close() - e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float) + e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("value", influxql.Float) e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}), notrack) if err := e.WritePointsString( @@ -828,7 +828,7 @@ func TestEngine_CreateIterator_Cache_Descending(t *testing.T) { e := MustOpenEngine(index) defer e.Close() - e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float) + e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("value", influxql.Float) e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}), notrack) if err := e.WritePointsString( @@ -884,7 +884,7 @@ func TestEngine_CreateIterator_TSM_Ascending(t *testing.T) { e := MustOpenEngine(index) defer e.Close() - e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float) + e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("value", influxql.Float) e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}), notrack) if err := e.WritePointsString( @@ -942,7 +942,7 @@ func TestEngine_CreateIterator_TSM_Descending(t *testing.T) { e := MustOpenEngine(index) defer e.Close() - e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float) + e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("value", influxql.Float) e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}), notrack) if err := e.WritePointsString( @@ -1000,8 +1000,8 @@ func TestEngine_CreateIterator_Aux(t *testing.T) { e := MustOpenEngine(index) defer e.Close() - e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float) - e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("F"), influxql.Float) + e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("value", influxql.Float) + e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("F", influxql.Float) e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}), notrack) if err := e.WritePointsString( @@ -1060,9 +1060,9 @@ func TestEngine_CreateIterator_Condition(t *testing.T) { e := MustOpenEngine(index) defer e.Close() - e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float) - e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("X"), influxql.Float) - e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("Y"), influxql.Float) + e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("value", influxql.Float) + e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("X", influxql.Float) + e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("Y", influxql.Float) e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}), notrack) e.SetFieldName([]byte("cpu"), "X") e.SetFieldName([]byte("cpu"), "Y") @@ -1936,7 +1936,7 @@ func TestEngine_CreateCursor_Ascending(t *testing.T) { e := MustOpenEngine(index) defer e.Close() - e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float) + e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("value", influxql.Float) e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}), notrack) if err := e.WritePointsString( @@ -1996,7 +1996,7 @@ func TestEngine_CreateCursor_Descending(t *testing.T) { e := MustOpenEngine(index) defer e.Close() - e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float) + e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("value", influxql.Float) e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}), notrack) if err := e.WritePointsString( @@ -2056,7 +2056,7 @@ func TestEngine_CreateIterator_SeriesKey(t *testing.T) { e := MustOpenEngine(index) defer e.Close() - e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float) + e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("value", influxql.Float) e.CreateSeriesIfNotExists([]byte("cpu,host=A,region=east"), []byte("cpu"), models.NewTags(map[string]string{"host": "A", "region": "east"}), notrack) e.CreateSeriesIfNotExists([]byte("cpu,host=B,region=east"), []byte("cpu"), models.NewTags(map[string]string{"host": "B", "region": "east"}), notrack) e.CreateSeriesIfNotExists([]byte("cpu,host=C,region=east"), []byte("cpu"), models.NewTags(map[string]string{"host": "C", "region": "east"}), notrack) @@ -2364,7 +2364,7 @@ func BenchmarkEngine_WritePoints(b *testing.B) { for _, sz := range batchSizes { for _, index := range tsdb.RegisteredIndexes() { e := MustOpenEngine(index) - e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float) + e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("value", influxql.Float) pp := make([]models.Point, 0, sz) for i := 0; i < sz; i++ { p := MustParsePointString(fmt.Sprintf("cpu,host=%d value=1.2", i)) @@ -2390,7 +2390,7 @@ func BenchmarkEngine_WritePoints_Parallel(b *testing.B) { for _, sz := range batchSizes { for _, index := range tsdb.RegisteredIndexes() { e := MustOpenEngine(index) - e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float) + e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("value", influxql.Float) cpus := runtime.GOMAXPROCS(0) pp := make([]models.Point, 0, sz*cpus) @@ -2623,7 +2623,7 @@ func MustInitDefaultBenchmarkEngine(name string, pointN int) *benchmarkEngine { e := MustOpenEngine(tsdb.DefaultIndex) // Initialize metadata. - e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float) + e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("value", influxql.Float) e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}), notrack) // Generate time ascending points with jitterred time & value. diff --git a/tsdb/field_validator.go b/tsdb/field_validator.go index 887ed9d36e7..306b37d199c 100644 --- a/tsdb/field_validator.go +++ b/tsdb/field_validator.go @@ -47,7 +47,7 @@ func ValidateFields(mf *MeasurementFields, point models.Point, skipSizeValidatio } // If the field is not present, remember to create it. - f := mf.FieldBytes(fieldKey) + f := mf.Field(string(fieldKey)) if f == nil { fieldsToCreate = append(fieldsToCreate, &FieldCreate{ Measurement: point.Name(), diff --git a/tsdb/shard.go b/tsdb/shard.go index 5737cc1d0f7..688cdfdb7f1 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -703,36 +703,31 @@ func (s *Shard) validateSeriesAndFields(points []models.Point, tracker StatsTrac continue } - err := func(p models.Point, iter models.FieldIterator) error { - var newFields []*FieldCreate - var validateErr error - name := p.Name() - mf := engine.MeasurementFields(name) - // Check with the field validator. - if newFields, validateErr = ValidateFields(mf, p, s.options.Config.SkipFieldSizeValidation); validateErr != nil { - var err PartialWriteError - switch { - case errors.As(validateErr, &err): - // This will turn into an error later, outside this lambda - if reason == "" { - reason = err.Reason - } - dropped += err.Dropped - atomic.AddInt64(&s.stats.WritePointsDropped, int64(err.Dropped)) - default: - return err + var newFields []*FieldCreate + var validateErr error + name := p.Name() + mf := engine.MeasurementFields(name) + // Check with the field validator. + if newFields, validateErr = ValidateFields(mf, p, s.options.Config.SkipFieldSizeValidation); validateErr != nil { + var err PartialWriteError + switch { + case errors.As(validateErr, &err): + // This will turn into an error later, outside this lambda + if reason == "" { + reason = err.Reason } - return nil + dropped += err.Dropped + atomic.AddInt64(&s.stats.WritePointsDropped, int64(err.Dropped)) + continue + default: + // Return validateErr, because err will be nil here + return nil, nil, validateErr } - - points[j] = points[i] - j++ - fieldsToCreate = append(fieldsToCreate, newFields...) - return nil - }(p, iter) - if err != nil { - return nil, nil, err } + + points[j] = points[i] + j++ + fieldsToCreate = append(fieldsToCreate, newFields...) } if dropped > 0 { err = PartialWriteError{Reason: reason, Dropped: dropped, Database: s.database, RetentionPolicy: s.retentionPolicy} @@ -777,7 +772,7 @@ func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) (int, changes := make([]*FieldChange, 0, len(fieldsToCreate)) for _, f := range fieldsToCreate { mf := engine.MeasurementFields(f.Measurement) - if created, err := mf.CreateFieldIfNotExists([]byte(f.Field.Name), f.Field.Type); err != nil { + if created, err := mf.CreateFieldIfNotExists(f.Field.Name, f.Field.Type); err != nil { return 0, err } else if created { numCreated++ @@ -1594,9 +1589,9 @@ func (m *MeasurementFields) bytes() int { // CreateFieldIfNotExists creates a new field with the given name and type. // Returns an error if the field already exists with a different type. -func (m *MeasurementFields) CreateFieldIfNotExists(name []byte, typ influxql.DataType) (bool, error) { +func (m *MeasurementFields) CreateFieldIfNotExists(name string, typ influxql.DataType) (bool, error) { newField := &Field{ - Name: string(name), + Name: name, Type: typ, } if f, loaded := m.fields.LoadOrStore(newField.Name, newField); loaded { @@ -1626,14 +1621,6 @@ func (m *MeasurementFields) HasField(name string) bool { return ok } -// FieldBytes returns the field for name, or nil if there is no field for name. -// FieldBytes should be preferred to Field when the caller has a []byte, because -// it avoids a string allocation, which can't be avoided if the caller converts -// the []byte to a string and calls Field. -func (m *MeasurementFields) FieldBytes(name []byte) *Field { - return m.Field(string(name)) -} - // FieldSet returns the set of fields and their types for the measurement. func (m *MeasurementFields) FieldSet() map[string]influxql.DataType { fieldTypes := make(map[string]influxql.DataType) @@ -2271,7 +2258,7 @@ func (fs *MeasurementFieldSet) ApplyChanges() error { fs.Delete(string(fc.Measurement)) } else { mf := fs.CreateFieldsIfNotExists(fc.Measurement) - if _, err := mf.CreateFieldIfNotExists([]byte(fc.Field.Name), fc.Field.Type); err != nil { + if _, err := mf.CreateFieldIfNotExists(fc.Field.Name, fc.Field.Type); err != nil { err = fmt.Errorf("failed creating %q.%q: %w", fc.Measurement, fc.Field.Name, err) log.Error("field creation", zap.Error(err)) return err @@ -2284,8 +2271,8 @@ func (fs *MeasurementFieldSet) ApplyChanges() error { // Field represents a series field. All of the fields must be hashable. type Field struct { - Name string `json:"name,omitempty"` - Type influxql.DataType `json:"type,omitempty"` + Name string + Type influxql.DataType } type FieldChange struct { diff --git a/tsdb/shard_test.go b/tsdb/shard_test.go index 13c56e4aa7b..c0c483b70f1 100644 --- a/tsdb/shard_test.go +++ b/tsdb/shard_test.go @@ -1691,7 +1691,7 @@ func TestMeasurementFieldSet_SaveLoad(t *testing.T) { } defer checkMeasurementFieldSetClose(t, mf) fields := mf.CreateFieldsIfNotExists([]byte(measurement)) - if _, err := fields.CreateFieldIfNotExists([]byte(fieldName), influxql.Float); err != nil { + if _, err := fields.CreateFieldIfNotExists(fieldName, influxql.Float); err != nil { t.Fatalf("create field error: %v", err) } change := tsdb.FieldChange{ @@ -1743,7 +1743,7 @@ func TestMeasurementFieldSet_Corrupt(t *testing.T) { measurement := []byte("cpu") fields := mf.CreateFieldsIfNotExists(measurement) fieldName := "value" - if _, err := fields.CreateFieldIfNotExists([]byte(fieldName), influxql.Float); err != nil { + if _, err := fields.CreateFieldIfNotExists(fieldName, influxql.Float); err != nil { t.Fatalf("create field error: %v", err) } change := tsdb.FieldChange{ @@ -1814,7 +1814,7 @@ func TestMeasurementFieldSet_CorruptChangeFile(t *testing.T) { defer checkMeasurementFieldSetClose(t, mf) for _, f := range testFields { fields := mf.CreateFieldsIfNotExists([]byte(f.Measurement)) - if _, err := fields.CreateFieldIfNotExists([]byte(f.Field), f.FieldType); err != nil { + if _, err := fields.CreateFieldIfNotExists(f.Field, f.FieldType); err != nil { t.Fatalf("create field error: %v", err) } change := tsdb.FieldChange{ @@ -1876,7 +1876,7 @@ func TestMeasurementFieldSet_DeleteEmpty(t *testing.T) { defer checkMeasurementFieldSetClose(t, mf) fields := mf.CreateFieldsIfNotExists([]byte(measurement)) - if _, err := fields.CreateFieldIfNotExists([]byte(fieldName), influxql.Float); err != nil { + if _, err := fields.CreateFieldIfNotExists(fieldName, influxql.Float); err != nil { t.Fatalf("create field error: %v", err) } @@ -2009,7 +2009,7 @@ func testFieldMaker(t *testing.T, wg *sync.WaitGroup, mf *tsdb.MeasurementFieldS fields := mf.CreateFieldsIfNotExists([]byte(measurement)) for _, fieldName := range fieldNames { - if _, err := fields.CreateFieldIfNotExists([]byte(fieldName), influxql.Float); err != nil { + if _, err := fields.CreateFieldIfNotExists(fieldName, influxql.Float); err != nil { t.Logf("create field error: %v", err) t.Fail() return From 89b4b9720d66b8c1a95dfa209c8663dea80a84d1 Mon Sep 17 00:00:00 2001 From: davidby-influx Date: Fri, 14 Feb 2025 22:40:40 -0800 Subject: [PATCH 2/2] chore: avoid an unnecessary allocation on field creation --- tsdb/field_validator.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tsdb/field_validator.go b/tsdb/field_validator.go index 306b37d199c..9bbd05e70ed 100644 --- a/tsdb/field_validator.go +++ b/tsdb/field_validator.go @@ -47,12 +47,13 @@ func ValidateFields(mf *MeasurementFields, point models.Point, skipSizeValidatio } // If the field is not present, remember to create it. - f := mf.Field(string(fieldKey)) + fieldName := string(fieldKey) + f := mf.Field(fieldName) if f == nil { fieldsToCreate = append(fieldsToCreate, &FieldCreate{ Measurement: point.Name(), Field: &Field{ - Name: string(fieldKey), + Name: fieldName, Type: dataType, }}) } else if f.Type != dataType { @@ -60,7 +61,7 @@ func ValidateFields(mf *MeasurementFields, point models.Point, skipSizeValidatio return nil, PartialWriteError{ Reason: fmt.Sprintf( "%s: input field \"%s\" on measurement \"%s\" is type %s, already exists as type %s", - ErrFieldTypeConflict, fieldKey, point.Name(), dataType, f.Type), + ErrFieldTypeConflict, fieldName, point.Name(), dataType, f.Type), Dropped: 1, } }