@@ -14,6 +14,7 @@ import (
1414 "context"
1515
1616 "github.com/cockroachdb/cockroach/pkg/storage/apply"
17+ "github.com/cockroachdb/cockroach/pkg/util/tracing"
1718 "go.etcd.io/etcd/raft/raftpb"
1819)
1920
@@ -52,7 +53,9 @@ func (d *replicaDecoder) DecodeAndBind(ctx context.Context, ents []raftpb.Entry)
5253 if err := d .decode (ctx , ents ); err != nil {
5354 return false , err
5455 }
55- return d .retrieveLocalProposals (ctx ), nil
56+ anyLocal := d .retrieveLocalProposals (ctx )
57+ d .createTracingSpans (ctx )
58+ return anyLocal , nil
5659}
5760
5861// decode decodes the provided entries into the decoder.
@@ -81,23 +84,7 @@ func (d *replicaDecoder) retrieveLocalProposals(ctx context.Context) (anyLocal b
8184 for it .init (& d .cmdBuf ); it .Valid (); it .Next () {
8285 cmd := it .cur ()
8386 cmd .proposal = d .r .mu .proposals [cmd .idKey ]
84- if cmd .IsLocal () && cmd .raftCmd .MaxLeaseIndex != cmd .proposal .command .MaxLeaseIndex {
85- // If this entry does not have the most up-to-date view of the
86- // corresponding proposal's maximum lease index then the proposal
87- // must have been reproposed with a higher lease index. (see
88- // tryReproposeWithNewLeaseIndex). In that case, there's a newer
89- // version of the proposal in the pipeline, so don't consider this
90- // entry to have been proposed locally. The entry must necessarily be
91- // rejected by checkForcedErr.
92- cmd .proposal = nil
93- }
94- if cmd .IsLocal () {
95- // We initiated this command, so use the caller-supplied context.
96- cmd .ctx = cmd .proposal .ctx
97- anyLocal = true
98- } else {
99- cmd .ctx = ctx
100- }
87+ anyLocal = anyLocal || cmd .IsLocal ()
10188 }
10289 if ! anyLocal && d .r .mu .proposalQuota == nil {
10390 // Fast-path.
@@ -106,7 +93,16 @@ func (d *replicaDecoder) retrieveLocalProposals(ctx context.Context) (anyLocal b
10693 for it .init (& d .cmdBuf ); it .Valid (); it .Next () {
10794 cmd := it .cur ()
10895 toRelease := int64 (0 )
109- if cmd .IsLocal () {
96+ shouldRemove := cmd .IsLocal () &&
97+ // If this entry does not have the most up-to-date view of the
98+ // corresponding proposal's maximum lease index then the proposal
99+ // must have been reproposed with a higher lease index. (see
100+ // tryReproposeWithNewLeaseIndex). In that case, there's a newer
101+ // version of the proposal in the pipeline, so don't remove the
102+ // proposal from the map. We expect this entry to be rejected by
103+ // checkForcedErr.
104+ cmd .raftCmd .MaxLeaseIndex == cmd .proposal .command .MaxLeaseIndex
105+ if shouldRemove {
110106 // Delete the proposal from the proposals map. There may be reproposals
111107 // of the proposal in the pipeline, but those will all have the same max
112108 // lease index, meaning that they will all be rejected after this entry
@@ -131,6 +127,21 @@ func (d *replicaDecoder) retrieveLocalProposals(ctx context.Context) (anyLocal b
131127 return anyLocal
132128}
133129
130+ // createTracingSpans creates and assigns a new tracing span for each decoded
131+ // command. If a command was proposed locally, it will be given a tracing span
132+ // that follows from its proposal's span.
133+ func (d * replicaDecoder ) createTracingSpans (ctx context.Context ) {
134+ var it replicatedCmdBufSlice
135+ for it .init (& d .cmdBuf ); it .Valid (); it .Next () {
136+ cmd := it .cur ()
137+ parentCtx := ctx
138+ if cmd .IsLocal () {
139+ parentCtx = cmd .proposal .ctx
140+ }
141+ cmd .ctx , cmd .sp = tracing .ForkCtxSpan (parentCtx , "raft application" )
142+ }
143+ }
144+
134145// NewCommandIter implements the apply.Decoder interface.
135146func (d * replicaDecoder ) NewCommandIter () apply.CommandIterator {
136147 it := d .cmdBuf .newIter ()
0 commit comments