12 changes: 9 additions & 3 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -35,6 +35,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
@@ -1342,9 +1343,14 @@ func (ds *DistSender) sendPartialBatchAsync(
batchIdx int,
responseCh chan response,
) bool {
if err := ds.rpcContext.Stopper.RunLimitedAsyncTask(
ctx, "kv.DistSender: sending partial batch",
ds.asyncSenderSem, false, /* wait */
if err := ds.rpcContext.Stopper.RunAsyncTaskEx(
ctx,
stop.TaskOpts{
TaskName: "kv.DistSender: sending partial batch",
ChildSpan: true,
Sem: ds.asyncSenderSem,
WaitForSem: false,
},
func(ctx context.Context) {
ds.metrics.AsyncSentCount.Inc(1)
responseCh <- ds.sendPartialBatch(
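Every call site in this change follows the same shape: the positional arguments of RunLimitedAsyncTask (task name, semaphore, wait bool) become named fields on a stop.TaskOpts struct passed to RunAsyncTaskEx. The sketch below is a minimal, self-contained illustration of that call shape and of the WaitForSem semantics the call sites imply; the types are toy stand-ins (a channel in place of the real quota-pool semaphore), not the actual pkg/util/stop API.

```go
// Toy stand-ins, not the real pkg/util/stop API: what matters is the shape of
// the options struct and the two WaitForSem behaviors implied by the diff.
package main

import (
	"context"
	"fmt"
	"sync"
)

type taskOpts struct {
	taskName   string        // old positional task-name argument
	childSpan  bool          // open a tracing child span (unused by this toy)
	sem        chan struct{} // toy semaphore; the real field is a quota pool
	waitForSem bool          // old trailing `wait` bool, now a named field
}

// runAsyncTaskEx sketches the semantics the call sites imply: with
// waitForSem=false a full semaphore means the task is simply not started and
// an error is returned; with waitForSem=true the caller blocks for a slot.
func runAsyncTaskEx(
	ctx context.Context, opts taskOpts, wg *sync.WaitGroup, fn func(context.Context),
) error {
	if opts.waitForSem {
		select {
		case opts.sem <- struct{}{}:
		case <-ctx.Done():
			return ctx.Err()
		}
	} else {
		select {
		case opts.sem <- struct{}{}:
		default:
			return fmt.Errorf("%s: semaphore full, task not started", opts.taskName)
		}
	}
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer func() { <-opts.sem }()
		fn(ctx)
	}()
	return nil
}

func main() {
	var wg sync.WaitGroup
	sem := make(chan struct{}, 1)
	err := runAsyncTaskEx(context.Background(), taskOpts{
		taskName:   "kv.DistSender: sending partial batch",
		childSpan:  true,
		sem:        sem,
		waitForSem: false, // skip rather than block, as in sendPartialBatchAsync
	}, &wg, func(ctx context.Context) { fmt.Println("partial batch sent") })
	if err != nil {
		fmt.Println("not sent asynchronously:", err)
	}
	wg.Wait()
}
```

An options struct also means new knobs (ChildSpan above, or the cancellation fields used later in pkg/kv/txn.go) can be added without touching every caller, which is presumably the motivation for the migration.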
32 changes: 18 additions & 14 deletions pkg/kv/kvserver/intentresolver/intent_resolver.go
@@ -419,13 +419,15 @@ func (ir *IntentResolver) runAsyncTask(
if ir.testingKnobs.DisableAsyncIntentResolution {
return errors.New("intents not processed as async resolution is disabled")
}
err := ir.stopper.RunLimitedAsyncTask(
err := ir.stopper.RunAsyncTaskEx(
// If we've successfully launched a background task, dissociate
// this work from our caller's context and timeout.
ir.ambientCtx.AnnotateCtx(context.Background()),
"storage.IntentResolver: processing intents",
ir.sem,
false, /* wait */
stop.TaskOpts{
TaskName: "storage.IntentResolver: processing intents",
Sem: ir.sem,
WaitForSem: false,
},
taskFn,
)
if err != nil {
@@ -619,20 +621,22 @@ func (ir *IntentResolver) CleanupTxnIntentsOnGCAsync(
now hlc.Timestamp,
onComplete func(pushed, succeeded bool),
) error {
return ir.stopper.RunLimitedAsyncTask(
return ir.stopper.RunAsyncTaskEx(
// If we've successfully launched a background task,
// dissociate this work from our caller's context and
// timeout.
ir.ambientCtx.AnnotateCtx(context.Background()),
"processing txn intents",
ir.sem,
// We really do not want to hang up the GC queue on this kind of
// processing, so it's better to just skip txns which we can't
// pass to the async processor (wait=false). Their intents will
// get cleaned up on demand, and we'll eventually get back to
// them. Not much harm in having old txn records lying around in
// the meantime.
false, /* wait */
stop.TaskOpts{
TaskName: "processing txn intents",
Sem: ir.sem,
// We really do not want to hang up the GC queue on this kind of
// processing, so it's better to just skip txns which we can't
// pass to the async processor (wait=false). Their intents will
// get cleaned up on demand, and we'll eventually get back to
// them. Not much harm in having old txn records lying around in
// the meantime.
WaitForSem: false,
},
func(ctx context.Context) {
var pushed, succeeded bool
defer func() {
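The comment in CleanupTxnIntentsOnGCAsync spells out the policy behind WaitForSem: false: if the intent-resolver semaphore is full, the GC queue should skip the transaction rather than stall, since the intents will be cleaned up lazily on demand. The toy below sketches that skip-on-full loop; the channel semaphore and the skip accounting are illustrative, not the real intentresolver code.

```go
// Skip-on-full cleanup loop: each txn record is handed to a bounded worker
// pool; when no slot is free the txn is skipped rather than blocking the
// loop, mirroring WaitForSem: false in CleanupTxnIntentsOnGCAsync.
package main

import (
	"fmt"
	"sync"
)

func main() {
	txns := []string{"txn-a", "txn-b", "txn-c", "txn-d"}
	sem := make(chan struct{}, 1) // deliberately tiny so some work is skipped
	var wg sync.WaitGroup
	skipped := 0

	for _, txn := range txns {
		txn := txn
		select {
		case sem <- struct{}{}: // slot available: resolve asynchronously
			wg.Add(1)
			go func() {
				defer wg.Done()
				defer func() { <-sem }()
				fmt.Println("resolving intents for", txn)
			}()
		default:
			// No capacity: skip. Intents are resolved lazily when a reader
			// runs into them, so old txn records are harmless in the meantime.
			skipped++
		}
	}
	wg.Wait()
	fmt.Println("skipped this pass:", skipped)
}
```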
7 changes: 6 additions & 1 deletion pkg/kv/kvserver/queue.go
@@ -576,7 +576,12 @@ func (bq *baseQueue) Async(
log.InfofDepth(ctx, 2, "%s", log.Safe(opName))
}
opName += " (" + bq.name + ")"
if err := bq.store.stopper.RunLimitedAsyncTask(context.Background(), opName, bq.addOrMaybeAddSem, wait,
if err := bq.store.stopper.RunAsyncTaskEx(context.Background(),
stop.TaskOpts{
TaskName: opName,
Sem: bq.addOrMaybeAddSem,
WaitForSem: wait,
},
func(ctx context.Context) {
fn(ctx, baseQueueHelper{bq})
}); err != nil && bq.addLogN.ShouldLog() {
11 changes: 8 additions & 3 deletions pkg/kv/kvserver/store.go
@@ -1051,7 +1051,7 @@ func (s *Store) SetDraining(drain bool, reporter func(int, redact.SafeString)) {
newStoreReplicaVisitor(s).Visit(func(r *Replica) bool {
//
// We need to be careful about the case where the ctx has been canceled
// prior to the call to (*Stopper).RunLimitedAsyncTask(). In that case,
// prior to the call to (*Stopper).RunAsyncTaskEx(). In that case,
// the goroutine is not even spawned. However, we don't want to
// mis-count the missing goroutine as the lack of transfer attempted.
// So what we do here is immediately increase numTransfersAttempted
@@ -1060,8 +1060,13 @@ func (s *Store) SetDraining(drain bool, reporter func(int, redact.SafeString)) {
// not raft leader).
atomic.AddInt32(&numTransfersAttempted, 1)
wg.Add(1)
if err := s.stopper.RunLimitedAsyncTask(
r.AnnotateCtx(ctx), "storage.Store: draining replica", sem, true, /* wait */
if err := s.stopper.RunAsyncTaskEx(
r.AnnotateCtx(ctx),
stop.TaskOpts{
TaskName: "storage.Store: draining replica",
Sem: sem,
WaitForSem: true,
},
func(ctx context.Context) {
defer wg.Done()

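The comment at the top of this hunk notes that when the context is already canceled the goroutine is never spawned, which is why numTransfersAttempted and the WaitGroup are bumped before the call. The sketch below illustrates that count-then-spawn bookkeeping with a toy spawn function standing in for RunAsyncTaskEx; the error-branch handling (undoing the WaitGroup add) is an assumption about the elided code below this hunk, shown only to make the pattern concrete.

```go
// Count-then-spawn bookkeeping as described in the SetDraining comment above.
// spawn is a toy stand-in for RunAsyncTaskEx: on a pre-canceled context the
// closure never runs, so the caller must undo its own WaitGroup bump.
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

func spawn(ctx context.Context, fn func(context.Context)) error {
	if err := ctx.Err(); err != nil {
		return err // no goroutine is started
	}
	go fn(ctx)
	return nil
}

func main() {
	var numTransfersAttempted int32
	var wg sync.WaitGroup

	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate a context canceled before the task is handed off

	// Bump the counter and the WaitGroup before spawning, so a refused spawn
	// is still counted as an attempted (but failed) transfer.
	atomic.AddInt32(&numTransfersAttempted, 1)
	wg.Add(1)
	if err := spawn(ctx, func(ctx context.Context) {
		defer wg.Done()
		fmt.Println("draining replica")
	}); err != nil {
		wg.Done() // the closure never ran, so it cannot do this for us
		fmt.Println("transfer not started:", err)
	}
	wg.Wait()
	fmt.Println("transfers attempted:", atomic.LoadInt32(&numTransfersAttempted))
}
```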
66 changes: 35 additions & 31 deletions pkg/kv/txn.go
@@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
@@ -780,39 +781,42 @@ func (txn *Txn) rollback(ctx context.Context) *roachpb.Error {
}
}

// We don't have a client whose context we can attach to, but we do want to
// limit how long this request is going to be around for to avoid leaking a
// goroutine (in case of a long-lived network partition). If it gets through
// Raft, and the intent resolver has free async task capacity, the actual
// cleanup will be independent of this context.
// We want to limit how long this request is going to be around for to avoid
// leaking a goroutine (in case of a long-lived network partition). If it gets
// through Raft, and the intent resolver has free async task capacity, the
// actual cleanup will be independent of this context.
stopper := txn.db.ctx.Stopper
ctx, cancel := stopper.WithCancelOnQuiesce(txn.db.AnnotateCtx(context.Background()))
if err := stopper.RunAsyncTask(ctx, "async-rollback", func(ctx context.Context) {
defer cancel()
// A batch with only endTxnReq is not subject to admission control, in
// order to reduce contention by releasing locks. In multi-tenant
// settings, it will be subject to admission control, and the zero
// CreateTime will give it preference within the tenant.
var ba roachpb.BatchRequest
ba.Add(endTxnReq(false /* commit */, nil /* deadline */, false /* systemConfigTrigger */))
_ = contextutil.RunWithTimeout(ctx, "async txn rollback", asyncRollbackTimeout,
func(ctx context.Context) error {
if _, pErr := txn.Send(ctx, ba); pErr != nil {
if statusErr, ok := pErr.GetDetail().(*roachpb.TransactionStatusError); ok &&
statusErr.Reason == roachpb.TransactionStatusError_REASON_TXN_COMMITTED {
// A common cause of these async rollbacks failing is when they're
// triggered by a ctx canceled while a commit is in-flight (and it's too
// late for it to be canceled), and so the rollback finds the txn to be
// already committed. We don't spam the logs with those.
log.VEventf(ctx, 2, "async rollback failed: %s", pErr)
} else {
log.Infof(ctx, "async rollback failed: %s", pErr)
if err := stopper.RunAsyncTaskEx(ctx,
stop.TaskOpts{
TaskName: "async-rollback",
DontInheritCancellation: true,
CancelOnQuiesce: true,
ChildSpan: false,
},
func(ctx context.Context) {
// A batch with only endTxnReq is not subject to admission control, in
// order to reduce contention by releasing locks. In multi-tenant
// settings, it will be subject to admission control, and the zero
// CreateTime will give it preference within the tenant.
var ba roachpb.BatchRequest
ba.Add(endTxnReq(false /* commit */, nil /* deadline */, false /* systemConfigTrigger */))
_ = contextutil.RunWithTimeout(ctx, "async txn rollback", asyncRollbackTimeout,
func(ctx context.Context) error {
if _, pErr := txn.Send(ctx, ba); pErr != nil {
if statusErr, ok := pErr.GetDetail().(*roachpb.TransactionStatusError); ok &&
statusErr.Reason == roachpb.TransactionStatusError_REASON_TXN_COMMITTED {
// A common cause of these async rollbacks failing is when they're
// triggered by a ctx canceled while a commit is in-flight (and it's too
// late for it to be canceled), and so the rollback finds the txn to be
// already committed. We don't spam the logs with those.
log.VEventf(ctx, 2, "async rollback failed: %s", pErr)
} else {
log.Infof(ctx, "async rollback failed: %s", pErr)
}
}
}
return nil
})
}); err != nil {
cancel()
return nil
})
}); err != nil {
return roachpb.NewError(err)
}
return nil
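The rollback task previously detached itself by hand: AnnotateCtx(context.Background()) plus WithCancelOnQuiesce, with explicit cancel() calls on both paths. The new DontInheritCancellation and CancelOnQuiesce fields name those two concerns directly. The toy below sketches what the two behaviors presumably amount to, using a plain channel in place of the stopper's quiesce signal; the field semantics are inferred from the code they replace, not from the real stop package.

```go
// Toy illustration of the two context behaviors named by the new TaskOpts
// fields: DontInheritCancellation (the task outlives the caller's ctx) and
// CancelOnQuiesce (the task is still torn down when the server drains).
package main

import (
	"context"
	"fmt"
	"time"
)

// detachedCtx returns a context that ignores the caller's cancellation but is
// canceled once quiesce is closed, roughly what the old rollback code built
// with AnnotateCtx(context.Background()) + WithCancelOnQuiesce.
func detachedCtx(quiesce <-chan struct{}) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		select {
		case <-quiesce:
			cancel()
		case <-ctx.Done():
		}
	}()
	return ctx, cancel
}

func main() {
	quiesce := make(chan struct{})

	callerCtx, callerCancel := context.WithCancel(context.Background())
	callerCancel() // the caller gives up immediately...

	taskCtx, cancel := detachedCtx(quiesce)
	defer cancel()

	// ...but the rollback task keeps running: it did not inherit the caller's
	// cancellation, so a canceled statement cannot strand its own rollback.
	fmt.Println("caller ctx err:", callerCtx.Err()) // context.Canceled
	fmt.Println("task ctx err:  ", taskCtx.Err())   // <nil>

	// Once the stopper quiesces, the detached task is canceled too, so it
	// cannot outlive the server (e.g. during a long network partition).
	close(quiesce)
	time.Sleep(10 * time.Millisecond) // let the watcher goroutine observe it
	fmt.Println("task ctx err after quiesce:", taskCtx.Err()) // context.Canceled
}
```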
20 changes: 14 additions & 6 deletions pkg/server/status.go
@@ -1955,9 +1955,13 @@ func (s *statusServer) iterateNodes(
defer cancel()
for nodeID := range nodeStatuses {
nodeID := nodeID // needed to ensure the closure below captures a copy.
if err := s.stopper.RunLimitedAsyncTask(
ctx, fmt.Sprintf("server.statusServer: requesting %s", errorCtx),
sem, true, /* wait */
if err := s.stopper.RunAsyncTaskEx(
ctx,
stop.TaskOpts{
TaskName: fmt.Sprintf("server.statusServer: requesting %s", errorCtx),
Sem: sem,
WaitForSem: true,
},
func(ctx context.Context) { nodeQuery(ctx, nodeID) },
); err != nil {
return err
@@ -2046,9 +2050,13 @@ func (s *statusServer) paginatedIterateNodes(
for idx, nodeID := range nodeIDs {
nodeID := nodeID // needed to ensure the closure below captures a copy.
idx := idx
if err := s.stopper.RunLimitedAsyncTask(
ctx, fmt.Sprintf("server.statusServer: requesting %s", errorCtx),
sem, true, /* wait */
if err := s.stopper.RunAsyncTaskEx(
ctx,
stop.TaskOpts{
TaskName: fmt.Sprintf("server.statusServer: requesting %s", errorCtx),
Sem: sem,
WaitForSem: true,
},
func(ctx context.Context) { paginator.queryNode(ctx, nodeID, idx) },
); err != nil {
return pagState, err
20 changes: 16 additions & 4 deletions pkg/sql/catalog/lease/lease.go
@@ -1932,8 +1932,14 @@ func (m *Manager) refreshSomeLeases(ctx context.Context) {
for i := range ids {
id := ids[i]
wg.Add(1)
if err := m.stopper.RunLimitedAsyncTask(
ctx, fmt.Sprintf("refresh descriptor: %d lease", id), m.sem, true /*wait*/, func(ctx context.Context) {
if err := m.stopper.RunAsyncTaskEx(
ctx,
stop.TaskOpts{
TaskName: fmt.Sprintf("refresh descriptor: %d lease", id),
Sem: m.sem,
WaitForSem: true,
},
func(ctx context.Context) {
defer wg.Done()
if _, err := acquireNodeLease(ctx, m, id); err != nil {
log.Infof(ctx, "refreshing descriptor: %d lease failed: %s", id, err)
@@ -1996,8 +2002,14 @@ SELECT "descID", version, expiration FROM system.public.lease AS OF SYSTEM TIME
version: int(tree.MustBeDInt(row[1])),
expiration: tree.MustBeDTimestamp(row[2]),
}
if err := m.stopper.RunLimitedAsyncTask(
ctx, fmt.Sprintf("release lease %+v", lease), m.sem, true /*wait*/, func(ctx context.Context) {
if err := m.stopper.RunAsyncTaskEx(
ctx,
stop.TaskOpts{
TaskName: fmt.Sprintf("release lease %+v", lease),
Sem: m.sem,
WaitForSem: true,
},
func(ctx context.Context) {
m.storage.release(ctx, m.stopper, &lease)
log.Infof(ctx, "released orphaned lease: %+v", lease)
wg.Done()
1 change: 1 addition & 0 deletions pkg/sql/opt/exec/execbuilder/testdata/select
@@ -1772,6 +1772,7 @@ WHERE message LIKE 'querying next range at /Table/74/1%' OR
message = '=== SPAN START: kv.DistSender: sending partial batch ==='
----
querying next range at /Table/74/1/0/0
=== SPAN START: kv.DistSender: sending partial batch ===
querying next range at /Table/74/1/10/0

# Test for 42202 -- ensure filters can get pushed down through project-set.
@@ -73,7 +73,7 @@ SET tracing = on,kv,results; INSERT INTO t.kv(k, v) VALUES (1,2); SET tracing =

query TT
SELECT operation, message FROM [SHOW KV TRACE FOR SESSION]
WHERE operation != 'dist sender send'
WHERE operation != 'dist sender send' AND operation != 'kv.DistSender: sending partial batch'
----
batch flow coordinator CPut /Table/54/1/1/0 -> /TUPLE/2:2:Int/2
batch flow coordinator InitPut /Table/54/2/2/0 -> /BYTES/0x89
@@ -87,7 +87,7 @@ SET tracing = on,kv,results; INSERT INTO t.kv(k, v) VALUES (1,2); SET tracing =
query TT
set tracing=off;
SELECT operation, message FROM [SHOW KV TRACE FOR SESSION]
WHERE operation != 'dist sender send'
WHERE operation != 'dist sender send' AND operation != 'kv.DistSender: sending partial batch'
----
batch flow coordinator CPut /Table/54/1/1/0 -> /TUPLE/2:2:Int/2
batch flow coordinator InitPut /Table/54/2/2/0 -> /BYTES/0x89
Expand All @@ -99,7 +99,7 @@ SET tracing = on,kv,results; INSERT INTO t.kv(k, v) VALUES (2,2); SET tracing =
query TT
set tracing=off;
SELECT operation, message FROM [SHOW KV TRACE FOR SESSION]
WHERE operation != 'dist sender send'
WHERE operation != 'dist sender send' AND operation != 'kv.DistSender: sending partial batch'
----
batch flow coordinator CPut /Table/54/1/2/0 -> /TUPLE/2:2:Int/2
batch flow coordinator InitPut /Table/54/2/2/0 -> /BYTES/0x8a
@@ -176,7 +176,7 @@ SET tracing = on,kv,results; DELETE FROM t.kv; SET tracing = off

query TT
SELECT operation, message FROM [SHOW KV TRACE FOR SESSION]
WHERE operation != 'dist sender send'
WHERE operation != 'dist sender send' AND operation != 'kv.DistSender: sending partial batch'
----
colbatchscan Scan /Table/54/{1-2}
colbatchscan fetched: /kv/primary/1/v -> /2
6 changes: 3 additions & 3 deletions pkg/sql/opt/exec/execbuilder/testdata/upsert_nonmetamorphic
@@ -36,7 +36,7 @@ SET tracing = on,kv,results; UPSERT INTO t.kv(k, v) VALUES (2,3); SET tracing =

query TT
SELECT operation, message FROM [SHOW KV TRACE FOR SESSION]
WHERE operation != 'dist sender send'
WHERE operation != 'dist sender send' AND operation != 'kv.DistSender: sending partial batch'
----
colbatchscan Scan /Table/55/1/2/0
batch flow coordinator CPut /Table/55/1/2/0 -> /TUPLE/2:2:Int/3
@@ -49,7 +49,7 @@ SET tracing = on,kv,results; UPSERT INTO t.kv(k, v) VALUES (1,2); SET tracing =

query TT
SELECT operation, message FROM [SHOW KV TRACE FOR SESSION]
WHERE operation != 'dist sender send'
WHERE operation != 'dist sender send' AND operation != 'kv.DistSender: sending partial batch'
----
colbatchscan Scan /Table/55/1/1/0
batch flow coordinator CPut /Table/55/1/1/0 -> /TUPLE/2:2:Int/2
@@ -63,7 +63,7 @@ SET tracing = on,kv,results; UPSERT INTO t.kv(k, v) VALUES (2,2); SET tracing =
query TT
set tracing=off;
SELECT operation, message FROM [SHOW KV TRACE FOR SESSION]
WHERE operation != 'dist sender send'
WHERE operation != 'dist sender send' AND operation != 'kv.DistSender: sending partial batch'
----
colbatchscan Scan /Table/55/1/2/0
colbatchscan fetched: /kv/primary/2/v -> /3
6 changes: 3 additions & 3 deletions pkg/sql/trace_test.go
@@ -41,9 +41,9 @@ func TestTrace(t *testing.T) {

// These are always appended, even without the test specifying it.
alwaysOptionalSpans := []string{
"[async] drain",
"[async] storage.pendingLeaseRequest: requesting lease",
"[async] storage.Store: gossip on capacity change",
"drain",
"storage.pendingLeaseRequest: requesting lease",
"storage.Store: gossip on capacity change",
"outbox",
"request range lease",
"range lookup",
12 changes: 7 additions & 5 deletions pkg/ts/server.go
@@ -223,7 +223,7 @@ func (s *Server) Query(
}

// Start a task which is itself responsible for starting per-query worker
// tasks. This is needed because RunLimitedAsyncTask can block; in the
// tasks. This is needed because RunAsyncTaskEx can block; in the
// case where a single request has more queries than the semaphore limit,
// a deadlock would occur because queries cannot complete until
// they have written their result to the "output" channel, which is
@@ -233,11 +233,13 @@ func (s *Server) Query(
queryIdx := queryIdx
query := query

if err := s.stopper.RunLimitedAsyncTask(
if err := s.stopper.RunAsyncTaskEx(
ctx,
"ts.Server: query",
s.workerSem,
true, /* wait */
stop.TaskOpts{
TaskName: "ts.Server: query",
Sem: s.workerSem,
WaitForSem: true,
},
func(ctx context.Context) {
// Estimated source count is either the count of requested sources
// *or* the estimated cluster node count if no sources are specified.
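The comment above this hunk explains why the per-query tasks are launched from a separate parent task: they are semaphore-limited with WaitForSem: true, so a request with more queries than semaphore slots could deadlock if the same goroutine both launched the workers and consumed their results. The self-contained toy below reproduces that shape with a channel semaphore and plain goroutines standing in for the stopper.

```go
// Toy version of the pattern described above: a parent goroutine launches
// semaphore-limited workers while the request handler concurrently drains
// their results, so workers blocked on the semaphore never deadlock against
// unread output.
package main

import (
	"fmt"
	"sync"
)

func main() {
	const numQueries = 8
	sem := make(chan struct{}, 2) // worker limit smaller than the query count
	output := make(chan int)

	// Parent task: responsible only for starting the limited worker tasks.
	go func() {
		var wg sync.WaitGroup
		for q := 0; q < numQueries; q++ {
			q := q
			sem <- struct{}{} // WaitForSem: true, block until a slot frees up
			wg.Add(1)
			go func() {
				defer wg.Done()
				defer func() { <-sem }()
				output <- q * q // the "query result"
			}()
		}
		wg.Wait()
		close(output)
	}()

	// Request handler: drains results while workers are still being launched.
	// If it launched the workers itself before reading, the first two workers
	// would block on the unread channel and the rest on the full semaphore.
	sum := 0
	for r := range output {
		sum += r
	}
	fmt.Println("sum of query results:", sum)
}
```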