Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -1075,11 +1075,13 @@ func (ec *endCmds) move() endCmds {
// the timestamp cache using the final timestamp of each command.
//
// No-op if the receiver has been zeroed out by a call to move.
// Idempotent and is safe to call more than once.
func (ec *endCmds) done(ba *roachpb.BatchRequest, br *roachpb.BatchResponse, pErr *roachpb.Error) {
if ec.repl == nil {
// The endCmds were cleared.
return
}
defer ec.move() // clear

// Update the timestamp cache if the request is not being re-evaluated. Each
// request is considered in turn; only those marked as affecting the cache are
Expand Down
16 changes: 11 additions & 5 deletions pkg/storage/replica_application_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
opentracing "github.com/opentracing/opentracing-go"
"go.etcd.io/etcd/raft/raftpb"
)

Expand Down Expand Up @@ -48,9 +50,13 @@ type replicatedCmd struct {
// proposal is populated on the proposing Replica only and comes from the
// Replica's proposal map.
proposal *ProposalData
// ctx will be the proposal's context if proposed locally, otherwise it will
// be populated with the handleCommittedEntries ctx.

// ctx is a context that follows from the proposal's context if it was
// proposed locally. Otherwise, it will follow from the context passed to
// ApplyCommittedEntries. sp is the corresponding tracing span, which is
// closed in FinishAndAckOutcome.
ctx context.Context
sp opentracing.Span

// The following fields are set in shouldApplyCommand when we validate that
// a command applies given the current lease and GC threshold. The process
Expand Down Expand Up @@ -112,10 +118,10 @@ func (c *replicatedCmd) Rejected() bool {

// FinishAndAckOutcome implements the apply.AppliedCommand interface.
func (c *replicatedCmd) FinishAndAckOutcome() error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's documented that this must only be called once, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is in #39468.

if !c.IsLocal() {
return nil
tracing.FinishSpan(c.sp)
if c.IsLocal() {
c.proposal.finishApplication(c.response)
}
c.proposal.finishApplication(c.response)
return nil
}

Expand Down
49 changes: 30 additions & 19 deletions pkg/storage/replica_application_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/storage/apply"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"go.etcd.io/etcd/raft/raftpb"
)

Expand Down Expand Up @@ -52,7 +53,9 @@ func (d *replicaDecoder) DecodeAndBind(ctx context.Context, ents []raftpb.Entry)
if err := d.decode(ctx, ents); err != nil {
return false, err
}
return d.retrieveLocalProposals(ctx), nil
anyLocal := d.retrieveLocalProposals(ctx)
d.createTracingSpans(ctx)
return anyLocal, nil
}

// decode decodes the provided entries into the decoder.
Expand Down Expand Up @@ -81,23 +84,7 @@ func (d *replicaDecoder) retrieveLocalProposals(ctx context.Context) (anyLocal b
for it.init(&d.cmdBuf); it.Valid(); it.Next() {
cmd := it.cur()
cmd.proposal = d.r.mu.proposals[cmd.idKey]
if cmd.IsLocal() && cmd.raftCmd.MaxLeaseIndex != cmd.proposal.command.MaxLeaseIndex {
// If this entry does not have the most up-to-date view of the
// corresponding proposal's maximum lease index then the proposal
// must have been reproposed with a higher lease index. (see
// tryReproposeWithNewLeaseIndex). In that case, there's a newer
// version of the proposal in the pipeline, so don't consider this
// entry to have been proposed locally. The entry must necessarily be
// rejected by checkForcedErr.
cmd.proposal = nil
}
if cmd.IsLocal() {
// We initiated this command, so use the caller-supplied context.
cmd.ctx = cmd.proposal.ctx
anyLocal = true
} else {
cmd.ctx = ctx
}
anyLocal = anyLocal || cmd.IsLocal()
}
if !anyLocal && d.r.mu.proposalQuota == nil {
// Fast-path.
Expand All @@ -106,7 +93,16 @@ func (d *replicaDecoder) retrieveLocalProposals(ctx context.Context) (anyLocal b
for it.init(&d.cmdBuf); it.Valid(); it.Next() {
cmd := it.cur()
toRelease := int64(0)
if cmd.IsLocal() {
shouldRemove := cmd.IsLocal() &&
// If this entry does not have the most up-to-date view of the
// corresponding proposal's maximum lease index then the proposal
// must have been reproposed with a higher lease index. (see
// tryReproposeWithNewLeaseIndex). In that case, there's a newer
// version of the proposal in the pipeline, so don't remove the
// proposal from the map. We expect this entry to be rejected by
// checkForcedErr.
cmd.raftCmd.MaxLeaseIndex == cmd.proposal.command.MaxLeaseIndex
if shouldRemove {
// Delete the proposal from the proposals map. There may be reproposals
// of the proposal in the pipeline, but those will all have the same max
// lease index, meaning that they will all be rejected after this entry
Expand All @@ -131,6 +127,21 @@ func (d *replicaDecoder) retrieveLocalProposals(ctx context.Context) (anyLocal b
return anyLocal
}

// createTracingSpans creates and assigns a new tracing span for each decoded
// command. If a command was proposed locally, it will be given a tracing span
// that follows from its proposal's span.
func (d *replicaDecoder) createTracingSpans(ctx context.Context) {
var it replicatedCmdBufSlice
for it.init(&d.cmdBuf); it.Valid(); it.Next() {
cmd := it.cur()
parentCtx := ctx
if cmd.IsLocal() {
parentCtx = cmd.proposal.ctx
}
cmd.ctx, cmd.sp = tracing.ForkCtxSpan(parentCtx, "raft application")
}
}

// NewCommandIter implements the apply.Decoder interface.
func (d *replicaDecoder) NewCommandIter() apply.CommandIterator {
it := d.cmdBuf.newIter()
Expand Down
24 changes: 11 additions & 13 deletions pkg/storage/replica_application_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -839,22 +839,20 @@ func (sm *replicaStateMachine) ApplySideEffects(

// Mark the command as applied and return it as an apply.AppliedCommand.
if cmd.IsLocal() {
if !cmd.Rejected() && cmd.raftCmd.MaxLeaseIndex != cmd.proposal.command.MaxLeaseIndex {
log.Fatalf(ctx, "finishing proposal with outstanding reproposal at a higher max lease index")
}
if cmd.proposal.applied {
// If the command already applied then we shouldn't be "finishing" its
// application again because it should only be able to apply successfully
// once. We expect that when any reproposal for the same command attempts
// to apply it will be rejected by the below raft lease sequence or lease
// index check in checkForcedErr.
if !cmd.Rejected() {
if !cmd.Rejected() {
if cmd.raftCmd.MaxLeaseIndex != cmd.proposal.command.MaxLeaseIndex {
log.Fatalf(ctx, "finishing proposal with outstanding reproposal at a higher max lease index")
}
if cmd.proposal.applied {
// If the command already applied then we shouldn't be "finishing" its
// application again because it should only be able to apply successfully
// once. We expect that when any reproposal for the same command attempts
// to apply it will be rejected by the below raft lease sequence or lease
// index check in checkForcedErr.
log.Fatalf(ctx, "command already applied: %+v; unexpected successful result", cmd)
}
cmd.proposal = nil
} else {
cmd.proposal.applied = true
}
cmd.proposal.applied = true
}
return cmd, nil
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/storage/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ type ProposalData struct {
// order to allow the original client to be canceled. (When the original client
// is canceled, it won't be listening to this done channel, and so it can't be
// counted on to invoke endCmds itself.)
//
// The method is safe to call more than once, but only the first result will be
// returned to the client.
func (proposal *ProposalData) finishApplication(pr proposalResult) {
proposal.ec.done(proposal.Request, pr.Reply, pr.Err)
proposal.signalProposalResult(pr)
Expand All @@ -139,6 +142,9 @@ func (proposal *ProposalData) finishApplication(pr proposalResult) {
// has not already been signaled. The method can be called even before the
// proposal has finished replication and command application, and does not
// release the request's latches.
//
// The method is safe to call more than once, but only the first result will be
// returned to the client.
func (proposal *ProposalData) signalProposalResult(pr proposalResult) {
if proposal.doneCh != nil {
proposal.doneCh <- pr
Expand Down