Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1290,7 +1290,7 @@ func (e *Engine) addToIndexFromKey(keys [][]byte, fieldTypes []influxql.DataType
keys[i], field = SeriesAndFieldFromCompositeKey(keys[i])
name := models.ParseName(keys[i])
mf := e.fieldset.CreateFieldsIfNotExists(name)
if err := mf.CreateFieldIfNotExists(field, fieldTypes[i]); err != nil {
if _, err := mf.CreateFieldIfNotExists(field, fieldTypes[i]); err != nil {
return err
}

Expand Down
134 changes: 72 additions & 62 deletions tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,13 +572,13 @@ func (s *Shard) WritePoints(points []models.Point, tracker StatsTracker) error {
// to the caller, but continue on writing the remaining points.
writeError = err
}
atomic.AddInt64(&s.stats.FieldsCreated, int64(len(fieldsToCreate)))

// add any new fields and keep track of what needs to be saved
if err := s.createFieldsAndMeasurements(fieldsToCreate); err != nil {
if numFieldsCreated, err := s.createFieldsAndMeasurements(fieldsToCreate); err != nil {
return err
} else {
atomic.AddInt64(&s.stats.FieldsCreated, int64(numFieldsCreated))
}

engineTracker := tracker
engineTracker.AddedPoints = func(points, values int64) {
if tracker.AddedPoints != nil {
Expand Down Expand Up @@ -702,56 +702,64 @@ func (s *Shard) validateSeriesAndFields(points []models.Point, tracker StatsTrac
continue
}

name := p.Name()
mf := engine.MeasurementFields(name)

// Check with the field validator.
if err := ValidateFields(mf, p, s.options.Config.SkipFieldSizeValidation); err != nil {
switch err := err.(type) {
case PartialWriteError:
if reason == "" {
reason = err.Reason
cont, err := func(p models.Point, iter models.FieldIterator) (cont bool, err error) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like the only time we don't have cont == true is where there is an error, and then the continue is at the bottom of the loop. Is cont really doing anything?

name := p.Name()
mf := engine.MeasurementFields(name)
mf.mu.Lock()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels like there is still a potential race condition. We lock the mf while looking up fields here, but then unlock it while we continue to the next point. Another incoming write in a different goroutine could then look up fields in mf before this goroutine can create the new fields. Or am I missing something?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Field creation has its own locking and checks yet again for field type conflicts, so either of the goroutines may win, and the other will report an error.

So, the race you describe is real, but gets sequenced in field creation.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the MeasurementFields lock to an RWMutex and used an RLock in Shard.validateSeriesAndFields to allow greater parallelism.

The lock has to be taken on a per-point basis, because points can differ in which measurement they pertain to, and thus in which MeasurementFields object has to be locked.

MeasurementFields.CreateFieldIfNotExists still uses the full lock, of course.

defer mf.mu.Unlock()
// Check with the field validator.
if err := ValidateFields(mf, p, s.options.Config.SkipFieldSizeValidation); err != nil {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ValidateFields knows which fields are not currently in the measurement fields, and this is the only place in the code it is called from. We could avoid iterating over the fields again below looking for unknown fields if ValidateFields collected them for us.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. My question was how wide-ranging the changes should be. This PR is the minimal set of changes I could find that fixed the bug, but you are correct that there is much room for improvement in the code as it is in the product and in this PR. Let's discuss how radical to get.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to replace the atomic.Value storing the fields map with a sync.Map with a generic wrapper for type safety, for instance. Lots of locking goes away if we do that, but it's a big, scary change.

switch err := err.(type) {
case PartialWriteError:
if reason == "" {
reason = err.Reason
}
dropped += err.Dropped
atomic.AddInt64(&s.stats.WritePointsDropped, int64(err.Dropped))
default:
return false, err
}
dropped += err.Dropped
atomic.AddInt64(&s.stats.WritePointsDropped, int64(err.Dropped))
default:
return nil, nil, err
return true, nil
}
continue
}

points[j] = points[i]
j++
points[j] = points[i]
j++

// Create any fields that are missing.
iter.Reset()
for iter.Next() {
fieldKey := iter.FieldKey()
// Create any fields that are missing.
iter.Reset()
for iter.Next() {
fieldKey := iter.FieldKey()

// Skip fields named "time". They are illegal.
if bytes.Equal(fieldKey, timeBytes) {
continue
}
// Skip fields named "time". They are illegal.
if bytes.Equal(fieldKey, timeBytes) {
continue
}

if mf.FieldBytes(fieldKey) != nil {
continue
}
Comment on lines -736 to -738
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And the second part is here: thread one has now created the field with (for example) data type float, and thread two reaches this point and continues because the field key now exists, even though thread two wants to create a field of type integer.

I believe the other part of the race is that a snapshot has started for the data from thread one, so a new cache receives thread two's writes of the new data type; that cache accepts the values because it has no previous data to validate them against.

if mf.FieldBytes(fieldKey) != nil {
continue
}

dataType := dataTypeFromModelsFieldType(iter.Type())
if dataType == influxql.Unknown {
continue
}
dataType := dataTypeFromModelsFieldType(iter.Type())
if dataType == influxql.Unknown {
continue
}

fieldsToCreate = append(fieldsToCreate, &FieldCreate{
Measurement: name,
Field: &Field{
Name: string(fieldKey),
Type: dataType,
},
})
fieldsToCreate = append(fieldsToCreate, &FieldCreate{
Measurement: name,
Field: &Field{
Name: string(fieldKey),
Type: dataType,
},
})
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a write has two points that both have a new field (newField), but the two points have different types for newField, they would both end up in fieldsToCreate, correct? Would that create issues later?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The old code had that same pattern. The first gets created, the second rejected.

}
return true, nil
}(p, iter)
if cont {
continue
} else if err != nil {
return nil, nil, err
}
}

if dropped > 0 {
err = PartialWriteError{Reason: reason, Dropped: dropped, Database: s.database, RetentionPolicy: s.retentionPolicy}
}
Expand Down Expand Up @@ -781,31 +789,33 @@ func makePrintable(s string) string {
return b.String()
}

func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) error {
func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) (int, error) {
if len(fieldsToCreate) == 0 {
return nil
return 0, nil
}

engine, err := s.engineNoLock()
if err != nil {
return err
return 0, err
}

numCreated := 0
// add fields
changes := make([]*FieldChange, 0, len(fieldsToCreate))
for _, f := range fieldsToCreate {
mf := engine.MeasurementFields(f.Measurement)
if err := mf.CreateFieldIfNotExists([]byte(f.Field.Name), f.Field.Type); err != nil {
return err
if created, err := mf.CreateFieldIfNotExists([]byte(f.Field.Name), f.Field.Type); err != nil {
return 0, err
} else if created {
numCreated++
s.index.SetFieldName(f.Measurement, f.Field.Name)
changes = append(changes, &FieldChange{
FieldCreate: *f,
ChangeType: AddMeasurementField,
})
}
s.index.SetFieldName(f.Measurement, f.Field.Name)
changes = append(changes, &FieldChange{
FieldCreate: *f,
ChangeType: AddMeasurementField,
})
}

return engine.MeasurementFieldSet().Save(changes)
return numCreated, engine.MeasurementFieldSet().Save(changes)
}

// DeleteSeriesRange deletes all values for seriesKeys between min and max (inclusive)
Expand Down Expand Up @@ -1616,15 +1626,15 @@ func (m *MeasurementFields) bytes() int {
// CreateFieldIfNotExists creates a new field with an autoincrementing ID.
// Returns an error if 255 fields have already been created on the measurement or
// the field already exists with a different type.
func (m *MeasurementFields) CreateFieldIfNotExists(name []byte, typ influxql.DataType) error {
func (m *MeasurementFields) CreateFieldIfNotExists(name []byte, typ influxql.DataType) (bool, error) {
fields := m.fields.Load().(map[string]*Field)

// Ignore if the field already exists.
if f := fields[string(name)]; f != nil {
if f.Type != typ {
return ErrFieldTypeConflict
return false, ErrFieldTypeConflict
}
return nil
return false, nil
}

m.mu.Lock()
Expand All @@ -1634,9 +1644,9 @@ func (m *MeasurementFields) CreateFieldIfNotExists(name []byte, typ influxql.Dat
// Re-check field and type under write lock.
if f := fields[string(name)]; f != nil {
if f.Type != typ {
return ErrFieldTypeConflict
return false, ErrFieldTypeConflict
}
return nil
return false, nil
}

fieldsUpdate := make(map[string]*Field, len(fields)+1)
Expand All @@ -1652,7 +1662,7 @@ func (m *MeasurementFields) CreateFieldIfNotExists(name []byte, typ influxql.Dat
fieldsUpdate[string(name)] = f
m.fields.Store(fieldsUpdate)

return nil
return true, nil
}

func (m *MeasurementFields) FieldN() int {
Expand Down Expand Up @@ -2325,7 +2335,7 @@ func (fs *MeasurementFieldSet) ApplyChanges() error {
fs.Delete(string(fc.Measurement))
} else {
mf := fs.CreateFieldsIfNotExists(fc.Measurement)
if err := mf.CreateFieldIfNotExists([]byte(fc.Field.Name), fc.Field.Type); err != nil {
if _, err := mf.CreateFieldIfNotExists([]byte(fc.Field.Name), fc.Field.Type); err != nil {
err = fmt.Errorf("failed creating %q.%q: %w", fc.Measurement, fc.Field.Name, err)
log.Error("field creation", zap.Error(err))
return err
Expand Down
Loading