Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ var (
// Ensure Engine implements the interface.
_ tsdb.Engine = &Engine{}
// Static objects to prevent small allocs.
timeBytes = []byte("time")
keyFieldSeparatorBytes = []byte(keyFieldSeparator)
emptyBytes = []byte{}
)
Expand Down Expand Up @@ -1360,11 +1359,6 @@ func (e *Engine) WritePoints(points []models.Point, tracker tsdb.StatsTracker) e
npoints++
var nValuesForPoint int64
for iter.Next() {
// Skip fields name "time", they are illegal
if bytes.Equal(iter.FieldKey(), timeBytes) {
continue
}

keyBuf = append(keyBuf[:baseLen], iter.FieldKey()...)

if e.seriesTypeMap != nil {
Expand Down
11 changes: 9 additions & 2 deletions tsdb/field_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ func ValidateAndCreateFields(mf *MeasurementFields, point models.Point, skipSize
pointSize := point.StringSize()
iter := point.FieldIterator()
var fieldsToCreate []*FieldCreate
var partialWriteError *PartialWriteError

// We return fieldsToCreate even on error, because other writes
// in parallel may depend on these previous fields having been
Expand All @@ -41,7 +42,13 @@ func ValidateAndCreateFields(mf *MeasurementFields, point models.Point, skipSize

fieldKey := iter.FieldKey()
// Skip fields name "time", they are illegal.
if bytes.Equal(fieldKey, timeBytes) {
if bytes.Equal(fieldKey, TimeBytes) {
partialWriteError = &PartialWriteError{
Reason: fmt.Sprintf(
"invalid field name: input field \"%[1]s\" on measurement \"%s\" is invalid. Field \"%[1]s\" has been stripped from point.",
string(fieldKey), string(point.Name())),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You probably do not need the cast to string; Sprintf can treat byte slices correctly with %s, I believe

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do we handle multiple time fields in a single point? Do we need to return multiple partial write errors, or should we short-circuit the creation of the second and subsequent ones like the following?

if bytes.Equal(fieldKey, TimeBytes) && (nil == partialWriteError) {

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we have multiple time fields we want to hit the continue line regardless so I think the existing code is fine.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, here is the output when a single point has multiple time fields:

> use foo
Using database foo
> INSERT test_foo7,tag1=abc,tag2=xyz uuuu=8989i,time=123i,time=99999i
ERR: {"error":"partial write: invalid field name: input field \"time\" on measurement \"test_foo7\" is invalid. Field \"time\" has been stripped from point. dropped=0 for database: foo for retention policy: autogen"}

> select * from test_foo7
name: test_foo7
time                tag1 tag2 uuuu
----                ---- ---- ----
1764948955470709000 abc  xyz  8989

If you only have time fields (drops the entire point as expected):

> INSERT test_foo8,tag1=abc,tag2=xyz time=12345i,time=123i,time=99999i
ERR: {"error":"partial write: invalid field name: input field \"time\" on measurement \"test_foo8\" is invalid dropped=1 for database: foo for retention policy: autogen"}

> select * from test_foo8
>

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eliminate the string() casts in the fmt.Sprintf call, and add a nested if that checks whether partialWriteError is nil, so that it is only set once even when a single point has multiple time fields.

Dropped: 0,
}
continue
}

Expand Down Expand Up @@ -70,7 +77,7 @@ func ValidateAndCreateFields(mf *MeasurementFields, point models.Point, skipSize
fieldsToCreate = append(fieldsToCreate, &FieldCreate{point.Name(), f})
}
}
return fieldsToCreate, nil
return fieldsToCreate, partialWriteError
}

// dataTypeFromModelsFieldType returns the influxql.DataType that corresponds to the
Expand Down
14 changes: 10 additions & 4 deletions tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ var (

var (
// Static objects to prevent small allocs.
timeBytes = []byte("time")
TimeBytes = []byte("time")
)

// A ShardError implements the error interface, and contains extra
Expand Down Expand Up @@ -631,7 +631,7 @@ func (s *Shard) validateSeriesAndFields(points []models.Point, tracker StatsTrac
tags := p.Tags()

// Drop any series w/ a "time" tag, these are illegal
if v := tags.Get(timeBytes); v != nil {
if v := tags.Get(TimeBytes); v != nil {
dropped++
if reason == "" {
reason = fmt.Sprintf(
Expand Down Expand Up @@ -689,7 +689,7 @@ func (s *Shard) validateSeriesAndFields(points []models.Point, tracker StatsTrac
iter := p.FieldIterator()
validField := false
for iter.Next() {
if bytes.Equal(iter.FieldKey(), timeBytes) {
if bytes.Equal(iter.FieldKey(), TimeBytes) {
continue
}
validField = true
Expand All @@ -716,13 +716,19 @@ func (s *Shard) validateSeriesAndFields(points []models.Point, tracker StatsTrac
newFields, partialWriteError := ValidateAndCreateFields(mf, p, s.options.Config.SkipFieldSizeValidation)
createdFieldsToSave = append(createdFieldsToSave, newFields...)

if partialWriteError != nil {
if partialWriteError != nil && partialWriteError.Dropped > 0 {
if reason == "" {
reason = partialWriteError.Reason
}
dropped += partialWriteError.Dropped
atomic.AddInt64(&s.stats.WritePointsDropped, int64(partialWriteError.Dropped))
continue
// Sometimes we drop individual fields (like 'time') without dropping the
// entire point; we still want to inform the writer that something occurred.
} else if partialWriteError != nil && partialWriteError.Dropped <= 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the check for Dropped redundant here?

partialWriteError.Database = s.Database()
partialWriteError.RetentionPolicy = s.RetentionPolicy()
err = *partialWriteError
}
points[j] = points[i]
j++
Expand Down
80 changes: 77 additions & 3 deletions tsdb/shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,9 +337,8 @@ func TestWriteTimeTag(t *testing.T) {
time.Unix(1, 2),
)

if err := sh.WritePoints([]models.Point{pt}, tsdb.NoopStatsTracker()); err != nil {
t.Fatalf("unexpected error: %v", err)
}
err := sh.WritePoints([]models.Point{pt}, tsdb.NoopStatsTracker())
require.Error(t, err, "expected partial write error")

mf := sh.MeasurementFields([]byte("cpu"))
if mf == nil {
Expand Down Expand Up @@ -435,6 +434,81 @@ func TestShardWriteAddNewField(t *testing.T) {
}
}

// TestShardWriteDropField verifies how a shard handles points that carry an
// illegal field named "time":
//
//   - a point with only legal fields is written without error;
//   - a point mixing legal fields with a "time" field reports a partial write
//     error with dropped=0 (the "time" field is stripped, the point is kept);
//   - a point whose only field is "time" reports a partial write error with
//     dropped=1 (the whole point is dropped).
//
// It then checks the series count, the writePointsOk statistic, and the
// number of fields registered for the measurement.
func TestShardWriteDropField(t *testing.T) {
	// Fail fast if the temp dir cannot be created; otherwise the shard would be
	// opened relative to an empty path.
	tmpDir, err := os.MkdirTemp("", "shard_test")
	require.NoError(t, err, "error creating temp dir")
	defer func() {
		err := os.RemoveAll(tmpDir)
		require.NoError(t, err, "error removing temp dir")
	}()

	tmpShard := filepath.Join(tmpDir, "shard")
	tmpWal := filepath.Join(tmpDir, "wal")

	sfile := MustOpenSeriesFile()
	defer func() {
		err := sfile.Close()
		require.NoError(t, err, "error closing series file")
	}()

	opts := tsdb.NewEngineOptions()
	opts.Config.WALDir = tmpWal
	opts.InmemIndex = inmem.NewIndex(filepath.Base(tmpDir), sfile.SeriesFile)

	sh := tsdb.NewShard(1, tmpShard, tmpWal, sfile.SeriesFile, opts)
	err = sh.Open()
	require.NoError(t, err, "error opening shard")

	defer func() {
		err := sh.Close()
		require.NoError(t, err, "error closing shard")
	}()

	// A point with only legal fields is written cleanly.
	pt := models.MustNewPoint(
		"cpu",
		models.NewTags(map[string]string{"host": "server"}),
		map[string]interface{}{"value": 1.0},
		time.Unix(1, 2),
	)

	err = sh.WritePoints([]models.Point{pt}, tsdb.NoopStatsTracker())
	require.NoError(t, err, "error writing point")

	// A point mixing legal fields with a "time" field: the write is expected to
	// report a partial write error with dropped=0.
	pt = models.MustNewPoint(
		"cpu",
		models.NewTags(map[string]string{"host": "server"}),
		map[string]interface{}{"value": 1.0, "value2": 2.0, "time": time.Now().Unix()},
		time.Unix(1, 2),
	)

	err = sh.WritePoints([]models.Point{pt}, tsdb.NoopStatsTracker())
	require.Error(t, err, "writing point should error with partial write")
	require.ErrorContains(t, err, "partial write: invalid field name: input field \"time\" on measurement \"cpu\" is invalid. Field \"time\" has been stripped from point. dropped=0 for database:")

	// The point should be dropped entirely because its only field is "time".
	pt = models.MustNewPoint(
		"cpu",
		models.NewTags(map[string]string{"host": "server"}),
		map[string]interface{}{"time": time.Now().Unix()},
		time.Unix(1, 2),
	)

	err = sh.WritePoints([]models.Point{pt}, tsdb.NoopStatsTracker())
	require.Error(t, err, "writing point should error with partial write")
	require.ErrorContains(t, err, "partial write: invalid field name: input field \"time\" on measurement \"cpu\" is invalid dropped=1 for database:")

	require.Equal(t, int64(1), sh.SeriesN(), "wrong number of series")
	stats := sh.Statistics(nil)
	require.GreaterOrEqual(t, len(stats), 1, "wrong number of stats")

	// Use the checked form of the type assertion so a missing or retyped
	// statistic fails the test instead of panicking.
	pointsOK, ok := stats[0].Values["writePointsOk"].(int64)
	require.True(t, ok, "writePointsOk statistic should be an int64")
	require.Equal(t, int64(2), pointsOK, "should have written 2 points successfully")

	mf := sh.MeasurementFields([]byte("cpu"))
	require.NotNil(t, mf, "measurement fields should not be nil")
	require.Equal(t, 2, mf.FieldN(), "measurement fields should have 2 values")
}

// Tests concurrently writing to the same shard with different field types which
// can trigger a panic when the shard is snapshotted to TSM files.
func TestShard_WritePoints_FieldConflictConcurrent(t *testing.T) {
Expand Down