Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1290,7 +1290,7 @@ func (e *Engine) addToIndexFromKey(keys [][]byte, fieldTypes []influxql.DataType
keys[i], field = SeriesAndFieldFromCompositeKey(keys[i])
name := models.ParseName(keys[i])
mf := e.fieldset.CreateFieldsIfNotExists(name)
if _, err := mf.CreateFieldIfNotExists(field, fieldTypes[i]); err != nil {
if _, err := mf.CreateFieldIfNotExists(string(field), fieldTypes[i]); err != nil {
return err
}

Expand Down
32 changes: 16 additions & 16 deletions tsdb/engine/tsm1/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestEngine_DeleteSeriesAfterCacheSnapshot(t *testing.T) {
t.Fatalf("failed to write points: %s", err.Error())
}

e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("value", influxql.Float)
e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}), notrack)
e.CreateSeriesIfNotExists([]byte("cpu,host=B"), []byte("cpu"), models.NewTags(map[string]string{"host": "B"}), notrack)

Expand Down Expand Up @@ -771,7 +771,7 @@ func TestEngine_CreateIterator_Cache_Ascending(t *testing.T) {
e := MustOpenEngine(index)
defer e.Close()

e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("value", influxql.Float)
e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}), notrack)

if err := e.WritePointsString(
Expand Down Expand Up @@ -828,7 +828,7 @@ func TestEngine_CreateIterator_Cache_Descending(t *testing.T) {
e := MustOpenEngine(index)
defer e.Close()

e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("value", influxql.Float)
e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}), notrack)

if err := e.WritePointsString(
Expand Down Expand Up @@ -884,7 +884,7 @@ func TestEngine_CreateIterator_TSM_Ascending(t *testing.T) {
e := MustOpenEngine(index)
defer e.Close()

e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("value", influxql.Float)
e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}), notrack)

if err := e.WritePointsString(
Expand Down Expand Up @@ -942,7 +942,7 @@ func TestEngine_CreateIterator_TSM_Descending(t *testing.T) {
e := MustOpenEngine(index)
defer e.Close()

e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("value", influxql.Float)
e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}), notrack)

if err := e.WritePointsString(
Expand Down Expand Up @@ -1000,8 +1000,8 @@ func TestEngine_CreateIterator_Aux(t *testing.T) {
e := MustOpenEngine(index)
defer e.Close()

e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("F"), influxql.Float)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("value", influxql.Float)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("F", influxql.Float)
e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}), notrack)

if err := e.WritePointsString(
Expand Down Expand Up @@ -1060,9 +1060,9 @@ func TestEngine_CreateIterator_Condition(t *testing.T) {
e := MustOpenEngine(index)
defer e.Close()

e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("X"), influxql.Float)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("Y"), influxql.Float)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("value", influxql.Float)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("X", influxql.Float)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("Y", influxql.Float)
e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}), notrack)
e.SetFieldName([]byte("cpu"), "X")
e.SetFieldName([]byte("cpu"), "Y")
Expand Down Expand Up @@ -1936,7 +1936,7 @@ func TestEngine_CreateCursor_Ascending(t *testing.T) {
e := MustOpenEngine(index)
defer e.Close()

e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("value", influxql.Float)
e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}), notrack)

if err := e.WritePointsString(
Expand Down Expand Up @@ -1996,7 +1996,7 @@ func TestEngine_CreateCursor_Descending(t *testing.T) {
e := MustOpenEngine(index)
defer e.Close()

e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("value", influxql.Float)
e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}), notrack)

if err := e.WritePointsString(
Expand Down Expand Up @@ -2056,7 +2056,7 @@ func TestEngine_CreateIterator_SeriesKey(t *testing.T) {
e := MustOpenEngine(index)
defer e.Close()

e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("value", influxql.Float)
e.CreateSeriesIfNotExists([]byte("cpu,host=A,region=east"), []byte("cpu"), models.NewTags(map[string]string{"host": "A", "region": "east"}), notrack)
e.CreateSeriesIfNotExists([]byte("cpu,host=B,region=east"), []byte("cpu"), models.NewTags(map[string]string{"host": "B", "region": "east"}), notrack)
e.CreateSeriesIfNotExists([]byte("cpu,host=C,region=east"), []byte("cpu"), models.NewTags(map[string]string{"host": "C", "region": "east"}), notrack)
Expand Down Expand Up @@ -2364,7 +2364,7 @@ func BenchmarkEngine_WritePoints(b *testing.B) {
for _, sz := range batchSizes {
for _, index := range tsdb.RegisteredIndexes() {
e := MustOpenEngine(index)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("value", influxql.Float)
pp := make([]models.Point, 0, sz)
for i := 0; i < sz; i++ {
p := MustParsePointString(fmt.Sprintf("cpu,host=%d value=1.2", i))
Expand All @@ -2390,7 +2390,7 @@ func BenchmarkEngine_WritePoints_Parallel(b *testing.B) {
for _, sz := range batchSizes {
for _, index := range tsdb.RegisteredIndexes() {
e := MustOpenEngine(index)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("value", influxql.Float)

cpus := runtime.GOMAXPROCS(0)
pp := make([]models.Point, 0, sz*cpus)
Expand Down Expand Up @@ -2623,7 +2623,7 @@ func MustInitDefaultBenchmarkEngine(name string, pointN int) *benchmarkEngine {
e := MustOpenEngine(tsdb.DefaultIndex)

// Initialize metadata.
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists([]byte("value"), influxql.Float)
e.MeasurementFields([]byte("cpu")).CreateFieldIfNotExists("value", influxql.Float)
e.CreateSeriesIfNotExists([]byte("cpu,host=A"), []byte("cpu"), models.NewTags(map[string]string{"host": "A"}), notrack)

// Generate time ascending points with jitterred time & value.
Expand Down
7 changes: 4 additions & 3 deletions tsdb/field_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,21 @@ func ValidateFields(mf *MeasurementFields, point models.Point, skipSizeValidatio
}

// If the field is not present, remember to create it.
f := mf.FieldBytes(fieldKey)
fieldName := string(fieldKey)
f := mf.Field(fieldName)
if f == nil {
fieldsToCreate = append(fieldsToCreate, &FieldCreate{
Measurement: point.Name(),
Field: &Field{
Name: string(fieldKey),
Name: fieldName,
Type: dataType,
}})
} else if f.Type != dataType {
// If the types are not the same, there is a conflict.
return nil, PartialWriteError{
Reason: fmt.Sprintf(
"%s: input field \"%s\" on measurement \"%s\" is type %s, already exists as type %s",
ErrFieldTypeConflict, fieldKey, point.Name(), dataType, f.Type),
ErrFieldTypeConflict, fieldName, point.Name(), dataType, f.Type),
Dropped: 1,
}
}
Expand Down
69 changes: 28 additions & 41 deletions tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,36 +703,31 @@ func (s *Shard) validateSeriesAndFields(points []models.Point, tracker StatsTrac
continue
}

err := func(p models.Point, iter models.FieldIterator) error {
var newFields []*FieldCreate
var validateErr error
name := p.Name()
mf := engine.MeasurementFields(name)
// Check with the field validator.
if newFields, validateErr = ValidateFields(mf, p, s.options.Config.SkipFieldSizeValidation); validateErr != nil {
Copy link
Copy Markdown
Contributor

@philjb philjb Feb 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also now notice (and should have noticed in #25998 instead) that the point's field names ("key") are checked against timeBytes twice.

First here in validateSeriesAndFields:

influxdb/tsdb/shard.go

Lines 684 to 690 in cda0ef4

for iter.Next() {
if bytes.Equal(iter.FieldKey(), timeBytes) {
continue
}
validField = true
break
}

Actually - I think this first check against timeBytes is totally broken. The check is inverted - it's missing an ! meaning the second check is actually doing the work and this first check is just a waste. EDIT: see next comment: this is doing some work but it's nearly redundant with ValidateFields and a minor update there could delete one of the two checks for illegal time fields.

and then later in ValidateFields:

fieldKey := iter.FieldKey()
// Skip fields name "time", they are illegal.
if bytes.Equal(fieldKey, timeBytes) {
continue
}

My recommendation is to clean this up, but in a separate PR: It'll be one of the 5 of us who will next be reading this code, so perhaps think kindly on our future selves!

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not seeing this. The first one, in validateSeriesAndFields checks that there is at least one field not named time in the point, otherwise it drops the point.

The second check in ValidateFields skips creating any fields named time.

Am I misreading the code?

Copy link
Copy Markdown
Contributor

@philjb philjb Feb 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aren't those the same thing? aka they cause the same effect?

p0=Point{fields:["time"]} 
p1=Point{fields:["time", "time", "foo"} 

For these two points, the first p0 will be rejected by what is in validateSeriesAndFields (all fields = time), but it would also be rejected by ValidateFields. For the second p1, it would not be rejected by validateSeriesAndFields (one field not equal to "time"), and then in ValidateFields both of the time fields would be skipped but the foo field would be marked for creation.

If the check in validateSeriesAndFields is removed, the result is the same, only foo is marked for creation.

BUT I see that the test TestWriteTimeTag fails because it's looking for a PartialWriteError in the p0 case which ValidateFields doesn't return - this is a latent bug in ValidateFields. So the rework is more than just a deletion - this is all a suggested change - not important.

I admit I misread the intention of the check in validateSeriesAndFields in my initial comment too! It's not "wrong" as it is excluding all points where every field is an illegal one (and returning an error), but there's only one illegal field (time) so it's redundant now. Perhaps there used to be more illegal ones or the intention of more illegal ones. It's also kind of weird to validate fields both before and inside ValidateFields.

All in - this is a pretty minor point (pun intended). And hopefully I'm not misreading it! (I'll edit my original comment some)

var err PartialWriteError
switch {
case errors.As(validateErr, &err):
// This will turn into an error later, outside this lambda
if reason == "" {
reason = err.Reason
}
dropped += err.Dropped
atomic.AddInt64(&s.stats.WritePointsDropped, int64(err.Dropped))
default:
return err
var newFields []*FieldCreate
var validateErr error
name := p.Name()
mf := engine.MeasurementFields(name)
// Check with the field validator.
if newFields, validateErr = ValidateFields(mf, p, s.options.Config.SkipFieldSizeValidation); validateErr != nil {
var err PartialWriteError
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know we're now in a mix of original code (before this PR and before #25998), but we are touching this and I now recommend another change for clarity, not correctness: removal of dead/unreachable code.

ValidateFields() only returns a PartialWriteError at this point so the switch { case errors.As(validateErr, &err) ... is more than is needed as the default: case is unreachable code and therefore complicates the logic flow.

My recommendation is to change ValidateFields to this signature (eliding args) so it returns the specific error it can only return:

func ValidateFields(...) ([]*FieldCreate, *PartialWriteError), update it internally for &PartialWriteError and then update here to

var newFields []*FieldCreate
		var validateErr *PartialWriteError
		name := p.Name()
		mf := engine.MeasurementFields(name)
		// Check with the field validator.
		if newFields, validateErr = ValidateFields(mf, p, s.options.Config.SkipFieldSizeValidation); validateErr != nil {
			if reason == "" {
				// record the first reason only
				reason = validateErr.Reason
			}
			dropped += validateErr.Dropped
			atomic.AddInt64(&s.stats.WritePointsDropped, int64(validateErr.Dropped))
			continue
		}

I especially recommend this change since determining what to return in the default case was already tricky as this pr description notes that it: Also fixes one bug in returning the wrong error.

coda:
I know this is slightly against the usual golang "return error interface in the ultimate return position" but in this case, for a helper method, I think it's clarifying instead of confusing: aka I find the switch with default more confusing than the momentary thought about why this helper doesn't return an error interface.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I disagree with this, because I can imagine ValidateFields returning other errors in the future. There may be validations that have to touch some other resource, which we could fail to obtain, for instance, which would cause more significant errors, so I would prefer to leave this as an error return type and switch on the type.

switch {
case errors.As(validateErr, &err):
// This will turn into an error later, outside this lambda
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No longer a lambda

if reason == "" {
reason = err.Reason
}
return nil
dropped += err.Dropped
atomic.AddInt64(&s.stats.WritePointsDropped, int64(err.Dropped))
continue
default:
// Return validateErr, because err will be nil here
return nil, nil, validateErr
}

points[j] = points[i]
j++
fieldsToCreate = append(fieldsToCreate, newFields...)
return nil
}(p, iter)
if err != nil {
return nil, nil, err
}

points[j] = points[i]
j++
fieldsToCreate = append(fieldsToCreate, newFields...)
}
if dropped > 0 {
err = PartialWriteError{Reason: reason, Dropped: dropped, Database: s.database, RetentionPolicy: s.retentionPolicy}
Expand Down Expand Up @@ -777,7 +772,7 @@ func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*FieldCreate) (int,
changes := make([]*FieldChange, 0, len(fieldsToCreate))
for _, f := range fieldsToCreate {
mf := engine.MeasurementFields(f.Measurement)
if created, err := mf.CreateFieldIfNotExists([]byte(f.Field.Name), f.Field.Type); err != nil {
if created, err := mf.CreateFieldIfNotExists(f.Field.Name, f.Field.Type); err != nil {
return 0, err
} else if created {
numCreated++
Expand Down Expand Up @@ -1594,9 +1589,9 @@ func (m *MeasurementFields) bytes() int {

// CreateFieldIfNotExists creates a new field with the given name and type.
// Returns an error if the field already exists with a different type.
func (m *MeasurementFields) CreateFieldIfNotExists(name []byte, typ influxql.DataType) (bool, error) {
func (m *MeasurementFields) CreateFieldIfNotExists(name string, typ influxql.DataType) (bool, error) {
newField := &Field{
Name: string(name),
Name: name,
Type: typ,
}
if f, loaded := m.fields.LoadOrStore(newField.Name, newField); loaded {
Expand Down Expand Up @@ -1626,14 +1621,6 @@ func (m *MeasurementFields) HasField(name string) bool {
return ok
}

// FieldBytes returns the field for name, or nil if there is no field for name.
// FieldBytes should be preferred to Field when the caller has a []byte, because
// it avoids a string allocation, which can't be avoided if the caller converts
// the []byte to a string and calls Field.
func (m *MeasurementFields) FieldBytes(name []byte) *Field {
return m.Field(string(name))
}

// FieldSet returns the set of fields and their types for the measurement.
func (m *MeasurementFields) FieldSet() map[string]influxql.DataType {
fieldTypes := make(map[string]influxql.DataType)
Expand Down Expand Up @@ -2271,7 +2258,7 @@ func (fs *MeasurementFieldSet) ApplyChanges() error {
fs.Delete(string(fc.Measurement))
} else {
mf := fs.CreateFieldsIfNotExists(fc.Measurement)
if _, err := mf.CreateFieldIfNotExists([]byte(fc.Field.Name), fc.Field.Type); err != nil {
if _, err := mf.CreateFieldIfNotExists(fc.Field.Name, fc.Field.Type); err != nil {
err = fmt.Errorf("failed creating %q.%q: %w", fc.Measurement, fc.Field.Name, err)
log.Error("field creation", zap.Error(err))
return err
Expand All @@ -2284,8 +2271,8 @@ func (fs *MeasurementFieldSet) ApplyChanges() error {

// Field represents a series field. All of the fields must be hashable.
type Field struct {
Name string `json:"name,omitempty"`
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we removing the json tagging? Just a question, not really a change comment.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't JSON-ize this struct anywhere; it gets copied to a protobuf for serialization these days.

Type influxql.DataType `json:"type,omitempty"`
Name string
Type influxql.DataType
}

type FieldChange struct {
Expand Down
10 changes: 5 additions & 5 deletions tsdb/shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1691,7 +1691,7 @@ func TestMeasurementFieldSet_SaveLoad(t *testing.T) {
}
defer checkMeasurementFieldSetClose(t, mf)
fields := mf.CreateFieldsIfNotExists([]byte(measurement))
if _, err := fields.CreateFieldIfNotExists([]byte(fieldName), influxql.Float); err != nil {
if _, err := fields.CreateFieldIfNotExists(fieldName, influxql.Float); err != nil {
t.Fatalf("create field error: %v", err)
}
change := tsdb.FieldChange{
Expand Down Expand Up @@ -1743,7 +1743,7 @@ func TestMeasurementFieldSet_Corrupt(t *testing.T) {
measurement := []byte("cpu")
fields := mf.CreateFieldsIfNotExists(measurement)
fieldName := "value"
if _, err := fields.CreateFieldIfNotExists([]byte(fieldName), influxql.Float); err != nil {
if _, err := fields.CreateFieldIfNotExists(fieldName, influxql.Float); err != nil {
t.Fatalf("create field error: %v", err)
}
change := tsdb.FieldChange{
Expand Down Expand Up @@ -1814,7 +1814,7 @@ func TestMeasurementFieldSet_CorruptChangeFile(t *testing.T) {
defer checkMeasurementFieldSetClose(t, mf)
for _, f := range testFields {
fields := mf.CreateFieldsIfNotExists([]byte(f.Measurement))
if _, err := fields.CreateFieldIfNotExists([]byte(f.Field), f.FieldType); err != nil {
if _, err := fields.CreateFieldIfNotExists(f.Field, f.FieldType); err != nil {
t.Fatalf("create field error: %v", err)
}
change := tsdb.FieldChange{
Expand Down Expand Up @@ -1876,7 +1876,7 @@ func TestMeasurementFieldSet_DeleteEmpty(t *testing.T) {
defer checkMeasurementFieldSetClose(t, mf)

fields := mf.CreateFieldsIfNotExists([]byte(measurement))
if _, err := fields.CreateFieldIfNotExists([]byte(fieldName), influxql.Float); err != nil {
if _, err := fields.CreateFieldIfNotExists(fieldName, influxql.Float); err != nil {
t.Fatalf("create field error: %v", err)
}

Expand Down Expand Up @@ -2009,7 +2009,7 @@ func testFieldMaker(t *testing.T, wg *sync.WaitGroup, mf *tsdb.MeasurementFieldS
fields := mf.CreateFieldsIfNotExists([]byte(measurement))

for _, fieldName := range fieldNames {
if _, err := fields.CreateFieldIfNotExists([]byte(fieldName), influxql.Float); err != nil {
if _, err := fields.CreateFieldIfNotExists(fieldName, influxql.Float); err != nil {
t.Logf("create field error: %v", err)
t.Fail()
return
Expand Down