Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,9 @@ func (s *EtcdServer) linearizableReadLoop() {
return
}

// as a single loop is can unlock multiple reads, it is not very useful
// to propagate the trace from Txn or Range.
trace := traceutil.New("linearizableReadLoop", s.getLogger())
nextnr := newNotifier()

s.readMu.Lock()
Expand Down Expand Up @@ -782,16 +785,26 @@ func (s *EtcdServer) linearizableReadLoop() {
if !done {
continue
}
trace.Step("read index received")

if ai := s.getAppliedIndex(); ai < rs.Index {
index := rs.Index
trace.AddField(traceutil.Field{Key: "readStateIndex", Value: index})

ai := s.getAppliedIndex()
trace.AddField(traceutil.Field{Key: "appliedIndex", Value: ai})

if ai < index {
select {
case <-s.applyWait.Wait(rs.Index):
case <-s.applyWait.Wait(index):
case <-s.stopping:
return
}
}
// unblock all l-reads requested at indices before rs.Index
nr.notify(nil)
trace.Step("applied index is now lower than readState.Index")

trace.LogAllStepsIfLong(traceThreshold)
}
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/traceutil/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,13 @@ func (t *Trace) LogIfLong(threshold time.Duration) {
}
}

// LogAllStepsIfLong dumps all logs if the duration is longer than threshold
func (t *Trace) LogAllStepsIfLong(threshold time.Duration) {
if time.Since(t.startTime) > threshold {
t.LogWithStepThreshold(0)
}
}

// LogWithStepThreshold only dumps step whose duration is longer than step threshold
func (t *Trace) LogWithStepThreshold(threshold time.Duration) {
msg, fs := t.logInfo(threshold)
Expand Down