From d06405992f63ec553f804706e69ec1848bd77efb Mon Sep 17 00:00:00 2001 From: Andrei Matei Date: Mon, 15 Nov 2021 11:40:51 -0500 Subject: [PATCH 1/3] kvserver/apply: use a better ctx for cmd.AckSuccess Before this patch, CheckedCommand.AckSuccess() was called with a Raft worker context. That's wasn't great because each command captures a better context to use - one that derives from the proposal's ctx in the case of local proposals. This patch switches to using that by exposing the captured context through the Command interface. Taking advantage of the new ctx, we also log a message now about early acks, as it seems like a notable hint to see in a trace. This patch also cleans up most existing uses of that captured context to use the new interface method; before, various code paths were type asserting the implementation of the Command, and getting the internal context that way. This patch moves the resposibility of deciding what context to use upwards, to callers. Release note: None --- pkg/kv/kvserver/apply/cmd.go | 18 ++++++++++++---- pkg/kv/kvserver/apply/task.go | 6 +++--- pkg/kv/kvserver/apply/task_test.go | 11 +++++----- pkg/kv/kvserver/replica_application_cmd.go | 8 ++++++- .../replica_application_state_machine.go | 21 +++++++++++-------- .../replica_application_state_machine_test.go | 4 ++-- 6 files changed, 44 insertions(+), 24 deletions(-) diff --git a/pkg/kv/kvserver/apply/cmd.go b/pkg/kv/kvserver/apply/cmd.go index f3b161b41d1a..b459b622a729 100644 --- a/pkg/kv/kvserver/apply/cmd.go +++ b/pkg/kv/kvserver/apply/cmd.go @@ -24,6 +24,14 @@ type Command interface { // that were locally proposed typically have a client waiting on a // response, so there is additional urgency to apply them quickly. IsLocal() bool + // Ctx returns the Context in which operations on this Command should be + // performed. + // + // A Command does the unusual thing of capturing a Context because commands + // are generally processed in batches, but different commands might want their + // events going to different places. In particular, commands that have been + // proposed locally get a tracing span tied to the local proposal. + Ctx() context.Context // AckErrAndFinish signals that the application of the command has been // rejected due to the provided error. It also relays this rejection of // the command to its client if it was proposed locally. An error will @@ -167,12 +175,13 @@ func takeWhileCmdIter(iter CommandIterator, pred func(Command) bool) CommandIter // responsible for converting Commands into CheckedCommand. The function // closes the provided iterator. func mapCmdIter( - iter CommandIterator, fn func(Command) (CheckedCommand, error), + iter CommandIterator, fn func(context.Context, Command) (CheckedCommand, error), ) (CheckedCommandIterator, error) { defer iter.Close() ret := iter.NewCheckedList() for iter.Valid() { - checked, err := fn(iter.Cur()) + cur := iter.Cur() + checked, err := fn(cur.Ctx(), cur) if err != nil { ret.Close() return nil, err @@ -188,12 +197,13 @@ func mapCmdIter( // is responsible for converting CheckedCommand into AppliedCommand. The // function closes the provided iterator. func mapCheckedCmdIter( - iter CheckedCommandIterator, fn func(CheckedCommand) (AppliedCommand, error), + iter CheckedCommandIterator, fn func(context.Context, CheckedCommand) (AppliedCommand, error), ) (AppliedCommandIterator, error) { defer iter.Close() ret := iter.NewAppliedList() for iter.Valid() { - applied, err := fn(iter.CurChecked()) + curChecked := iter.CurChecked() + applied, err := fn(curChecked.Ctx(), curChecked) if err != nil { ret.Close() return nil, err diff --git a/pkg/kv/kvserver/apply/task.go b/pkg/kv/kvserver/apply/task.go index 25928b39ee81..1f5a244b5a62 100644 --- a/pkg/kv/kvserver/apply/task.go +++ b/pkg/kv/kvserver/apply/task.go @@ -52,7 +52,7 @@ type StateMachine interface { // an untimely crash. This means that applying these side-effects will // typically update the in-memory representation of the state machine // to the same state that it would be in if the process restarted. - ApplySideEffects(CheckedCommand) (AppliedCommand, error) + ApplySideEffects(context.Context, CheckedCommand) (AppliedCommand, error) } // ErrRemoved can be returned from ApplySideEffects which will stop the task @@ -67,7 +67,7 @@ var ErrRemoved = errors.New("replica removed") type Batch interface { // Stage inserts a Command into the Batch. In doing so, the Command is // checked for rejection and a CheckedCommand is returned. - Stage(Command) (CheckedCommand, error) + Stage(context.Context, Command) (CheckedCommand, error) // ApplyToStateMachine applies the persistent state transitions staged // in the Batch to the StateMachine, atomically. ApplyToStateMachine(context.Context) error @@ -225,7 +225,7 @@ func (t *Task) AckCommittedEntriesBeforeApplication(ctx context.Context, maxInde // want to retry the command instead of returning the error to the client. return forEachCheckedCmdIter(ctx, stagedIter, func(cmd CheckedCommand, ctx context.Context) error { if !cmd.Rejected() && cmd.IsLocal() && cmd.CanAckBeforeApplication() { - return cmd.AckSuccess(ctx) + return cmd.AckSuccess(cmd.Ctx()) } return nil }) diff --git a/pkg/kv/kvserver/apply/task_test.go b/pkg/kv/kvserver/apply/task_test.go index 5ae8bd90097c..bc4a65728056 100644 --- a/pkg/kv/kvserver/apply/task_test.go +++ b/pkg/kv/kvserver/apply/task_test.go @@ -52,9 +52,10 @@ type appliedCmd struct { *checkedCmd } -func (c *cmd) Index() uint64 { return c.index } -func (c *cmd) IsTrivial() bool { return !c.nonTrivial } -func (c *cmd) IsLocal() bool { return !c.nonLocal } +func (c *cmd) Index() uint64 { return c.index } +func (c *cmd) IsTrivial() bool { return !c.nonTrivial } +func (c *cmd) IsLocal() bool { return !c.nonLocal } +func (c *cmd) Ctx() context.Context { return context.Background() } func (c *cmd) AckErrAndFinish(_ context.Context, err error) error { c.acked = true c.finished = true @@ -138,7 +139,7 @@ func (sm *testStateMachine) NewBatch(ephemeral bool) apply.Batch { return &testBatch{sm: sm, ephemeral: ephemeral} } func (sm *testStateMachine) ApplySideEffects( - cmdI apply.CheckedCommand, + _ context.Context, cmdI apply.CheckedCommand, ) (apply.AppliedCommand, error) { cmd := cmdI.(*checkedCmd) sm.appliedSideEffects = append(sm.appliedSideEffects, cmd.index) @@ -160,7 +161,7 @@ type testBatch struct { staged []uint64 } -func (b *testBatch) Stage(cmdI apply.Command) (apply.CheckedCommand, error) { +func (b *testBatch) Stage(_ context.Context, cmdI apply.Command) (apply.CheckedCommand, error) { cmd := cmdI.(*cmd) b.staged = append(b.staged, cmd.index) ccmd := checkedCmd{cmd: cmd, rejected: cmd.shouldReject} diff --git a/pkg/kv/kvserver/replica_application_cmd.go b/pkg/kv/kvserver/replica_application_cmd.go index ec16d55477cd..af86c8a89ea9 100644 --- a/pkg/kv/kvserver/replica_application_cmd.go +++ b/pkg/kv/kvserver/replica_application_cmd.go @@ -114,6 +114,11 @@ func (c *replicatedCmd) IsLocal() bool { return c.proposal != nil } +// Ctx implements the apply.Command interface. +func (c *replicatedCmd) Ctx() context.Context { + return c.ctx +} + // AckErrAndFinish implements the apply.Command interface. func (c *replicatedCmd) AckErrAndFinish(ctx context.Context, err error) error { if c.IsLocal() { @@ -143,7 +148,7 @@ func (c *replicatedCmd) CanAckBeforeApplication() bool { } // AckSuccess implements the apply.CheckedCommand interface. -func (c *replicatedCmd) AckSuccess(_ context.Context) error { +func (c *replicatedCmd) AckSuccess(ctx context.Context) error { if !c.IsLocal() { return nil } @@ -158,6 +163,7 @@ func (c *replicatedCmd) AckSuccess(_ context.Context) error { resp.Reply = &reply resp.EncounteredIntents = c.proposal.Local.DetachEncounteredIntents() resp.EndTxns = c.proposal.Local.DetachEndTxns(false /* alwaysOnly */) + log.Event(ctx, "ack-ing replication success to the client; application will continue async w.r.t. the client") c.proposal.signalProposalResult(resp) return nil } diff --git a/pkg/kv/kvserver/replica_application_state_machine.go b/pkg/kv/kvserver/replica_application_state_machine.go index a82fa62cbad8..e9d3c2aa5b9c 100644 --- a/pkg/kv/kvserver/replica_application_state_machine.go +++ b/pkg/kv/kvserver/replica_application_state_machine.go @@ -428,9 +428,10 @@ type replicaAppBatch struct { // the batch. This allows the batch to make an accurate determination about // whether to accept or reject the next command that is staged without needing // to actually update the replica state machine in between. -func (b *replicaAppBatch) Stage(cmdI apply.Command) (apply.CheckedCommand, error) { +func (b *replicaAppBatch) Stage( + ctx context.Context, cmdI apply.Command, +) (apply.CheckedCommand, error) { cmd := cmdI.(*replicatedCmd) - ctx := cmd.ctx if cmd.ent.Index == 0 { return nil, makeNonDeterministicFailure("processRaftCommand requires a non-zero index") } @@ -457,7 +458,7 @@ func (b *replicaAppBatch) Stage(cmdI apply.Command) (apply.CheckedCommand, error cmd.raftCmd.LogicalOpLog = nil cmd.raftCmd.ClosedTimestamp = nil } else { - if err := b.assertNoCmdClosedTimestampRegression(cmd); err != nil { + if err := b.assertNoCmdClosedTimestampRegression(ctx, cmd); err != nil { return nil, err } if err := b.assertNoWriteBelowClosedTimestamp(cmd); err != nil { @@ -992,7 +993,9 @@ func (b *replicaAppBatch) assertNoWriteBelowClosedTimestamp(cmd *replicatedCmd) // Assert that the closed timestamp carried by the command is not below one from // previous commands. -func (b *replicaAppBatch) assertNoCmdClosedTimestampRegression(cmd *replicatedCmd) error { +func (b *replicaAppBatch) assertNoCmdClosedTimestampRegression( + ctx context.Context, cmd *replicatedCmd, +) error { if !raftClosedTimestampAssertionsEnabled { return nil } @@ -1012,7 +1015,7 @@ func (b *replicaAppBatch) assertNoCmdClosedTimestampRegression(cmd *replicatedCm prevReq.SafeString("") } - logTail, err := b.r.printRaftTail(cmd.ctx, 100 /* maxEntries */, 2000 /* maxCharsPerEntry */) + logTail, err := b.r.printRaftTail(ctx, 100 /* maxEntries */, 2000 /* maxCharsPerEntry */) if err != nil { if logTail != "" { logTail = logTail + "\n; error printing log: " + err.Error() @@ -1043,9 +1046,10 @@ type ephemeralReplicaAppBatch struct { } // Stage implements the apply.Batch interface. -func (mb *ephemeralReplicaAppBatch) Stage(cmdI apply.Command) (apply.CheckedCommand, error) { +func (mb *ephemeralReplicaAppBatch) Stage( + ctx context.Context, cmdI apply.Command, +) (apply.CheckedCommand, error) { cmd := cmdI.(*replicatedCmd) - ctx := cmd.ctx mb.r.shouldApplyCommand(ctx, cmd, &mb.state) mb.state.LeaseAppliedIndex = cmd.leaseIndex @@ -1071,10 +1075,9 @@ func (mb *ephemeralReplicaAppBatch) Close() { // side effects of commands, such as finalizing splits/merges and informing // raft about applied config changes. func (sm *replicaStateMachine) ApplySideEffects( - cmdI apply.CheckedCommand, + ctx context.Context, cmdI apply.CheckedCommand, ) (apply.AppliedCommand, error) { cmd := cmdI.(*replicatedCmd) - ctx := cmd.ctx // Deal with locking during side-effect handling, which is sometimes // associated with complex commands such as splits and merged. diff --git a/pkg/kv/kvserver/replica_application_state_machine_test.go b/pkg/kv/kvserver/replica_application_state_machine_test.go index 388f75035202..bbe1bbd39a3c 100644 --- a/pkg/kv/kvserver/replica_application_state_machine_test.go +++ b/pkg/kv/kvserver/replica_application_state_machine_test.go @@ -110,7 +110,7 @@ func TestReplicaStateMachineChangeReplicas(t *testing.T) { }, } - checkedCmd, err := b.Stage(cmd) + checkedCmd, err := b.Stage(cmd.ctx, cmd) require.NoError(t, err) require.Equal(t, !add, b.changeRemovesReplica) require.Equal(t, b.state.RaftAppliedIndex, cmd.ent.Index) @@ -129,7 +129,7 @@ func TestReplicaStateMachineChangeReplicas(t *testing.T) { require.NoError(t, err) // Apply the side effects of the command to the StateMachine. - _, err = sm.ApplySideEffects(checkedCmd) + _, err = sm.ApplySideEffects(checkedCmd.Ctx(), checkedCmd) if add { require.NoError(t, err) } else { From f058121f864b8c87158de0bf09a652ca9ca23ffd Mon Sep 17 00:00:00 2001 From: Andrei Matei Date: Sun, 14 Nov 2021 16:25:32 -0500 Subject: [PATCH 2/3] kvserver: include Raft application on leaseholder in request trace Before this patch, the decoding and application of a Raft command was not included in the recording of the request that generated the respective command, even in the case where consensus is synchronous with respect to the request (i.e. non-AsyncConsensus requests). This was because, although we plumb tracing information below Raft, the span under which Raft processing was occurring was Fork()ed from the parent span (i.e. the request evaluation's span). The reasons why that was are not good: 1) forking (as opposed to creating a regular child) was introduced in #39425. I'm not sure whether there was a particular reason for this decision. Perhaps there was fear at the time about the child outliving the parent - although generally it doesn't because, in the case of async consensus, we fork a parent which I think will outlive the child: https://github.com/cockroachdb/cockroach/blame/13669f9c9bd92a4c3b0378a558d7735f122c4e72/pkg/kv/kvserver/replica_raft.go#L157 In case of sync consensus requests, it's possible for the Raft application span to outlive the evaluation span in cases when the application part (as opposed to the consensus part) can be done async (see CanAckBeforeApplication). Still, regardless of the exact details of the lifetimes, with time it has become clear that it's appropriate to create a child when it's expected that it will usually not outlive the parent even if, on occasion, it can outlive it. 2) forked spans used to be included in the parent's recording until #59815. This explains why we regressed here - Raft application used to be included in recordings properly. This patch makes the application span be a regular child, and so the raft application span is included in the request trace. Touches #70864. That issue asks for a new tracing feature but I think what motivated it is the application info missing from query traces. This patch is sufficient for that. Release note (general change): Tracing transaction commits now includes details about replication. --- pkg/kv/kvserver/replica_application_decoder.go | 2 +- pkg/kv/kvserver/replica_test.go | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/kv/kvserver/replica_application_decoder.go b/pkg/kv/kvserver/replica_application_decoder.go index ada38b11da4d..fe7b14b97fb6 100644 --- a/pkg/kv/kvserver/replica_application_decoder.go +++ b/pkg/kv/kvserver/replica_application_decoder.go @@ -139,7 +139,7 @@ func (d *replicaDecoder) createTracingSpans(ctx context.Context) { for it.init(&d.cmdBuf); it.Valid(); it.Next() { cmd := it.cur() if cmd.IsLocal() { - cmd.ctx, cmd.sp = tracing.ForkSpan(cmd.proposal.ctx, opName) + cmd.ctx, cmd.sp = tracing.ChildSpan(cmd.proposal.ctx, opName) } else if cmd.raftCmd.TraceData != nil { // The proposal isn't local, and trace data is available. Extract // the remote span and start a server-side span that follows from it. diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 16696639f69e..f20e3a6ecb81 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -8359,7 +8359,6 @@ func TestFailureToProcessCommandClearsLocalResult(t *testing.T) { r.mu.Unlock() tr := tc.store.cfg.AmbientCtx.Tracer - tr.TestingRecordAsyncSpans() // we assert on async span traces in this test opCtx, getRecAndFinish := tracing.ContextWithRecordingSpan(ctx, tr, "test-recording") defer getRecAndFinish() From f2bc7d91be4cb442cdb1dc8eb4ce72bcc72c181d Mon Sep 17 00:00:00 2001 From: Andrei Matei Date: Sun, 14 Nov 2021 17:20:04 -0500 Subject: [PATCH 3/3] kvserver: don't fork Raft application spans on followers Raft application was forking a span on followers, instead of the more usual creation of child spans. This was done probably because the parent span is usually a long-running Raft worker span, and adding children to such long-running spans is generally not a great idea. But these days there are better ways of dealing with long-running tasks - they can have Sterile spans, meaning that the parent declares that it doesn't want any children. And indeed the task in question uses such a Sterile span: https://github.com/cockroachdb/cockroach/blob/2ad2bee257e78970ce2c457ddd6996099ed6727a/pkg/kv/kvserver/scheduler.go#L217 So, the forking is not needed and this patch changes it to a regular child relationship. This has a couple of benefits: - the parent was not always a long-running taks. I believe it can also be a snapshot application, in which case we want the span in question to be a regular child (e.g. because tracing a snapshot application should also include this span). This shows that the decision about what kind if relationship a child should have to its parent is not appropriate to be taken by the child. - less contention on the active spans registry. Forking a span internally creates a "local root" span that needs to be added to the registry directly, instead of being added indirectly through its parent. Release note: None --- pkg/kv/kvserver/replica_application_decoder.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/kv/kvserver/replica_application_decoder.go b/pkg/kv/kvserver/replica_application_decoder.go index fe7b14b97fb6..399efe911a36 100644 --- a/pkg/kv/kvserver/replica_application_decoder.go +++ b/pkg/kv/kvserver/replica_application_decoder.go @@ -159,7 +159,7 @@ func (d *replicaDecoder) createTracingSpans(ctx context.Context) { ) } } else { - cmd.ctx, cmd.sp = tracing.ForkSpan(ctx, opName) + cmd.ctx, cmd.sp = tracing.ChildSpan(ctx, opName) } } }