-
Notifications
You must be signed in to change notification settings - Fork 3.7k
chore: refactor field creation for maintainability #26028
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -703,36 +703,31 @@ func (s *Shard) validateSeriesAndFields(points []models.Point, tracker StatsTrac | |
| continue | ||
| } | ||
|
|
||
| err := func(p models.Point, iter models.FieldIterator) error { | ||
| var newFields []*FieldCreate | ||
| var validateErr error | ||
| name := p.Name() | ||
| mf := engine.MeasurementFields(name) | ||
| // Check with the field validator. | ||
| if newFields, validateErr = ValidateFields(mf, p, s.options.Config.SkipFieldSizeValidation); validateErr != nil { | ||
| var err PartialWriteError | ||
| switch { | ||
| case errors.As(validateErr, &err): | ||
| // This will turn into an error later, outside this lambda | ||
| if reason == "" { | ||
| reason = err.Reason | ||
| } | ||
| dropped += err.Dropped | ||
| atomic.AddInt64(&s.stats.WritePointsDropped, int64(err.Dropped)) | ||
| default: | ||
| return err | ||
| var newFields []*FieldCreate | ||
| var validateErr error | ||
| name := p.Name() | ||
| mf := engine.MeasurementFields(name) | ||
| // Check with the field validator. | ||
| if newFields, validateErr = ValidateFields(mf, p, s.options.Config.SkipFieldSizeValidation); validateErr != nil { | ||
| var err PartialWriteError | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I know we're now in a mix of original code (before this PR and before #25998), but we are touching this and I now recommend another change for clarity, not correctness: removal of dead/unreachable code.
My recommendation is to change
I especially recommend this change since determining what to return in the default case was already tricky as this pr description notes that it: coda:
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I disagree with this, because I can imagine |
||
| switch { | ||
| case errors.As(validateErr, &err): | ||
| // This will turn into an error later, outside this lambda | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No longer a lambda |
||
| if reason == "" { | ||
| reason = err.Reason | ||
| } | ||
| return nil | ||
| dropped += err.Dropped | ||
| atomic.AddInt64(&s.stats.WritePointsDropped, int64(err.Dropped)) | ||
| continue | ||
| default: | ||
| // Return validateErr, because err will be nil here | ||
| return nil, nil, validateErr | ||
| } | ||
|
|
||
| points[j] = points[i] | ||
| j++ | ||
| fieldsToCreate = append(fieldsToCreate, newFields...) | ||
| return nil | ||
| }(p, iter) | ||
| if err != nil { | ||
| return nil, nil, err | ||
| } | ||
|
|
||
| points[j] = points[i] | ||
| j++ | ||
| fieldsToCreate = append(fieldsToCreate, newFields...) | ||
| } | ||
| if dropped > 0 { | ||
| err = PartialWriteError{Reason: reason, Dropped: dropped, Database: s.database, RetentionPolicy: s.retentionPolicy} | ||
|
|
@@ -777,7 +772,7 @@ func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) (int, | |
| changes := make([]*FieldChange, 0, len(fieldsToCreate)) | ||
| for _, f := range fieldsToCreate { | ||
| mf := engine.MeasurementFields(f.Measurement) | ||
| if created, err := mf.CreateFieldIfNotExists([]byte(f.Field.Name), f.Field.Type); err != nil { | ||
| if created, err := mf.CreateFieldIfNotExists(f.Field.Name, f.Field.Type); err != nil { | ||
| return 0, err | ||
| } else if created { | ||
| numCreated++ | ||
|
|
@@ -1594,9 +1589,9 @@ func (m *MeasurementFields) bytes() int { | |
|
|
||
| // CreateFieldIfNotExists creates a new field with the given name and type. | ||
| // Returns an error if the field already exists with a different type. | ||
| func (m *MeasurementFields) CreateFieldIfNotExists(name []byte, typ influxql.DataType) (bool, error) { | ||
| func (m *MeasurementFields) CreateFieldIfNotExists(name string, typ influxql.DataType) (bool, error) { | ||
| newField := &Field{ | ||
| Name: string(name), | ||
| Name: name, | ||
| Type: typ, | ||
| } | ||
| if f, loaded := m.fields.LoadOrStore(newField.Name, newField); loaded { | ||
|
|
@@ -1626,14 +1621,6 @@ func (m *MeasurementFields) HasField(name string) bool { | |
| return ok | ||
| } | ||
|
|
||
| // FieldBytes returns the field for name, or nil if there is no field for name. | ||
| // FieldBytes should be preferred to Field when the caller has a []byte, because | ||
| // it avoids a string allocation, which can't be avoided if the caller converts | ||
| // the []byte to a string and calls Field. | ||
| func (m *MeasurementFields) FieldBytes(name []byte) *Field { | ||
| return m.Field(string(name)) | ||
| } | ||
|
|
||
| // FieldSet returns the set of fields and their types for the measurement. | ||
| func (m *MeasurementFields) FieldSet() map[string]influxql.DataType { | ||
| fieldTypes := make(map[string]influxql.DataType) | ||
|
|
@@ -2271,7 +2258,7 @@ func (fs *MeasurementFieldSet) ApplyChanges() error { | |
| fs.Delete(string(fc.Measurement)) | ||
| } else { | ||
| mf := fs.CreateFieldsIfNotExists(fc.Measurement) | ||
| if _, err := mf.CreateFieldIfNotExists([]byte(fc.Field.Name), fc.Field.Type); err != nil { | ||
| if _, err := mf.CreateFieldIfNotExists(fc.Field.Name, fc.Field.Type); err != nil { | ||
| err = fmt.Errorf("failed creating %q.%q: %w", fc.Measurement, fc.Field.Name, err) | ||
| log.Error("field creation", zap.Error(err)) | ||
| return err | ||
|
|
@@ -2284,8 +2271,8 @@ func (fs *MeasurementFieldSet) ApplyChanges() error { | |
|
|
||
| // Field represents a series field. All of the fields must be hashable. | ||
| type Field struct { | ||
| Name string `json:"name,omitempty"` | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why are we removing the json tagging? Just a question, not really a change comment.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We don't JSON-ize this struct anywhere; it gets copied to a protobuf for serialization these days. |
||
| Type influxql.DataType `json:"type,omitempty"` | ||
| Name string | ||
| Type influxql.DataType | ||
| } | ||
|
|
||
| type FieldChange struct { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also now notice (and should have noticed in #25998 instead that the point's field names ("key") is checked against
timeBytestwice.First here in
validateSeriesAndFields:influxdb/tsdb/shard.go
Lines 684 to 690 in cda0ef4
Actually - I think this first check against timeBytes is totally broken. The check is inverted - it's missing anEDIT: see next comment: this is doing some work but its nearly redundant with ValidateFields and a minor update there could delete one of the two checks for illegal!meaning the second check is actually doing the work and this first check is a just a waste.timefields.and then later in
ValidateFields:influxdb/tsdb/field_validator.go
Lines 38 to 42 in 89b4b97
My recommendation is to clean this up to but in a separate pr: It'll be one of the 5 of us who will next be reading this code so perhaps think kindly on our future selves!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not seeing this. The first one, in
validateSeriesAndFieldschecks that there is at least one field not namedtimein the point, otherwise it drops the point.The second check in
ValidateFieldsskips creating any fields namedtime.Am I misreading the code?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Aren't those the same thing? aka they cause the same effect?
For these two points, the first
p0will be rejected by what is invalidateSeriesAndFields(all fields =time), but it would also be rejected byValidateFields. For the secondp1, it would not be rejected byvalidateSeriesAndFields(one field not equal to "time"), and then inValidateFieldsboth of thetimefields would be skipped but thefoofield would be marked for creation.If the check in
validateSeriesAndFieldsis removed, the result is the same, onlyfoois marked for creation.BUT I see that the test
TestWriteTimeTagfails because it's looking for a PartialWriteError in thep0case whichValidateFieldsdoesn't return - this is a latent bug in Validatefields. So the rework is more than just a deletion - this is all a suggested change - not important.I admit i misread the intention of the check in
validateSeriesAndFieldsin my initial comment too! its not "wrong" as it is excluding all points where every field is an illegal one (and returning an error), but there's only one illegal field (time) so its redundant now. Perhaps there used to be more illegal ones or the intention of more illegal ones. It's also kind of weird to validate fields both before and insideValidateFields.All in - this is a pretty minor point (pun intended). And hopefully i'm not misreading it! (I'll edit my original comment some)