Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1290,7 +1290,7 @@ func (e *Engine) addToIndexFromKey(keys [][]byte, fieldTypes []influxql.DataType
keys[i], field = SeriesAndFieldFromCompositeKey(keys[i])
name := models.ParseName(keys[i])
mf := e.fieldset.CreateFieldsIfNotExists(name)
if err := mf.CreateFieldIfNotExists(field, fieldTypes[i]); err != nil {
if _, err := mf.CreateFieldIfNotExists(field, fieldTypes[i]); err != nil {
return err
}

Expand Down
35 changes: 20 additions & 15 deletions tsdb/field_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,19 @@ const MaxFieldValueLength = 1048576
// ValidateFields will return a PartialWriteError if:
// - the point has inconsistent fields, or
// - the point has fields that are too long
func ValidateFields(mf *MeasurementFields, point models.Point, skipSizeValidation bool) error {
func ValidateFields(mf *MeasurementFields, point models.Point, skipSizeValidation bool) ([]*FieldCreate, error) {
pointSize := point.StringSize()
iter := point.FieldIterator()
var fieldsToCreate []*FieldCreate

for iter.Next() {
if !skipSizeValidation {
// Check for size of field too large. Note it is much cheaper to check the whole point size
// than checking the StringValue size (StringValue potentially takes an allocation if it must
// unescape the string, and must at least parse the string)
if pointSize > MaxFieldValueLength && iter.Type() == models.String {
if sz := len(iter.StringValue()); sz > MaxFieldValueLength {
return PartialWriteError{
return nil, PartialWriteError{
Reason: fmt.Sprintf(
"input field \"%s\" on measurement \"%s\" is too long, %d > %d",
iter.FieldKey(), point.Name(), sz, MaxFieldValueLength),
Expand All @@ -33,14 +35,9 @@ func ValidateFields(mf *MeasurementFields, point models.Point, skipSizeValidatio
}
}

fieldKey := iter.FieldKey()
// Skip fields name "time", they are illegal.
if bytes.Equal(iter.FieldKey(), timeBytes) {
continue
}

// If the fields is not present, there cannot be a conflict.
f := mf.FieldBytes(iter.FieldKey())
if f == nil {
if bytes.Equal(fieldKey, timeBytes) {
Comment on lines -41 to +40
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is that this is the first part of the data race, where two concurrent writes would both return nil here, resulting in both continuing, as "there cannot be a conflict" according to the comment.

continue
}

Expand All @@ -49,18 +46,26 @@ func ValidateFields(mf *MeasurementFields, point models.Point, skipSizeValidatio
continue
}

// If the types are not the same, there is a conflict.
if f.Type != dataType {
return PartialWriteError{
// If the field is not present, remember to create it.
f := mf.FieldBytes(fieldKey)
if f == nil {
fieldsToCreate = append(fieldsToCreate, &FieldCreate{
Measurement: point.Name(),
Field: &Field{
Name: string(fieldKey),
Type: dataType,
}})
} else if f.Type != dataType {
// If the types are not the same, there is a conflict.
return nil, PartialWriteError{
Reason: fmt.Sprintf(
"%s: input field \"%s\" on measurement \"%s\" is type %s, already exists as type %s",
ErrFieldTypeConflict, iter.FieldKey(), point.Name(), dataType, f.Type),
ErrFieldTypeConflict, fieldKey, point.Name(), dataType, f.Type),
Dropped: 1,
}
}
}

return nil
return fieldsToCreate, nil
}

// dataTypeFromModelsFieldType returns the influxql.DataType that corresponds to the
Expand Down
123 changes: 54 additions & 69 deletions tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,13 +572,13 @@ func (s *Shard) WritePoints(points []models.Point, tracker StatsTracker) error {
// to the caller, but continue on writing the remaining points.
writeError = err
}
atomic.AddInt64(&s.stats.FieldsCreated, int64(len(fieldsToCreate)))

// add any new fields and keep track of what needs to be saved
if err := s.createFieldsAndMeasurements(fieldsToCreate); err != nil {
if numFieldsCreated, err := s.createFieldsAndMeasurements(fieldsToCreate); err != nil {
return err
} else {
atomic.AddInt64(&s.stats.FieldsCreated, int64(numFieldsCreated))
}

engineTracker := tracker
engineTracker.AddedPoints = func(points, values int64) {
if tracker.AddedPoints != nil {
Expand Down Expand Up @@ -697,61 +697,44 @@ func (s *Shard) validateSeriesAndFields(points []models.Point, tracker StatsTrac
continue
}

// Skip any points whos keys have been dropped. Dropped has already been incremented for them.
// Skip any points whose keys have been dropped. Dropped has already been incremented for them.
if len(droppedKeys) > 0 && bytesutil.Contains(droppedKeys, keys[i]) {
continue
}

name := p.Name()
mf := engine.MeasurementFields(name)

// Check with the field validator.
if err := ValidateFields(mf, p, s.options.Config.SkipFieldSizeValidation); err != nil {
switch err := err.(type) {
case PartialWriteError:
if reason == "" {
reason = err.Reason
err := func(p models.Point, iter models.FieldIterator) error {
var newFields []*FieldCreate
var validateErr error
name := p.Name()
mf := engine.MeasurementFields(name)
mf.mu.RLock()
defer mf.mu.RUnlock()
// Check with the field validator.
if newFields, validateErr = ValidateFields(mf, p, s.options.Config.SkipFieldSizeValidation); validateErr != nil {
var err PartialWriteError
switch {
case errors.As(validateErr, &err):
// This will turn into an error later, outside this lambda
if reason == "" {
reason = err.Reason
}
dropped += err.Dropped
atomic.AddInt64(&s.stats.WritePointsDropped, int64(err.Dropped))
default:
return err
}
dropped += err.Dropped
atomic.AddInt64(&s.stats.WritePointsDropped, int64(err.Dropped))
default:
return nil, nil, err
return nil
}
continue
}

points[j] = points[i]
j++

// Create any fields that are missing.
iter.Reset()
for iter.Next() {
fieldKey := iter.FieldKey()

// Skip fields named "time". They are illegal.
if bytes.Equal(fieldKey, timeBytes) {
continue
}

if mf.FieldBytes(fieldKey) != nil {
continue
}
Comment on lines -736 to -738
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And the second part is here, where thread one has now created the field of (for example) data type float, and thread two reaches this point and continues because the field key now exists — even though thread two wants to create a field of type integer.

I believe the other part of the race is that a snapshot has started for the data from thread one, so a new cache accepts thread two's writes of the new data type: having no previous data to validate against, the cache will accept the value.


dataType := dataTypeFromModelsFieldType(iter.Type())
if dataType == influxql.Unknown {
continue
}

fieldsToCreate = append(fieldsToCreate, &FieldCreate{
Measurement: name,
Field: &Field{
Name: string(fieldKey),
Type: dataType,
},
})
points[j] = points[i]
j++
fieldsToCreate = append(fieldsToCreate, newFields...)
return nil
}(p, iter)
if err != nil {
return nil, nil, err
}
}

if dropped > 0 {
err = PartialWriteError{Reason: reason, Dropped: dropped, Database: s.database, RetentionPolicy: s.retentionPolicy}
}
Expand Down Expand Up @@ -781,31 +764,33 @@ func makePrintable(s string) string {
return b.String()
}

func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) error {
func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) (int, error) {
if len(fieldsToCreate) == 0 {
return nil
return 0, nil
}

engine, err := s.engineNoLock()
if err != nil {
return err
return 0, err
}

numCreated := 0
// add fields
changes := make([]*FieldChange, 0, len(fieldsToCreate))
for _, f := range fieldsToCreate {
mf := engine.MeasurementFields(f.Measurement)
if err := mf.CreateFieldIfNotExists([]byte(f.Field.Name), f.Field.Type); err != nil {
return err
if created, err := mf.CreateFieldIfNotExists([]byte(f.Field.Name), f.Field.Type); err != nil {
return 0, err
} else if created {
numCreated++
s.index.SetFieldName(f.Measurement, f.Field.Name)
changes = append(changes, &FieldChange{
FieldCreate: *f,
ChangeType: AddMeasurementField,
})
}
s.index.SetFieldName(f.Measurement, f.Field.Name)
changes = append(changes, &FieldChange{
FieldCreate: *f,
ChangeType: AddMeasurementField,
})
}

return engine.MeasurementFieldSet().Save(changes)
return numCreated, engine.MeasurementFieldSet().Save(changes)
}

// DeleteSeriesRange deletes all values from for seriesKeys between min and max (inclusive)
Expand Down Expand Up @@ -1577,7 +1562,7 @@ func (a Shards) ExpandSources(sources influxql.Sources) (influxql.Sources, error

// MeasurementFields holds the fields of a measurement and their codec.
type MeasurementFields struct {
mu sync.Mutex
mu sync.RWMutex

fields atomic.Value // map[string]*Field
}
Expand Down Expand Up @@ -1616,15 +1601,15 @@ func (m *MeasurementFields) bytes() int {
// CreateFieldIfNotExists creates a new field with an autoincrementing ID.
// Returns an error if 255 fields have already been created on the measurement or
// the fields already exists with a different type.
func (m *MeasurementFields) CreateFieldIfNotExists(name []byte, typ influxql.DataType) error {
func (m *MeasurementFields) CreateFieldIfNotExists(name []byte, typ influxql.DataType) (bool, error) {
fields := m.fields.Load().(map[string]*Field)

// Ignore if the field already exists.
if f := fields[string(name)]; f != nil {
if f.Type != typ {
return ErrFieldTypeConflict
return false, ErrFieldTypeConflict
}
return nil
return false, nil
}

m.mu.Lock()
Expand All @@ -1634,9 +1619,9 @@ func (m *MeasurementFields) CreateFieldIfNotExists(name []byte, typ influxql.Dat
// Re-check field and type under write lock.
if f := fields[string(name)]; f != nil {
if f.Type != typ {
return ErrFieldTypeConflict
return false, ErrFieldTypeConflict
}
return nil
return false, nil
}

fieldsUpdate := make(map[string]*Field, len(fields)+1)
Expand All @@ -1652,7 +1637,7 @@ func (m *MeasurementFields) CreateFieldIfNotExists(name []byte, typ influxql.Dat
fieldsUpdate[string(name)] = f
m.fields.Store(fieldsUpdate)

return nil
return true, nil
}

func (m *MeasurementFields) FieldN() int {
Expand Down Expand Up @@ -2325,7 +2310,7 @@ func (fs *MeasurementFieldSet) ApplyChanges() error {
fs.Delete(string(fc.Measurement))
} else {
mf := fs.CreateFieldsIfNotExists(fc.Measurement)
if err := mf.CreateFieldIfNotExists([]byte(fc.Field.Name), fc.Field.Type); err != nil {
if _, err := mf.CreateFieldIfNotExists([]byte(fc.Field.Name), fc.Field.Type); err != nil {
err = fmt.Errorf("failed creating %q.%q: %w", fc.Measurement, fc.Field.Name, err)
log.Error("field creation", zap.Error(err))
return err
Expand Down
Loading