diff --git a/pkg/data/gensyncmap/gensyncmap.go b/pkg/data/gensyncmap/gensyncmap.go new file mode 100644 index 00000000000..00b02b68cb5 --- /dev/null +++ b/pkg/data/gensyncmap/gensyncmap.go @@ -0,0 +1,47 @@ +package gensyncmap + +import "sync" + +type Map[K comparable, V any] struct { + m sync.Map +} + +func (m *Map[K, V]) Delete(key K) { + m.m.Delete(key) +} + +func (m *Map[K, V]) Load(key K) (value V, ok bool) { + v, ok := m.m.Load(key) + if !ok { + return value, ok + } + return v.(V), ok +} + +func (m *Map[K, V]) LoadAndDelete(key K) (value V, loaded bool) { + v, loaded := m.m.LoadAndDelete(key) + if !loaded { + return value, loaded + } + return v.(V), loaded +} + +func (m *Map[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool) { + a, loaded := m.m.LoadOrStore(key, value) + return a.(V), loaded +} + +func (m *Map[K, V]) Range(f func(key K, value V) bool) { + m.m.Range(func(key, value any) bool { return f(key.(K), value.(V)) }) +} + +func (m *Map[K, V]) Store(key K, value V) { m.m.Store(key, value) } + +func (m *Map[K, V]) Len() int { + var n int + m.m.Range(func(_, _ any) bool { + n++ + return true + }) + return n +} diff --git a/tsdb/shard.go b/tsdb/shard.go index 6b6e9c4da79..5737cc1d0f7 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -22,6 +22,7 @@ import ( "github.com/influxdata/influxdb/logger" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/pkg/bytesutil" + "github.com/influxdata/influxdb/pkg/data/gensyncmap" errors2 "github.com/influxdata/influxdb/pkg/errors" "github.com/influxdata/influxdb/pkg/estimator" "github.com/influxdata/influxdb/pkg/file" @@ -707,8 +708,6 @@ func (s *Shard) validateSeriesAndFields(points []models.Point, tracker StatsTrac var validateErr error name := p.Name() mf := engine.MeasurementFields(name) - mf.mu.RLock() - defer mf.mu.RUnlock() // Check with the field validator. if newFields, validateErr = ValidateFields(mf, p, s.options.Config.SkipFieldSizeValidation); validateErr != nil { var err PartialWriteError @@ -1562,25 +1561,21 @@ func (a Shards) ExpandSources(sources influxql.Sources) (influxql.Sources, error // MeasurementFields holds the fields of a measurement and their codec. type MeasurementFields struct { - mu sync.RWMutex - - fields atomic.Value // map[string]*Field + fields gensyncmap.Map[string, *Field] } // NewMeasurementFields returns an initialised *MeasurementFields value. func NewMeasurementFields() *MeasurementFields { - fields := make(map[string]*Field) - mf := &MeasurementFields{} - mf.fields.Store(fields) - return mf + return &MeasurementFields{fields: gensyncmap.Map[string, *Field]{}} } func (m *MeasurementFields) FieldKeys() []string { - fields := m.fields.Load().(map[string]*Field) - a := make([]string, 0, len(fields)) - for key := range fields { - a = append(a, key) - } + var a []string + m.fields.Range(func(k string, _ *Field) bool { + a = append(a, k) + return true + }) + sort.Strings(a) return a } @@ -1588,66 +1583,38 @@ func (m *MeasurementFields) FieldKeys() []string { // bytes estimates the memory footprint of this MeasurementFields, in bytes. func (m *MeasurementFields) bytes() int { var b int - b += 24 // mu RWMutex is 24 bytes - fields := m.fields.Load().(map[string]*Field) - b += int(unsafe.Sizeof(fields)) - for k, v := range fields { + b += int(unsafe.Sizeof(m.fields)) + m.fields.Range(func(k string, v *Field) bool { b += int(unsafe.Sizeof(k)) + len(k) b += int(unsafe.Sizeof(v)+unsafe.Sizeof(*v)) + len(v.Name) - } + return true + }) return b } -// CreateFieldIfNotExists creates a new field with an autoincrementing ID. -// Returns an error if 255 fields have already been created on the measurement or -// the fields already exists with a different type. +// CreateFieldIfNotExists creates a new field with the given name and type. +// Returns an error if the field already exists with a different type. func (m *MeasurementFields) CreateFieldIfNotExists(name []byte, typ influxql.DataType) (bool, error) { - fields := m.fields.Load().(map[string]*Field) - - // Ignore if the field already exists. - if f := fields[string(name)]; f != nil { - if f.Type != typ { - return false, ErrFieldTypeConflict - } - return false, nil + newField := &Field{ + Name: string(name), + Type: typ, } - - m.mu.Lock() - defer m.mu.Unlock() - - fields = m.fields.Load().(map[string]*Field) - // Re-check field and type under write lock. - if f := fields[string(name)]; f != nil { + if f, loaded := m.fields.LoadOrStore(newField.Name, newField); loaded { if f.Type != typ { return false, ErrFieldTypeConflict } return false, nil } - - fieldsUpdate := make(map[string]*Field, len(fields)+1) - for k, v := range fields { - fieldsUpdate[k] = v - } - // Create and append a new field. - f := &Field{ - ID: uint8(len(fields) + 1), - Name: string(name), - Type: typ, - } - fieldsUpdate[string(name)] = f - m.fields.Store(fieldsUpdate) - return true, nil } func (m *MeasurementFields) FieldN() int { - n := len(m.fields.Load().(map[string]*Field)) - return n + return m.fields.Len() } // Field returns the field for name, or nil if there is no field for name. func (m *MeasurementFields) Field(name string) *Field { - f := m.fields.Load().(map[string]*Field)[name] + f, _ := m.fields.Load(name) return f } @@ -1655,8 +1622,8 @@ func (m *MeasurementFields) HasField(name string) bool { if m == nil { return false } - f := m.fields.Load().(map[string]*Field)[name] - return f != nil + _, ok := m.fields.Load(name) + return ok } // FieldBytes returns the field for name, or nil if there is no field for name. @@ -1664,27 +1631,23 @@ func (m *MeasurementFields) HasField(name string) bool { // it avoids a string allocation, which can't be avoided if the caller converts // the []byte to a string and calls Field. func (m *MeasurementFields) FieldBytes(name []byte) *Field { - f := m.fields.Load().(map[string]*Field)[string(name)] - return f + return m.Field(string(name)) } // FieldSet returns the set of fields and their types for the measurement. func (m *MeasurementFields) FieldSet() map[string]influxql.DataType { - fields := m.fields.Load().(map[string]*Field) fieldTypes := make(map[string]influxql.DataType) - for name, f := range fields { - fieldTypes[name] = f.Type - } + m.fields.Range(func(k string, v *Field) bool { + fieldTypes[k] = v.Type + return true + }) return fieldTypes } func (m *MeasurementFields) ForEachField(fn func(name string, typ influxql.DataType) bool) { - fields := m.fields.Load().(map[string]*Field) - for name, f := range fields { - if !fn(name, f.Type) { - return - } - } + m.fields.Range(func(k string, v *Field) bool { + return fn(k, v.Type) + }) } type FieldChanges []*FieldChange @@ -2163,12 +2126,11 @@ func (fs *MeasurementFieldSet) load() (rErr error) { } fs.fields = make(map[string]*MeasurementFields, len(pb.GetMeasurements())) for _, measurement := range pb.GetMeasurements() { - fields := make(map[string]*Field, len(measurement.GetFields())) + set := NewMeasurementFields() for _, field := range measurement.GetFields() { - fields[string(field.GetName())] = &Field{Name: string(field.GetName()), Type: influxql.DataType(field.GetType())} + name := string(field.GetName()) + set.fields.Store(name, &Field{Name: name, Type: influxql.DataType(field.GetType())}) } - set := &MeasurementFields{} - set.fields.Store(fields) fs.fields[string(measurement.GetName())] = set } return nil @@ -2282,7 +2244,6 @@ func (fscm *measurementFieldSetChangeMgr) loadFieldChangeSet(r io.Reader) (Field FieldCreate: FieldCreate{ Measurement: fc.Measurement, Field: &Field{ - ID: 0, Name: string(fc.Field.Name), Type: influxql.DataType(fc.Field.Type), }, @@ -2323,7 +2284,6 @@ func (fs *MeasurementFieldSet) ApplyChanges() error { // Field represents a series field. All of the fields must be hashable. type Field struct { - ID uint8 `json:"id,omitempty"` Name string `json:"name,omitempty"` Type influxql.DataType `json:"type,omitempty"` } diff --git a/tsdb/shard_test.go b/tsdb/shard_test.go index 9add6af35e0..13c56e4aa7b 100644 --- a/tsdb/shard_test.go +++ b/tsdb/shard_test.go @@ -1697,7 +1697,7 @@ func TestMeasurementFieldSet_SaveLoad(t *testing.T) { change := tsdb.FieldChange{ FieldCreate: tsdb.FieldCreate{ Measurement: []byte(measurement), - Field: &tsdb.Field{ID: 0, Name: fieldName, Type: influxql.Float}, + Field: &tsdb.Field{Name: fieldName, Type: influxql.Float}, }, ChangeType: tsdb.AddMeasurementField, } @@ -1749,7 +1749,7 @@ func TestMeasurementFieldSet_Corrupt(t *testing.T) { change := tsdb.FieldChange{ FieldCreate: tsdb.FieldCreate{ Measurement: []byte(measurement), - Field: &tsdb.Field{ID: 0, Name: fieldName, Type: influxql.Float}, + Field: &tsdb.Field{Name: fieldName, Type: influxql.Float}, }, ChangeType: tsdb.AddMeasurementField, } @@ -1820,7 +1820,7 @@ func TestMeasurementFieldSet_CorruptChangeFile(t *testing.T) { change := tsdb.FieldChange{ FieldCreate: tsdb.FieldCreate{ Measurement: []byte(f.Measurement), - Field: &tsdb.Field{ID: 0, Name: f.Field, Type: f.FieldType}, + Field: &tsdb.Field{Name: f.Field, Type: f.FieldType}, }, ChangeType: tsdb.AddMeasurementField, } @@ -1883,7 +1883,7 @@ func TestMeasurementFieldSet_DeleteEmpty(t *testing.T) { change := tsdb.FieldChange{ FieldCreate: tsdb.FieldCreate{ Measurement: []byte(measurement), - Field: &tsdb.Field{ID: 0, Name: fieldName, Type: influxql.Float}, + Field: &tsdb.Field{Name: fieldName, Type: influxql.Float}, }, ChangeType: tsdb.AddMeasurementField, } @@ -2018,7 +2018,7 @@ func testFieldMaker(t *testing.T, wg *sync.WaitGroup, mf *tsdb.MeasurementFieldS change := tsdb.FieldChange{ FieldCreate: tsdb.FieldCreate{ Measurement: []byte(measurement), - Field: &tsdb.Field{ID: 0, Name: fieldName, Type: influxql.Float}, + Field: &tsdb.Field{Name: fieldName, Type: influxql.Float}, }, ChangeType: tsdb.AddMeasurementField, }