diff --git a/pkg/data/gensyncmap/gensyncmap.go b/pkg/data/gensyncmap/gensyncmap.go
new file mode 100644
index 00000000000..00b02b68cb5
--- /dev/null
+++ b/pkg/data/gensyncmap/gensyncmap.go
@@ -0,0 +1,74 @@
+// Package gensyncmap provides a generic, type-safe wrapper around sync.Map.
+package gensyncmap
+
+import "sync"
+
+// Map wraps a sync.Map so that keys and values are statically typed and
+// callers need no type assertions. The zero Map is empty and ready for use.
+// Every method has a pointer receiver; like the sync.Map it contains, a Map
+// must not be copied after first use.
+type Map[K comparable, V any] struct {
+	m sync.Map
+}
+
+// Delete removes the entry for key, if any.
+func (m *Map[K, V]) Delete(key K) {
+	m.m.Delete(key)
+}
+
+// Load returns the value stored under key. When no entry exists it returns the
+// zero value of V and ok == false.
+func (m *Map[K, V]) Load(key K) (value V, ok bool) {
+	v, ok := m.m.Load(key)
+	if !ok {
+		return value, ok
+	}
+	// Comma-ok assertion: a nil stored under an interface-typed V comes back
+	// as an untyped nil, on which a plain v.(V) would panic.
+	value, _ = v.(V)
+	return value, ok
+}
+
+// LoadAndDelete removes the entry for key and returns the value it held.
+// loaded reports whether the key was present.
+func (m *Map[K, V]) LoadAndDelete(key K) (value V, loaded bool) {
+	v, loaded := m.m.LoadAndDelete(key)
+	if !loaded {
+		return value, loaded
+	}
+	value, _ = v.(V) // nil-safe for interface-typed V, see Load
+	return value, loaded
+}
+
+// LoadOrStore returns the existing value for key if present; otherwise it
+// stores and returns value. loaded is true when the value was already present.
+func (m *Map[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool) {
+	a, loaded := m.m.LoadOrStore(key, value)
+	actual, _ = a.(V) // nil-safe for interface-typed V, see Load
+	return actual, loaded
+}
+
+// Range calls f for each entry in the map and stops as soon as f returns
+// false. Iteration semantics are those of sync.Map.Range.
+func (m *Map[K, V]) Range(f func(key K, value V) bool) {
+	m.m.Range(func(key, value any) bool {
+		// Comma-ok assertions so nil interface keys/values do not panic.
+		k, _ := key.(K)
+		v, _ := value.(V)
+		return f(k, v)
+	})
+}
+
+// Store sets the value for key, replacing any existing entry.
+func (m *Map[K, V]) Store(key K, value V) { m.m.Store(key, value) }
+
+// Len counts the entries by ranging over the whole map, so its cost grows
+// with the number of entries; prefer Load when only existence matters.
+func (m *Map[K, V]) Len() int {
+	var n int
+	m.m.Range(func(_, _ any) bool {
+		n++
+		return true
+	})
+	return n
+}
diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go
index 8bf53b4243a..b9d0394770b 100644
--- a/tsdb/engine/tsm1/engine.go
+++ b/tsdb/engine/tsm1/engine.go
@@ -1264,7 +1264,7 @@ func (e *Engine) addToIndexFromKey(keys [][]byte, fieldTypes []influxql.DataType
 		keys[i], field = SeriesAndFieldFromCompositeKey(keys[i])
 		name := models.ParseName(keys[i])
 		mf := e.fieldset.CreateFieldsIfNotExists(name)
-		if err := mf.CreateFieldIfNotExists(field, fieldTypes[i]); err != nil {
+		if _, err := mf.CreateFieldIfNotExists(field, fieldTypes[i]); err != nil {
 			return err
 		}
 
diff --git a/tsdb/field_validator.go b/tsdb/field_validator.go
index 5a978dd8fd6..09a06557b01 100644
--- a/tsdb/field_validator.go
+++ b/tsdb/field_validator.go
@@ -13,9 +13,11 @@ const MaxFieldValueLength = 1048576
 // ValidateFields will return a PartialWriteError if:
 // - 
the point has inconsistent fields, or // - the point has fields that are too long -func ValidateFields(mf *MeasurementFields, point models.Point, skipSizeValidation bool) error { +func ValidateFields(mf *MeasurementFields, point models.Point, skipSizeValidation bool) ([]*FieldCreate, error) { pointSize := point.StringSize() iter := point.FieldIterator() + var fieldsToCreate []*FieldCreate + for iter.Next() { if !skipSizeValidation { // Check for size of field too large. Note it is much cheaper to check the whole point size @@ -23,7 +25,7 @@ func ValidateFields(mf *MeasurementFields, point models.Point, skipSizeValidatio // unescape the string, and must at least parse the string) if pointSize > MaxFieldValueLength && iter.Type() == models.String { if sz := len(iter.StringValue()); sz > MaxFieldValueLength { - return PartialWriteError{ + return nil, PartialWriteError{ Reason: fmt.Sprintf( "input field \"%s\" on measurement \"%s\" is too long, %d > %d", iter.FieldKey(), point.Name(), sz, MaxFieldValueLength), @@ -33,14 +35,9 @@ func ValidateFields(mf *MeasurementFields, point models.Point, skipSizeValidatio } } + fieldKey := iter.FieldKey() // Skip fields name "time", they are illegal. - if bytes.Equal(iter.FieldKey(), timeBytes) { - continue - } - - // If the fields is not present, there cannot be a conflict. - f := mf.FieldBytes(iter.FieldKey()) - if f == nil { + if bytes.Equal(fieldKey, timeBytes) { continue } @@ -49,18 +46,26 @@ func ValidateFields(mf *MeasurementFields, point models.Point, skipSizeValidatio continue } - // If the types are not the same, there is a conflict. - if f.Type != dataType { - return PartialWriteError{ + // If the field is not present, remember to create it. 
+ f := mf.FieldBytes(fieldKey) + if f == nil { + fieldsToCreate = append(fieldsToCreate, &FieldCreate{ + Measurement: point.Name(), + Field: &Field{ + Name: string(fieldKey), + Type: dataType, + }}) + } else if f.Type != dataType { + // If the types are not the same, there is a conflict. + return nil, PartialWriteError{ Reason: fmt.Sprintf( "%s: input field \"%s\" on measurement \"%s\" is type %s, already exists as type %s", - ErrFieldTypeConflict, iter.FieldKey(), point.Name(), dataType, f.Type), + ErrFieldTypeConflict, fieldKey, point.Name(), dataType, f.Type), Dropped: 1, } } } - - return nil + return fieldsToCreate, nil } // dataTypeFromModelsFieldType returns the influxql.DataType that corresponds to the diff --git a/tsdb/shard.go b/tsdb/shard.go index 3e3de2caba2..c16ec21f655 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -14,7 +14,6 @@ import ( "sort" "strings" "sync" - "sync/atomic" "time" "unicode" "unsafe" @@ -23,6 +22,7 @@ import ( "github.com/influxdata/influxdb/v2/logger" "github.com/influxdata/influxdb/v2/models" "github.com/influxdata/influxdb/v2/pkg/bytesutil" + "github.com/influxdata/influxdb/v2/pkg/data/gensyncmap" errors2 "github.com/influxdata/influxdb/v2/pkg/errors" "github.com/influxdata/influxdb/v2/pkg/estimator" "github.com/influxdata/influxdb/v2/pkg/file" @@ -704,16 +704,17 @@ func (s *Shard) WritePoints(ctx context.Context, points []models.Point) (rErr er // to the caller, but continue on writing the remaining points. writeError = err } - s.stats.fieldsCreated.Add(float64(len(fieldsToCreate))) // add any new fields and keep track of what needs to be saved - if err := s.createFieldsAndMeasurements(fieldsToCreate); err != nil { + if numFieldsCreated, err := s.createFieldsAndMeasurements(fieldsToCreate); err != nil { return err + } else { + s.stats.fieldsCreated.Add(float64(numFieldsCreated)) } // Write to the engine. 
if err := engine.WritePoints(ctx, points); err != nil {
-		return fmt.Errorf("engine: %s", err)
+		return fmt.Errorf("engine: %w", err)
 	}
 
 	return writeError
@@ -815,61 +816,46 @@ func (s *Shard) validateSeriesAndFields(points []models.Point) ([]models.Point,
 			continue
 		}
 
-		// Skip any points whos keys have been dropped. Dropped has already been incremented for them.
+		// Skip any points whose keys have been dropped. Dropped has already been incremented for them.
 		if len(droppedKeys) > 0 && bytesutil.Contains(droppedKeys, keys[i]) {
 			continue
 		}
 
-		name := p.Name()
-		mf := engine.MeasurementFields(name)
-
-		// Check with the field validator.
-		if err := ValidateFields(mf, p, s.options.Config.SkipFieldSizeValidation); err != nil {
-			switch err := err.(type) {
-			case PartialWriteError:
-				if reason == "" {
-					reason = err.Reason
+		// Validate the point inside a closure so a rejected point can return early.
+		// PartialWriteErrors are tallied into reason/dropped and reported once after
+		// the loop; any other error is returned and aborts the whole batch.
+		err := func(p models.Point, iter models.FieldIterator) error {
+			var newFields []*FieldCreate
+			var validateErr error
+			name := p.Name()
+			mf := engine.MeasurementFields(name)
+			// Check with the field validator.
+			if newFields, validateErr = ValidateFields(mf, p, s.options.Config.SkipFieldSizeValidation); validateErr != nil {
+				var err PartialWriteError
+				switch {
+				case errors.As(validateErr, &err):
+					// This will turn into an error later, outside this lambda
+					if reason == "" {
+						reason = err.Reason
+					}
+					dropped += err.Dropped
+					s.stats.writesDropped.Add(float64(err.Dropped))
+				default:
+					// err is still the zero PartialWriteError here; return the real error.
+					return validateErr
 				}
-				dropped += err.Dropped
-				s.stats.writesDropped.Add(float64(err.Dropped))
-			default:
-				return nil, nil, err
-			}
-			continue
-		}
-
-		points[j] = points[i]
-		j++
-
-		// Create any fields that are missing.
-		iter.Reset()
-		for iter.Next() {
-			fieldKey := iter.FieldKey()
-
-			// Skip fields named "time". They are illegal. 
- if bytes.Equal(fieldKey, timeBytes) { - continue - } - - if mf.FieldBytes(fieldKey) != nil { - continue - } - - dataType := dataTypeFromModelsFieldType(iter.Type()) - if dataType == influxql.Unknown { - continue + return nil } - fieldsToCreate = append(fieldsToCreate, &FieldCreate{ - Measurement: name, - Field: &Field{ - Name: string(fieldKey), - Type: dataType, - }, - }) + points[j] = points[i] + j++ + fieldsToCreate = append(fieldsToCreate, newFields...) + return nil + }(p, iter) + if err != nil { + return nil, nil, err } } - if dropped > 0 { err = PartialWriteError{Reason: reason, Dropped: dropped} } @@ -899,30 +881,32 @@ func makePrintable(s string) string { return b.String() } -func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) error { +func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) (int, error) { if len(fieldsToCreate) == 0 { - return nil + return 0, nil } engine, err := s.engineNoLock() if err != nil { - return err + return 0, err } - + numCreated := 0 // add fields changes := make([]*FieldChange, 0, len(fieldsToCreate)) for _, f := range fieldsToCreate { mf := engine.MeasurementFields(f.Measurement) - if err := mf.CreateFieldIfNotExists([]byte(f.Field.Name), f.Field.Type); err != nil { - return err + if created, err := mf.CreateFieldIfNotExists([]byte(f.Field.Name), f.Field.Type); err != nil { + return 0, err + } else if created { + numCreated++ + changes = append(changes, &FieldChange{ + FieldCreate: *f, + ChangeType: AddMeasurementField, + }) } - changes = append(changes, &FieldChange{ - FieldCreate: *f, - ChangeType: AddMeasurementField, - }) } - return engine.MeasurementFieldSet().Save(changes) + return numCreated, engine.MeasurementFieldSet().Save(changes) } // DeleteSeriesRange deletes all values from for seriesKeys between min and max (inclusive) @@ -1837,25 +1821,21 @@ func (a Shards) ExpandSources(sources influxql.Sources) (influxql.Sources, error // MeasurementFields holds the fields of a 
measurement and their codec. type MeasurementFields struct { - mu sync.Mutex - - fields atomic.Value // map[string]*Field + fields gensyncmap.Map[string, *Field] } // NewMeasurementFields returns an initialised *MeasurementFields value. func NewMeasurementFields() *MeasurementFields { - fields := make(map[string]*Field) - mf := &MeasurementFields{} - mf.fields.Store(fields) - return mf + return &MeasurementFields{fields: gensyncmap.Map[string, *Field]{}} } func (m *MeasurementFields) FieldKeys() []string { - fields := m.fields.Load().(map[string]*Field) - a := make([]string, 0, len(fields)) - for key := range fields { - a = append(a, key) - } + var a []string + m.fields.Range(func(k string, _ *Field) bool { + a = append(a, k) + return true + }) + sort.Strings(a) return a } @@ -1863,66 +1843,38 @@ func (m *MeasurementFields) FieldKeys() []string { // bytes estimates the memory footprint of this MeasurementFields, in bytes. func (m *MeasurementFields) bytes() int { var b int - b += 24 // mu RWMutex is 24 bytes - fields := m.fields.Load().(map[string]*Field) - b += int(unsafe.Sizeof(fields)) - for k, v := range fields { + b += int(unsafe.Sizeof(m.fields)) + m.fields.Range(func(k string, v *Field) bool { b += int(unsafe.Sizeof(k)) + len(k) b += int(unsafe.Sizeof(v)+unsafe.Sizeof(*v)) + len(v.Name) - } + return true + }) return b } -// CreateFieldIfNotExists creates a new field with an autoincrementing ID. -// Returns an error if 255 fields have already been created on the measurement or -// the fields already exists with a different type. -func (m *MeasurementFields) CreateFieldIfNotExists(name []byte, typ influxql.DataType) error { - fields := m.fields.Load().(map[string]*Field) - - // Ignore if the field already exists. - if f := fields[string(name)]; f != nil { - if f.Type != typ { - return ErrFieldTypeConflict - } - return nil +// CreateFieldIfNotExists creates a new field with the given name and type. 
+// Returns an error if the field already exists with a different type. +func (m *MeasurementFields) CreateFieldIfNotExists(name []byte, typ influxql.DataType) (bool, error) { + newField := &Field{ + Name: string(name), + Type: typ, } - - m.mu.Lock() - defer m.mu.Unlock() - - fields = m.fields.Load().(map[string]*Field) - // Re-check field and type under write lock. - if f := fields[string(name)]; f != nil { + if f, loaded := m.fields.LoadOrStore(newField.Name, newField); loaded { if f.Type != typ { - return ErrFieldTypeConflict + return false, ErrFieldTypeConflict } - return nil - } - - fieldsUpdate := make(map[string]*Field, len(fields)+1) - for k, v := range fields { - fieldsUpdate[k] = v - } - // Create and append a new field. - f := &Field{ - ID: uint8(len(fields) + 1), - Name: string(name), - Type: typ, + return false, nil } - fieldsUpdate[string(name)] = f - m.fields.Store(fieldsUpdate) - - return nil + return true, nil } func (m *MeasurementFields) FieldN() int { - n := len(m.fields.Load().(map[string]*Field)) - return n + return m.fields.Len() } // Field returns the field for name, or nil if there is no field for name. func (m *MeasurementFields) Field(name string) *Field { - f := m.fields.Load().(map[string]*Field)[name] + f, _ := m.fields.Load(name) return f } @@ -1930,8 +1882,8 @@ func (m *MeasurementFields) HasField(name string) bool { if m == nil { return false } - f := m.fields.Load().(map[string]*Field)[name] - return f != nil + _, ok := m.fields.Load(name) + return ok } // FieldBytes returns the field for name, or nil if there is no field for name. @@ -1939,27 +1891,23 @@ func (m *MeasurementFields) HasField(name string) bool { // it avoids a string allocation, which can't be avoided if the caller converts // the []byte to a string and calls Field. 
func (m *MeasurementFields) FieldBytes(name []byte) *Field { - f := m.fields.Load().(map[string]*Field)[string(name)] - return f + return m.Field(string(name)) } // FieldSet returns the set of fields and their types for the measurement. func (m *MeasurementFields) FieldSet() map[string]influxql.DataType { - fields := m.fields.Load().(map[string]*Field) fieldTypes := make(map[string]influxql.DataType) - for name, f := range fields { - fieldTypes[name] = f.Type - } + m.fields.Range(func(k string, v *Field) bool { + fieldTypes[k] = v.Type + return true + }) return fieldTypes } func (m *MeasurementFields) ForEachField(fn func(name string, typ influxql.DataType) bool) { - fields := m.fields.Load().(map[string]*Field) - for name, f := range fields { - if !fn(name, f.Type) { - return - } - } + m.fields.Range(func(k string, v *Field) bool { + return fn(k, v.Type) + }) } type FieldChanges []*FieldChange @@ -2438,12 +2386,11 @@ func (fs *MeasurementFieldSet) load() (rErr error) { } fs.fields = make(map[string]*MeasurementFields, len(pb.GetMeasurements())) for _, measurement := range pb.GetMeasurements() { - fields := make(map[string]*Field, len(measurement.GetFields())) + set := NewMeasurementFields() for _, field := range measurement.GetFields() { - fields[string(field.GetName())] = &Field{Name: string(field.GetName()), Type: influxql.DataType(field.GetType())} + name := string(field.GetName()) + set.fields.Store(name, &Field{Name: name, Type: influxql.DataType(field.GetType())}) } - set := &MeasurementFields{} - set.fields.Store(fields) fs.fields[string(measurement.GetName())] = set } return nil @@ -2557,7 +2504,6 @@ func (fscm *measurementFieldSetChangeMgr) loadFieldChangeSet(r io.Reader) (Field FieldCreate: FieldCreate{ Measurement: fc.Measurement, Field: &Field{ - ID: 0, Name: string(fc.Field.Name), Type: influxql.DataType(fc.Field.Type), }, @@ -2585,7 +2531,7 @@ func (fs *MeasurementFieldSet) ApplyChanges() error { fs.Delete(string(fc.Measurement)) } else { mf := 
fs.CreateFieldsIfNotExists(fc.Measurement) - if err := mf.CreateFieldIfNotExists([]byte(fc.Field.Name), fc.Field.Type); err != nil { + if _, err := mf.CreateFieldIfNotExists([]byte(fc.Field.Name), fc.Field.Type); err != nil { err = fmt.Errorf("failed creating %q.%q: %w", fc.Measurement, fc.Field.Name, err) log.Error("field creation", zap.Error(err)) return err @@ -2598,7 +2544,6 @@ func (fs *MeasurementFieldSet) ApplyChanges() error { // Field represents a series field. All of the fields must be hashable. type Field struct { - ID uint8 `json:"id,omitempty"` Name string `json:"name,omitempty"` Type influxql.DataType `json:"type,omitempty"` } diff --git a/tsdb/shard_test.go b/tsdb/shard_test.go index 0567c545d4d..9dc6cda1a8d 100644 --- a/tsdb/shard_test.go +++ b/tsdb/shard_test.go @@ -7,6 +7,7 @@ import ( "fmt" "math" "os" + "path" "path/filepath" "reflect" "regexp" @@ -14,12 +15,10 @@ import ( "sort" "strings" "sync" + "sync/atomic" "testing" "time" - assert2 "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/davecgh/go-spew/spew" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" @@ -30,8 +29,11 @@ import ( "github.com/influxdata/influxdb/v2/pkg/testing/assert" "github.com/influxdata/influxdb/v2/tsdb" _ "github.com/influxdata/influxdb/v2/tsdb/engine" + "github.com/influxdata/influxdb/v2/tsdb/engine/tsm1" _ "github.com/influxdata/influxdb/v2/tsdb/index" "github.com/influxdata/influxql" + assert2 "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestShardWriteAndIndex(t *testing.T) { @@ -1602,13 +1604,13 @@ func TestMeasurementFieldSet_SaveLoad(t *testing.T) { } defer checkMeasurementFieldSetClose(t, mf) fields := mf.CreateFieldsIfNotExists([]byte(measurement)) - if err := fields.CreateFieldIfNotExists([]byte(fieldName), influxql.Float); err != nil { + if _, err := fields.CreateFieldIfNotExists([]byte(fieldName), influxql.Float); err != nil { t.Fatalf("create field error: 
%v", err) } change := tsdb.FieldChange{ FieldCreate: tsdb.FieldCreate{ Measurement: []byte(measurement), - Field: &tsdb.Field{ID: 0, Name: fieldName, Type: influxql.Float}, + Field: &tsdb.Field{Name: fieldName, Type: influxql.Float}, }, ChangeType: tsdb.AddMeasurementField, } @@ -1653,13 +1655,13 @@ func TestMeasurementFieldSet_Corrupt(t *testing.T) { measurement := []byte("cpu") fields := mf.CreateFieldsIfNotExists(measurement) fieldName := "value" - if err := fields.CreateFieldIfNotExists([]byte(fieldName), influxql.Float); err != nil { + if _, err := fields.CreateFieldIfNotExists([]byte(fieldName), influxql.Float); err != nil { t.Fatalf("create field error: %v", err) } change := tsdb.FieldChange{ FieldCreate: tsdb.FieldCreate{ Measurement: []byte(measurement), - Field: &tsdb.Field{ID: 0, Name: fieldName, Type: influxql.Float}, + Field: &tsdb.Field{Name: fieldName, Type: influxql.Float}, }, ChangeType: tsdb.AddMeasurementField, } @@ -1723,13 +1725,13 @@ func TestMeasurementFieldSet_CorruptChangeFile(t *testing.T) { defer checkMeasurementFieldSetClose(t, mf) for _, f := range testFields { fields := mf.CreateFieldsIfNotExists([]byte(f.Measurement)) - if err := fields.CreateFieldIfNotExists([]byte(f.Field), f.FieldType); err != nil { + if _, err := fields.CreateFieldIfNotExists([]byte(f.Field), f.FieldType); err != nil { t.Fatalf("create field error: %v", err) } change := tsdb.FieldChange{ FieldCreate: tsdb.FieldCreate{ Measurement: []byte(f.Measurement), - Field: &tsdb.Field{ID: 0, Name: f.Field, Type: f.FieldType}, + Field: &tsdb.Field{Name: f.Field, Type: f.FieldType}, }, ChangeType: tsdb.AddMeasurementField, } @@ -1784,14 +1786,14 @@ func TestMeasurementFieldSet_DeleteEmpty(t *testing.T) { defer checkMeasurementFieldSetClose(t, mf) fields := mf.CreateFieldsIfNotExists([]byte(measurement)) - if err := fields.CreateFieldIfNotExists([]byte(fieldName), influxql.Float); err != nil { + if _, err := fields.CreateFieldIfNotExists([]byte(fieldName), influxql.Float); err 
!= nil { t.Fatalf("create field error: %v", err) } change := tsdb.FieldChange{ FieldCreate: tsdb.FieldCreate{ Measurement: []byte(measurement), - Field: &tsdb.Field{ID: 0, Name: fieldName, Type: influxql.Float}, + Field: &tsdb.Field{Name: fieldName, Type: influxql.Float}, }, ChangeType: tsdb.AddMeasurementField, } @@ -1933,14 +1935,15 @@ func testFieldMaker(t *testing.T, wg *sync.WaitGroup, mf *tsdb.MeasurementFieldS fields := mf.CreateFieldsIfNotExists([]byte(measurement)) for _, fieldName := range fieldNames { - if err := fields.CreateFieldIfNotExists([]byte(fieldName), influxql.Float); err != nil { - t.Errorf("create field error: %v", err) + if _, err := fields.CreateFieldIfNotExists([]byte(fieldName), influxql.Float); err != nil { + t.Logf("create field error: %v", err) + t.Fail() return } change := tsdb.FieldChange{ FieldCreate: tsdb.FieldCreate{ Measurement: []byte(measurement), - Field: &tsdb.Field{ID: 0, Name: fieldName, Type: influxql.Float}, + Field: &tsdb.Field{Name: fieldName, Type: influxql.Float}, }, ChangeType: tsdb.AddMeasurementField, } @@ -2582,3 +2585,160 @@ func (a seriesIDSets) ForEach(f func(ids *tsdb.SeriesIDSet)) error { } return nil } + +// Tests concurrently writing to the same shard with different field types which +// can trigger a panic when the shard is snapshotted to TSM files. 
+func TestShard_WritePoints_ForceFieldConflictConcurrent(t *testing.T) { + const Runs = 50 + if testing.Short() || runtime.GOOS == "windows" { + t.Skip("Skipping on short or windows") + } + for i := 0; i < Runs; i++ { + conflictShard(t, i) + } +} + +func conflictShard(t *testing.T, run int) { + const measurement = "cpu" + const field = "value" + const numTypes = 4 // float, int, bool, string + const pointCopies = 10 + const trialsPerShard = 10 + + tmpDir, _ := os.MkdirTemp("", "shard_test") + defer func() { + require.NoError(t, os.RemoveAll(tmpDir), "removing %s", tmpDir) + }() + tmpShard := filepath.Join(tmpDir, "shard") + tmpWal := filepath.Join(tmpDir, "wal") + + sfile := MustOpenSeriesFile(t) + defer func() { + require.NoError(t, sfile.Close(), "closing series file") + require.NoError(t, os.RemoveAll(sfile.Path()), "removing series file %s", sfile.Path()) + }() + + opts := tsdb.NewEngineOptions() + opts.Config.WALDir = tmpWal + opts.SeriesIDSets = seriesIDSets([]*tsdb.SeriesIDSet{}) + sh := tsdb.NewShard(1, tmpShard, tmpWal, sfile.SeriesFile, opts) + require.NoError(t, sh.Open(context.Background()), "opening shard: %s", sh.Path()) + defer func() { + require.NoError(t, sh.Close(), "closing shard %s", tmpShard) + }() + var wg sync.WaitGroup + mu := sync.RWMutex{} + maxConcurrency := atomic.Int64{} + + currentTime := time.Now() + + points := make([]models.Point, 0, pointCopies*numTypes) + + for i := 0; i < pointCopies; i++ { + points = append(points, models.MustNewPoint( + measurement, + models.NewTags(map[string]string{"host": "server"}), + map[string]interface{}{field: 1.0}, + currentTime.Add(time.Duration(i)*time.Second), + )) + points = append(points, models.MustNewPoint( + measurement, + models.NewTags(map[string]string{"host": "server"}), + map[string]interface{}{field: int64(1)}, + currentTime.Add(time.Duration(i)*time.Second), + )) + points = append(points, models.MustNewPoint( + measurement, + models.NewTags(map[string]string{"host": "server"}), + 
map[string]interface{}{field: "one"}, + currentTime.Add(time.Duration(i)*time.Second), + )) + points = append(points, models.MustNewPoint( + measurement, + models.NewTags(map[string]string{"host": "server"}), + map[string]interface{}{field: true}, + currentTime.Add(time.Duration(i)*time.Second), + )) + } + concurrency := atomic.Int64{} + + for i := 0; i < trialsPerShard; i++ { + mu.Lock() + wg.Add(len(points)) + // Write points concurrently + for i := 0; i < pointCopies; i++ { + for j := 0; j < numTypes; j++ { + concurrency.Add(1) + go func(mp models.Point) { + mu.RLock() + defer concurrency.Add(-1) + defer mu.RUnlock() + defer wg.Done() + if err := sh.WritePoints(context.Background(), []models.Point{mp}); err == nil { + fs, err := mp.Fields() + require.NoError(t, err, "getting fields") + require.Equal(t, + sh.MeasurementFields([]byte(measurement)).Field(field).Type, + influxql.InspectDataType(fs[field]), + "field types mismatch on run %d: types exp: %s, got: %s", run+1, sh.MeasurementFields([]byte(measurement)).Field(field).Type.String(), influxql.InspectDataType(fs[field]).String()) + } else { + require.ErrorContains(t, err, tsdb.ErrFieldTypeConflict.Error(), "unexpected error") + } + if c := concurrency.Load(); maxConcurrency.Load() < c { + maxConcurrency.Store(c) + } + }(points[i*numTypes+j]) + } + } + mu.Unlock() + wg.Wait() + dir, err := sh.CreateSnapshot(false) + require.NoError(t, err, "creating snapshot: %s", sh.Path()) + require.NoError(t, os.RemoveAll(dir), "removing snapshot directory %s", dir) + } + keyType := map[string]byte{} + files, err := os.ReadDir(tmpShard) + require.NoError(t, err, "reading shard directory %s", tmpShard) + for i, file := range files { + if !strings.HasSuffix(path.Ext(file.Name()), tsm1.TSMFileExtension) { + continue + } + ffile := path.Join(tmpShard, file.Name()) + fh, err := os.Open(ffile) + require.NoError(t, err, "opening snapshot file %s", ffile) + tr, err := tsm1.NewTSMReader(fh) + require.NoError(t, err, "creating TSM 
reader for %s", ffile) + key, typ := tr.KeyAt(0) + if oldTyp, ok := keyType[string(key)]; ok { + require.Equal(t, oldTyp, typ, + "field type mismatch in run %d TSM file %d -- %q in %s\nfirst seen: %s, newest: %s, field type: %s", + run+1, + i+1, + string(key), + ffile, + blockTypeString(oldTyp), + blockTypeString(typ), + sh.MeasurementFields([]byte(measurement)).Field(field).Type.String()) + } else { + keyType[string(key)] = typ + } + // Must close after all uses of key (mapped memory) + require.NoError(t, tr.Close(), "closing TSM reader") + } + // t.Logf("Type %s wins run %d with concurrency: %d", sh.MeasurementFields([]byte(measurement)).Field(field).Type.String(), run+1, maxConcurrency.Load()) +} + +func blockTypeString(typ byte) string { + switch typ { + case tsm1.BlockFloat64: + return "float64" + case tsm1.BlockInteger: + return "int64" + case tsm1.BlockBoolean: + return "bool" + case tsm1.BlockString: + return "string" + default: + return "unknown" + } +}