-
Notifications
You must be signed in to change notification settings - Fork 3.7k
feat: Bubble error up to writer if fields are dropped #26998
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master-1.x
Are you sure you want to change the base?
Conversation
tsdb/shard.go
Outdated
| newFields, partialWriteError := ValidateAndCreateFields(mf, p, s.options.Config.SkipFieldSizeValidation, s.logger) | ||
| createdFieldsToSave = append(createdFieldsToSave, newFields...) | ||
|
|
||
| if partialWriteError != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here we would only want to continue if the PartialWriteError.Dropped > 0, otherwise do the point assignment a increment j. Then in the test on 730, check a bool if we had any PartialWriteError, rather than the count of dropped lines.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Set a bool here partialwrites = true, then use on shard.go:L730 to decide whether to return an error.
tsdb/field_validator.go
Outdated
| fieldKey := iter.FieldKey() | ||
| // Skip fields name "time", they are illegal. | ||
| if bytes.Equal(fieldKey, timeBytes) { | ||
| logger.Warn("invalid field name \"time\" for measurement, dropping \"time\" field during write.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My difficulty with this is that the writer does not get the error. I would prefer creating a PartialWriteError with a Dropped count of zero, and the same error message we give with single field time problems.
In the caller we then, rather than checking for PartialWriteError and continuing if we have one, we check for a PartialWriteError with a Dropped count > 0.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We will be swamped in our log files if we log every time field that occurs; that could be millions of erroneous points filling the logs before the customer catches the problem. We want the error to go back to the writer.
bubble error up to writer.
davidby-influx
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some suggestions. Don't forget to make cherry-pick issues for main-2.x in both related projects.
What should happen with engine.WritePoints check for timeBytes? Is it redundant, or does it plug some gap the other checks leave?
And the check on line 692 of shard.go - redundant, or plugging a gap in other checks?
tsdb/field_validator.go
Outdated
| partialWriteError = &PartialWriteError{ | ||
| Reason: fmt.Sprintf( | ||
| "invalid field name: input field \"%s\" on measurement \"%s\" is invalid. Field \"%s\" has been stripped from point.", | ||
| "time", string(point.Name()), "time"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't use the literal "time" here, use the fieldKey value (use square bracket notation to only pass once to Sprintf). Or maybe rewrite the message to only print the value once.
tsdb/shard.go
Outdated
| // Sometimes we will drop fields like 'time' but not an entire point | ||
| // we want to inform the writer that something occurred. | ||
| } else if partialWriteError != nil && partialWriteError.Dropped <= 0 { | ||
| err = PartialWriteError{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to copy the error here into a new error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No we want to use the existing error since it gets returned
Sorry, misread the question. We don't need to, I was doing that so we could set the DB and RP. But, I can just mutate the error that is already there and assign it to err.
tsdb/field_validator.go
Outdated
|
|
||
| fieldKey := iter.FieldKey() | ||
| // Skip fields name "time", they are illegal. | ||
| if bytes.Equal(fieldKey, timeBytes) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's unify the two timeBytes declarations into one tsdb.TimeBytes in shard.go
|
Ticket for backport #27006 |
It does appear this check is not needed. I had removed it and observed the same behavior without it. Taking a look at the code the only place points, fieldsToCreate, err := s.validateSeriesAndFields(points, tracker) // Here is where we validate and strip the field
if err != nil {
if _, ok := err.(PartialWriteError); !ok {
return err
}
// There was a partial write (points dropped), hold onto the error to return
// to the caller, but continue on writing the remaining points.
writeError = err
}
.......
// Write to the engine.
if err := engine.WritePoints(points, engineTracker); err != nil { // Here is where engine.WritePoints gets called
atomic.AddInt64(&s.stats.WritePointsErr, int64(len(points)))
atomic.AddInt64(&s.stats.WriteReqErr, 1)
return fmt.Errorf("engine: %w", err)
}
The check on line 692 is an optimization where if we have a single field using |
davidby-influx
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good changes; two code suggestions, one additional question.
| partialWriteError = &PartialWriteError{ | ||
| Reason: fmt.Sprintf( | ||
| "invalid field name: input field \"%[1]s\" on measurement \"%s\" is invalid. Field \"%[1]s\" has been stripped from point.", | ||
| string(fieldKey), string(point.Name())), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You probably do not need the cast to string; Sprintf can treat byte slices correctly with %s, I believe
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How do we handle multiple time fields in a single point? Do we need to return multiple partial write errors, or should we short-circuit the creation of the second and subsequent ones like the following?
if bytes.Equal(fieldKey, TimeBytes) && (nil == partialWriteError) {
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we have multiple time fields we want to hit the continue line regardless so I think the existing code is fine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, output when you have multiple time fields.
> use foo
Using database foo
> INSERT test_foo7,tag1=abc,tag2=xyz uuuu=8989i,time=123i,time=99999i
ERR: {"error":"partial write: invalid field name: input field \"time\" on measurement \"test_foo7\" is invalid. Field \"time\" has been stripped from point. dropped=0 for database: foo for retention policy: autogen"}
> select * from test_foo7
name: test_foo7
time tag1 tag2 uuuu
---- ---- ---- ----
1764948955470709000 abc xyz 8989
If you only have time fields (drops the entire point as expected):
> INSERT test_foo8,tag1=abc,tag2=xyz time=12345i,time=123i,time=99999i
ERR: {"error":"partial write: invalid field name: input field \"time\" on measurement \"test_foo8\" is invalid dropped=1 for database: foo for retention policy: autogen"}
> select * from test_foo8
>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Eliminate the string() casts for the fmt.Sprintf and put a nested if check whether partialWriteError is nil so you only set it once, even for multiple time fields in a single point.
davidby-influx
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few small changes for clarity and memory allocation.
| partialWriteError = &PartialWriteError{ | ||
| Reason: fmt.Sprintf( | ||
| "invalid field name: input field \"%[1]s\" on measurement \"%s\" is invalid. Field \"%[1]s\" has been stripped from point.", | ||
| string(fieldKey), string(point.Name())), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Eliminate the string() casts for the fmt.Sprintf and put a nested if check whether partialWriteError is nil so you only set it once, even for multiple time fields in a single point.
| continue | ||
| // Sometimes we will drop fields like 'time' but not an entire point | ||
| // we want to inform the writer that something occurred. | ||
| } else if partialWriteError != nil && partialWriteError.Dropped <= 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the check for Dropped redundant here?
Modify
WritePointsto return an error if a user definedtimefield is inserted. Previously we would silently prune it, this PR will bubble an error up to the writer to inform that a field was dropped but not the entire point.