Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions pkg/data/gensyncmap/gensyncmap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package gensyncmap

import "sync"

type Map[K comparable, V any] struct {
m sync.Map
}

func (m *Map[K, V]) Delete(key K) {
m.m.Delete(key)
}

func (m *Map[K, V]) Load(key K) (value V, ok bool) {
v, ok := m.m.Load(key)
if !ok {
return value, ok
}
return v.(V), ok
}

func (m *Map[K, V]) LoadAndDelete(key K) (value V, loaded bool) {
v, loaded := m.m.LoadAndDelete(key)
if !loaded {
return value, loaded
}
return v.(V), loaded
}

func (m *Map[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool) {
a, loaded := m.m.LoadOrStore(key, value)
return a.(V), loaded
}

func (m *Map[K, V]) Range(f func(key K, value V) bool) {
m.m.Range(func(key, value any) bool { return f(key.(K), value.(V)) })
}

func (m *Map[K, V]) Store(key K, value V) { m.m.Store(key, value) }

func (m *Map[K, V]) Len() int {
var n int
m.m.Range(func(_, _ any) bool {
n++
return true
})
return n
}
108 changes: 34 additions & 74 deletions tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/bytesutil"
"github.com/influxdata/influxdb/pkg/data/gensyncmap"
errors2 "github.com/influxdata/influxdb/pkg/errors"
"github.com/influxdata/influxdb/pkg/estimator"
"github.com/influxdata/influxdb/pkg/file"
Expand Down Expand Up @@ -707,8 +708,6 @@ func (s *Shard) validateSeriesAndFields(points []models.Point, tracker StatsTrac
var validateErr error
name := p.Name()
mf := engine.MeasurementFields(name)
mf.mu.RLock()
defer mf.mu.RUnlock()
// Check with the field validator.
if newFields, validateErr = ValidateFields(mf, p, s.options.Config.SkipFieldSizeValidation); validateErr != nil {
var err PartialWriteError
Expand Down Expand Up @@ -1562,129 +1561,93 @@ func (a Shards) ExpandSources(sources influxql.Sources) (influxql.Sources, error

// MeasurementFields holds the fields of a measurement and their codec.
type MeasurementFields struct {
mu sync.RWMutex

fields atomic.Value // map[string]*Field
fields gensyncmap.Map[string, *Field]
}

// NewMeasurementFields returns an initialised *MeasurementFields value.
func NewMeasurementFields() *MeasurementFields {
fields := make(map[string]*Field)
mf := &MeasurementFields{}
mf.fields.Store(fields)
return mf
return &MeasurementFields{fields: gensyncmap.Map[string, *Field]{}}
}

func (m *MeasurementFields) FieldKeys() []string {
fields := m.fields.Load().(map[string]*Field)
a := make([]string, 0, len(fields))
for key := range fields {
a = append(a, key)
}
var a []string
m.fields.Range(func(k string, _ *Field) bool {
a = append(a, k)
return true
})

sort.Strings(a)
return a
}

// bytes estimates the memory footprint of this MeasurementFields, in bytes.
func (m *MeasurementFields) bytes() int {
var b int
b += 24 // mu RWMutex is 24 bytes
fields := m.fields.Load().(map[string]*Field)
b += int(unsafe.Sizeof(fields))
for k, v := range fields {
b += int(unsafe.Sizeof(m.fields))
m.fields.Range(func(k string, v *Field) bool {
b += int(unsafe.Sizeof(k)) + len(k)
b += int(unsafe.Sizeof(v)+unsafe.Sizeof(*v)) + len(v.Name)
}
return true
})
return b
}

// CreateFieldIfNotExists creates a new field with an autoincrementing ID.
// Returns an error if 255 fields have already been created on the measurement or
// the fields already exists with a different type.
// CreateFieldIfNotExists creates a new field with the given name and type.
// Returns an error if the field already exists with a different type.
func (m *MeasurementFields) CreateFieldIfNotExists(name []byte, typ influxql.DataType) (bool, error) {
fields := m.fields.Load().(map[string]*Field)

// Ignore if the field already exists.
if f := fields[string(name)]; f != nil {
if f.Type != typ {
return false, ErrFieldTypeConflict
}
return false, nil
newField := &Field{
Name: string(name),
Type: typ,
}

m.mu.Lock()
defer m.mu.Unlock()

fields = m.fields.Load().(map[string]*Field)
// Re-check field and type under write lock.
if f := fields[string(name)]; f != nil {
if f, loaded := m.fields.LoadOrStore(newField.Name, newField); loaded {
if f.Type != typ {
return false, ErrFieldTypeConflict
}
return false, nil
}

fieldsUpdate := make(map[string]*Field, len(fields)+1)
for k, v := range fields {
fieldsUpdate[k] = v
}
// Create and append a new field.
f := &Field{
ID: uint8(len(fields) + 1),
Name: string(name),
Type: typ,
}
fieldsUpdate[string(name)] = f
m.fields.Store(fieldsUpdate)

return true, nil
}

func (m *MeasurementFields) FieldN() int {
n := len(m.fields.Load().(map[string]*Field))
return n
return m.fields.Len()
}

// Field returns the field for name, or nil if there is no field for name.
func (m *MeasurementFields) Field(name string) *Field {
f := m.fields.Load().(map[string]*Field)[name]
f, _ := m.fields.Load(name)
return f
}

func (m *MeasurementFields) HasField(name string) bool {
if m == nil {
return false
}
f := m.fields.Load().(map[string]*Field)[name]
return f != nil
_, ok := m.fields.Load(name)
return ok
}

// FieldBytes returns the field for name, or nil if there is no field for name.
// FieldBytes should be preferred to Field when the caller has a []byte, because
// it avoids a string allocation, which can't be avoided if the caller converts
// the []byte to a string and calls Field.
func (m *MeasurementFields) FieldBytes(name []byte) *Field {
f := m.fields.Load().(map[string]*Field)[string(name)]
return f
return m.Field(string(name))
}

// FieldSet returns the set of fields and their types for the measurement.
func (m *MeasurementFields) FieldSet() map[string]influxql.DataType {
fields := m.fields.Load().(map[string]*Field)
fieldTypes := make(map[string]influxql.DataType)
for name, f := range fields {
fieldTypes[name] = f.Type
}
m.fields.Range(func(k string, v *Field) bool {
fieldTypes[k] = v.Type
return true
})
return fieldTypes
}

func (m *MeasurementFields) ForEachField(fn func(name string, typ influxql.DataType) bool) {
fields := m.fields.Load().(map[string]*Field)
for name, f := range fields {
if !fn(name, f.Type) {
return
}
}
m.fields.Range(func(k string, v *Field) bool {
return fn(k, v.Type)
})
}

type FieldChanges []*FieldChange
Expand Down Expand Up @@ -2163,12 +2126,11 @@ func (fs *MeasurementFieldSet) load() (rErr error) {
}
fs.fields = make(map[string]*MeasurementFields, len(pb.GetMeasurements()))
for _, measurement := range pb.GetMeasurements() {
fields := make(map[string]*Field, len(measurement.GetFields()))
set := NewMeasurementFields()
for _, field := range measurement.GetFields() {
fields[string(field.GetName())] = &Field{Name: string(field.GetName()), Type: influxql.DataType(field.GetType())}
name := string(field.GetName())
set.fields.Store(name, &Field{Name: name, Type: influxql.DataType(field.GetType())})
}
set := &MeasurementFields{}
set.fields.Store(fields)
fs.fields[string(measurement.GetName())] = set
}
return nil
Expand Down Expand Up @@ -2282,7 +2244,6 @@ func (fscm *measurementFieldSetChangeMgr) loadFieldChangeSet(r io.Reader) (Field
FieldCreate: FieldCreate{
Measurement: fc.Measurement,
Field: &Field{
ID: 0,
Name: string(fc.Field.Name),
Type: influxql.DataType(fc.Field.Type),
},
Expand Down Expand Up @@ -2323,7 +2284,6 @@ func (fs *MeasurementFieldSet) ApplyChanges() error {

// Field represents a series field. All of the fields must be hashable.
type Field struct {
ID uint8 `json:"id,omitempty"`
Name string `json:"name,omitempty"`
Type influxql.DataType `json:"type,omitempty"`
}
Expand Down
10 changes: 5 additions & 5 deletions tsdb/shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1697,7 +1697,7 @@ func TestMeasurementFieldSet_SaveLoad(t *testing.T) {
change := tsdb.FieldChange{
FieldCreate: tsdb.FieldCreate{
Measurement: []byte(measurement),
Field: &tsdb.Field{ID: 0, Name: fieldName, Type: influxql.Float},
Field: &tsdb.Field{Name: fieldName, Type: influxql.Float},
},
ChangeType: tsdb.AddMeasurementField,
}
Expand Down Expand Up @@ -1749,7 +1749,7 @@ func TestMeasurementFieldSet_Corrupt(t *testing.T) {
change := tsdb.FieldChange{
FieldCreate: tsdb.FieldCreate{
Measurement: []byte(measurement),
Field: &tsdb.Field{ID: 0, Name: fieldName, Type: influxql.Float},
Field: &tsdb.Field{Name: fieldName, Type: influxql.Float},
},
ChangeType: tsdb.AddMeasurementField,
}
Expand Down Expand Up @@ -1820,7 +1820,7 @@ func TestMeasurementFieldSet_CorruptChangeFile(t *testing.T) {
change := tsdb.FieldChange{
FieldCreate: tsdb.FieldCreate{
Measurement: []byte(f.Measurement),
Field: &tsdb.Field{ID: 0, Name: f.Field, Type: f.FieldType},
Field: &tsdb.Field{Name: f.Field, Type: f.FieldType},
},
ChangeType: tsdb.AddMeasurementField,
}
Expand Down Expand Up @@ -1883,7 +1883,7 @@ func TestMeasurementFieldSet_DeleteEmpty(t *testing.T) {
change := tsdb.FieldChange{
FieldCreate: tsdb.FieldCreate{
Measurement: []byte(measurement),
Field: &tsdb.Field{ID: 0, Name: fieldName, Type: influxql.Float},
Field: &tsdb.Field{Name: fieldName, Type: influxql.Float},
},
ChangeType: tsdb.AddMeasurementField,
}
Expand Down Expand Up @@ -2018,7 +2018,7 @@ func testFieldMaker(t *testing.T, wg *sync.WaitGroup, mf *tsdb.MeasurementFieldS
change := tsdb.FieldChange{
FieldCreate: tsdb.FieldCreate{
Measurement: []byte(measurement),
Field: &tsdb.Field{ID: 0, Name: fieldName, Type: influxql.Float},
Field: &tsdb.Field{Name: fieldName, Type: influxql.Float},
},
ChangeType: tsdb.AddMeasurementField,
}
Expand Down